Posted to commits@hive.apache.org by ay...@apache.org on 2022/02/01 04:10:53 UTC
[hive] branch master updated: HIVE-25819: Track event id on target cluster with respect to source cluster. (#2890). (Ayush Saxena, reviewed by Pravin Kumar Sinha)
This is an automated email from the ASF dual-hosted git repository.
ayushsaxena pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 86f94bf HIVE-25819: Track event id on target cluster with respect to source cluster. (#2890). (Ayush Saxena, reviewed by Pravin Kumar Sinha)
86f94bf is described below
commit 86f94bf1df0db4b5e1c2a46d51d294c882ddb1f3
Author: Ayush Saxena <ay...@apache.org>
AuthorDate: Tue Feb 1 09:40:33 2022 +0530
HIVE-25819: Track event id on target cluster with respect to source cluster. (#2890). (Ayush Saxena, reviewed by Pravin Kumar Sinha)
---
.../hive/hcatalog/api/repl/ReplicationUtils.java | 2 +-
.../parse/TestReplicationOptimisedBootstrap.java | 284 +++++++++++++++++++++
.../hive/ql/parse/TestReplicationScenarios.java | 6 +-
.../parse/TestReplicationScenariosAcidTables.java | 7 -
.../hive/ql/exec/repl/OptimisedBootstrapUtils.java | 38 ++-
.../hadoop/hive/ql/exec/repl/ReplDumpTask.java | 8 +-
.../hadoop/hive/ql/exec/repl/ReplLoadTask.java | 6 +-
.../ql/exec/repl/bootstrap/load/LoadDatabase.java | 9 +-
.../incremental/IncrementalLoadTasksBuilder.java | 9 +-
.../hive/ql/parse/ImportSemanticAnalyzer.java | 2 +-
.../hive/ql/parse/ReplicationSemanticAnalyzer.java | 5 +-
.../hadoop/hive/ql/parse/ReplicationSpec.java | 29 ++-
.../hive/ql/parse/repl/dump/io/DBSerializer.java | 2 +-
.../ql/parse/repl/dump/io/FunctionSerializer.java | 2 +-
.../ql/parse/repl/dump/io/PartitionSerializer.java | 2 +-
.../ql/parse/repl/dump/io/TableSerializer.java | 2 +-
.../repl/load/message/AlterDatabaseHandler.java | 3 +-
.../apache/hadoop/hive/common/repl/ReplConst.java | 5 +
.../apache/hadoop/hive/metastore/HMSHandler.java | 28 ++
19 files changed, 393 insertions(+), 56 deletions(-)
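In short: during the first dump of the reverse (optimised bootstrap) cycle, the event_ack file now records two ids instead of one, namely the last event id replicated from the source and the corresponding notification event id on the target, joined by OptimisedBootstrapUtils.FILE_ENTRY_SEPARATOR ("#"). A minimal sketch of the layout, with hypothetical id values:

    // Minimal sketch, not part of this commit; the id values are hypothetical.
    public class EventAckLayoutSketch {
      public static void main(String[] args) {
        String payload = "100#348";                 // <sourceEventId>#<targetEventId>
        String[] ids = payload.trim().split("#");   // "#" is FILE_ENTRY_SEPARATOR
        System.out.println("source event id = " + ids[0]);
        System.out.println("target event id = " + ids[1]);
      }
    }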
diff --git a/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/repl/ReplicationUtils.java b/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/repl/ReplicationUtils.java
index 69e3c13..7514b1a 100644
--- a/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/repl/ReplicationUtils.java
+++ b/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/repl/ReplicationUtils.java
@@ -41,7 +41,7 @@ import java.util.Map;
public class ReplicationUtils {
- public final static String REPL_STATE_ID = ReplicationSpec.KEY.CURR_STATE_ID.toString();
+ public final static String REPL_STATE_ID = ReplicationSpec.KEY.CURR_STATE_ID_SOURCE.toString();
private ReplicationUtils(){
// dummy private constructor, since this class is a collection of static utility methods.
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationOptimisedBootstrap.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationOptimisedBootstrap.java
index d5b819d..a63bde6 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationOptimisedBootstrap.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationOptimisedBootstrap.java
@@ -21,7 +21,13 @@ import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.InjectableBehaviourObjectStore;
+import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId;
+import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+import org.apache.hadoop.hive.metastore.api.NotificationEventResponse;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.hadoop.hive.metastore.messaging.event.filters.DatabaseAndTableFilter;
import org.apache.hadoop.hive.metastore.messaging.json.gzip.GzipJSONMessageEncoder;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.security.UserGroupInformation;
@@ -31,20 +37,25 @@ import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
+import javax.annotation.Nullable;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import static org.apache.hadoop.hdfs.protocol.HdfsConstants.QUOTA_RESET;
import static org.apache.hadoop.hive.ql.exec.repl.OptimisedBootstrapUtils.EVENT_ACK_FILE;
import static org.apache.hadoop.hive.ql.exec.repl.OptimisedBootstrapUtils.TABLE_DIFF_COMPLETE_DIRECTORY;
import static org.apache.hadoop.hive.ql.exec.repl.OptimisedBootstrapUtils.TABLE_DIFF_INPROGRESS_DIRECTORY;
+import static org.apache.hadoop.hive.ql.exec.repl.OptimisedBootstrapUtils.getEventIdFromFile;
import static org.apache.hadoop.hive.ql.exec.repl.OptimisedBootstrapUtils.getPathsFromTableFile;
import static org.apache.hadoop.hive.ql.exec.repl.OptimisedBootstrapUtils.getTablesFromTableDiffFile;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -450,4 +461,277 @@ public class TestReplicationOptimisedBootstrap extends BaseReplicationAcrossInst
assertTrue("Table Diff Contains " + tableDiffEntries,
tableDiffEntries.containsAll(Arrays.asList("t1_managed", "t2_managed")));
}
+
+ @Test
+ public void testTargetEventIdGenerationAfterFirstIncremental() throws Throwable {
+ List<String> withClause = ReplicationTestUtils.includeExternalTableClause(true);
+ withClause.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" + primary.repldDir + "'");
+
+ // Do a bootstrap cycle(A->B)
+ primary.dump(primaryDbName, withClause);
+ replica.load(replicatedDbName, primaryDbName, withClause);
+
+ // Add some tables & do an incremental dump.
+ WarehouseInstance.Tuple tuple = primary.run("use " + primaryDbName)
+ .run("create external table table1 (id int)")
+ .run("insert into table table1 values (100)")
+ .run("create table table1_managed (name string)")
+ .run("insert into table table1_managed values ('ABC')")
+ .dump(primaryDbName, withClause);
+
+ // Do an incremental load
+ replica.load(replicatedDbName, primaryDbName, withClause);
+
+ // Get the latest notification from the notification log for the target database, just after replication.
+ CurrentNotificationEventId notificationIdAfterRepl = replica.getCurrentNotificationEventId();
+
+ // Check the tables are there post incremental load.
+ replica.run("repl status " + replicatedDbName)
+ .verifyResult(tuple.lastReplicationId)
+ .run("use " + replicatedDbName)
+ .run("select id from table1")
+ .verifyResult("100")
+ .run("select name from table1_managed")
+ .verifyResult("ABC")
+ .verifyReplTargetProperty(replicatedDbName);
+
+ // Do some modifications on the source cluster, so we have some entries in the table diff.
+ primary.run("use " + primaryDbName)
+ .run("create table table2_managed (id string)")
+ .run("insert into table table1_managed values ('SDC')")
+ .run("insert into table table2_managed values ('A'),('B'),('C')");
+
+ // Do some modifications in another database so we have unrelated events as well after the last load; these
+ // should get filtered.
+ primary.run("create database " + extraPrimaryDb)
+ .run("use " + extraPrimaryDb)
+ .run("create external table t1 (id int)")
+ .run("insert into table t1 values (15),(1),(96)")
+ .run("create table t1_managed (id string)")
+ .run("insert into table t1_managed values ('SA'),('PS')");
+
+ // Do some modifications on the target database.
+ replica.run("use " + replicatedDbName)
+ .run("alter database "+ replicatedDbName + " set DBPROPERTIES ('key1'='value1')")
+ .run("alter database "+ replicatedDbName + " set DBPROPERTIES ('key2'='value2')");
+
+ // Validate the current replication id on original target has changed now.
+ assertNotEquals(replica.getCurrentNotificationEventId().getEventId(), notificationIdAfterRepl.getEventId());
+
+ // Prepare for reverse replication.
+ DistributedFileSystem replicaFs = replica.miniDFSCluster.getFileSystem();
+ Path newReplDir = new Path(replica.repldDir + "reverse1");
+ replicaFs.mkdirs(newReplDir);
+ withClause = ReplicationTestUtils.includeExternalTableClause(true);
+ withClause.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" + newReplDir + "'");
+
+ tuple = replica.dump(replicatedDbName);
+
+ // Check that the event ack file gets created.
+ assertTrue(new Path(tuple.dumpLocation, EVENT_ACK_FILE).toString() + " doesn't exist",
+ replicaFs.exists(new Path(tuple.dumpLocation, EVENT_ACK_FILE)));
+
+ // Get the target event id.
+ NotificationEventResponse nl = new HiveMetaStoreClient(replica.hiveConf)
+ .getNextNotification(Long.parseLong(getEventIdFromFile(new Path(tuple.dumpLocation), conf)[1]), -1,
+ new DatabaseAndTableFilter(replicatedDbName, null));
+
+ // There should be 4 events: one for the alter db, a second removing the first incremental pending flag, and then
+ // the two custom alter operations.
+ assertEquals(4, nl.getEvents().size());
+ }
+
+ @Test
+ public void testTargetEventIdGeneration() throws Throwable {
+ // Do a cycle of bootstrap dump & load.
+ List<String> withClause = ReplicationTestUtils.includeExternalTableClause(true);
+ withClause.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" + primary.repldDir + "'");
+
+ // Do a bootstrap cycle(A->B)
+ primary.dump(primaryDbName, withClause);
+ replica.load(replicatedDbName, primaryDbName, withClause);
+
+ // Add some tables & do the first incremental dump.
+ primary.run("use " + primaryDbName)
+ .run("create external table tablei1 (id int)")
+ .run("create external table tablei2 (id int)")
+ .run("create table tablem1 (id int)")
+ .run("create table tablem2 (id int)")
+ .run("insert into table tablei1 values(1),(2),(3),(4)")
+ .run("insert into table tablei2 values(10),(20),(30),(40)")
+ .run("insert into table tablem1 values(5),(10),(15),(20)")
+ .run("insert into table tablem2 values(6),(12),(18),(24)")
+ .dump(primaryDbName, withClause);
+
+ // Do the incremental load, and check everything is intact.
+ replica.load(replicatedDbName, primaryDbName, withClause)
+ .run("use "+ replicatedDbName)
+ .run("select id from tablei1")
+ .verifyResults(new String[]{"1","2","3","4"})
+ .run("select id from tablei2")
+ .verifyResults(new String[]{"10","20","30","40"})
+ .run("select id from tablem1")
+ .verifyResults(new String[]{"5","10","15","20"})
+ .run("select id from tablem2")
+ .verifyResults(new String[]{"6","12","18","24"});
+
+ // Do some modifications & call for the second cycle of incremental dump & load.
+ WarehouseInstance.Tuple tuple = primary.run("use " + primaryDbName)
+ .run("create external table table1 (id int)")
+ .run("insert into table table1 values (25),(35),(82)")
+ .run("create table table1_managed (name string)")
+ .run("insert into table table1_managed values ('CAD'),('DAS'),('MSA')")
+ .run("insert into table tablei1 values(15),(62),(25),(62)")
+ .run("insert into table tablei2 values(10),(22),(11),(22)")
+ .run("insert into table tablem1 values(5),(10),(15),(20)")
+ .run("alter table table1 set TBLPROPERTIES('comment'='abc')")
+ .dump(primaryDbName, withClause);
+
+ // Do an incremental load
+ replica.load(replicatedDbName, primaryDbName, withClause);
+
+ // Get the latest notification from the notification log for the target database, just after replication.
+ CurrentNotificationEventId notificationIdAfterRepl = replica.getCurrentNotificationEventId();
+
+ // Check the tables are there post incremental load.
+ replica.run("repl status " + replicatedDbName)
+ .verifyResult(tuple.lastReplicationId)
+ .run("use " + replicatedDbName)
+ .run("select id from table1")
+ .verifyResults(new String[]{"25", "35", "82"})
+ .run("select name from table1_managed")
+ .verifyResults(new String[]{"CAD", "DAS", "MSA"})
+ .verifyReplTargetProperty(replicatedDbName);
+
+ // Do some modifications on the source cluster, so we have some entries in the table diff.
+ primary.run("use " + primaryDbName)
+ .run("create table table2_managed (id string)")
+ .run("insert into table table1_managed values ('AAA'),('BBB')")
+ .run("insert into table table2_managed values ('A1'),('B1'),('C2')");
+
+ // Do some modifications in another database so we have unrelated events as well after the last load; these
+ // should get filtered.
+ primary.run("create database " + extraPrimaryDb)
+ .run("use " + extraPrimaryDb)
+ .run("create external table table1 (id int)")
+ .run("insert into table table1 values (15),(1),(96)")
+ .run("create table table1_managed (id string)")
+ .run("insert into table table1_managed values ('SAA'),('PSA')");
+
+ // Do some modifications on the target database.
+ replica.run("use " + replicatedDbName)
+ .run("alter database "+ replicatedDbName + " set DBPROPERTIES ('repl1'='value1')")
+ .run("alter database "+ replicatedDbName + " set DBPROPERTIES ('repl2'='value2')");
+
+ // Validate the current replication id on original target has changed now.
+ assertNotEquals(replica.getCurrentNotificationEventId().getEventId(), notificationIdAfterRepl.getEventId());
+
+ // Prepare for reverse replication.
+ DistributedFileSystem replicaFs = replica.miniDFSCluster.getFileSystem();
+ Path newReplDir = new Path(replica.repldDir + "reverse01");
+ replicaFs.mkdirs(newReplDir);
+ withClause = ReplicationTestUtils.includeExternalTableClause(true);
+ withClause.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" + newReplDir + "'");
+
+ tuple = replica.dump(replicatedDbName, withClause);
+
+ // Check that the event ack file gets created.
+ assertTrue(new Path(tuple.dumpLocation, EVENT_ACK_FILE).toString() + " doesn't exist",
+ replicaFs.exists(new Path(tuple.dumpLocation, EVENT_ACK_FILE)));
+
+ // Get the target event id.
+ NotificationEventResponse nl = new HiveMetaStoreClient(replica.hiveConf)
+ .getNextNotification(Long.parseLong(getEventIdFromFile(new Path(tuple.dumpLocation), conf)[1]), 10,
+ new DatabaseAndTableFilter(replicatedDbName, null));
+
+ assertEquals(1, nl.getEvents().size());
+ }
+
+ @Test
+ public void testTargetEventIdWithNotificationsExpired() throws Throwable {
+ // Do a cycle of bootstrap dump & load.
+ List<String> withClause = ReplicationTestUtils.includeExternalTableClause(true);
+ withClause.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" + primary.repldDir + "'");
+
+ // Do a bootstrap cycle(A->B)
+ primary.dump(primaryDbName, withClause);
+ replica.load(replicatedDbName, primaryDbName, withClause);
+
+ // Add some tables & do the first incremental dump.
+ primary.run("use " + primaryDbName)
+ .run("create external table tablei1 (id int)")
+ .run("create table tablem1 (id int)")
+ .run("insert into table tablei1 values(1),(2),(3),(4)")
+ .run("insert into table tablem1 values(5),(10),(15),(20)")
+ .dump(primaryDbName, withClause);
+
+ // Do the incremental load, and check everything is intact.
+ replica.load(replicatedDbName, primaryDbName, withClause)
+ .run("use "+ replicatedDbName)
+ .run("select id from tablei1")
+ .verifyResults(new String[]{"1","2","3","4"})
+ .run("select id from tablem1")
+ .verifyResults(new String[]{"5","10","15","20"});
+
+ // Get the latest notification from the notification log for the target database, just after replication.
+ CurrentNotificationEventId notificationIdAfterRepl = replica.getCurrentNotificationEventId();
+ // Inject a behaviour where some events are missing from the notification_log table.
+ // This ensures the incremental dump doesn't get all events for replication.
+ InjectableBehaviourObjectStore.BehaviourInjection<NotificationEventResponse, NotificationEventResponse>
+ eventIdSkipper =
+ new InjectableBehaviourObjectStore.BehaviourInjection<NotificationEventResponse, NotificationEventResponse>() {
+
+ @Nullable
+ @Override
+ public NotificationEventResponse apply(@Nullable NotificationEventResponse eventIdList) {
+ if (null != eventIdList) {
+ List<NotificationEvent> eventIds = eventIdList.getEvents();
+ List<NotificationEvent> outEventIds = new ArrayList<>();
+ for (NotificationEvent event : eventIds) {
+ // Skip the events belonging to the target db.
+ if (event.getDbName().equalsIgnoreCase(replicatedDbName)) {
+ injectionPathCalled = true;
+ continue;
+ }
+ outEventIds.add(event);
+ }
+
+ // Return the new list
+ return new NotificationEventResponse(outEventIds);
+ } else {
+ return null;
+ }
+ }
+ };
+
+ try {
+ InjectableBehaviourObjectStore.setGetNextNotificationBehaviour(eventIdSkipper);
+
+ // Prepare for reverse replication.
+ DistributedFileSystem replicaFs = replica.miniDFSCluster.getFileSystem();
+ Path newReplDir = new Path(replica.repldDir + "reverse01");
+ replicaFs.mkdirs(newReplDir);
+ withClause = ReplicationTestUtils.includeExternalTableClause(true);
+ withClause.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" + newReplDir + "'");
+
+ try {
+ replica.dump(replicatedDbName, withClause);
+ fail("Expected the dump to fail since the notification event is missing.");
+ } catch (Exception e) {
+ // Expected due to missing notification log entry.
+ }
+
+ // Check whether a non-recoverable error was recorded.
+ Path nonRecoverablePath =
+ TestReplicationScenarios.getNonRecoverablePath(newReplDir, replicatedDbName, replica.hiveConf);
+ assertTrue(replicaFs.exists(nonRecoverablePath));
+ } finally {
+ InjectableBehaviourObjectStore.resetGetNextNotificationBehaviour(); // reset the behaviour
+ }
+ }
}
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
index 61f61a8..b91191e 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
@@ -81,7 +81,6 @@ import org.apache.hadoop.hive.ql.exec.repl.ReplDumpWork;
import org.apache.hadoop.hive.ql.exec.repl.ReplLoadWork;
import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
import org.apache.hadoop.hive.ql.metadata.Hive;
-import org.apache.hadoop.hive.ql.metadata.StringAppender;
import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData;
import org.apache.hadoop.hive.ql.parse.repl.load.EventDumpDirComparator;
import org.apache.hadoop.hive.ql.parse.repl.load.metric.BootstrapLoadMetricCollector;
@@ -132,9 +131,6 @@ import java.util.Base64;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
-import org.apache.logging.log4j.Level;
-import org.apache.logging.log4j.LogManager;
-
import static org.apache.hadoop.hive.metastore.ReplChangeManager.SOURCE_OF_REPLICATION;
import static org.apache.hadoop.hive.metastore.Warehouse.DEFAULT_CATALOG_NAME;
import static org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars.EVENT_DB_LISTENER_CLEAN_STARTUP_WAIT_INTERVAL;
@@ -4884,7 +4880,7 @@ public class TestReplicationScenarios {
assertTrue(Long.parseLong(lastReplDumpId) > Long.parseLong(lastDbReplDumpId));
Table tbl = metaStoreClientMirror.getTable(replDbName, tblName);
- String tblLastReplId = tbl.getParameters().get(ReplicationSpec.KEY.CURR_STATE_ID.toString());
+ String tblLastReplId = tbl.getParameters().get(ReplicationSpec.KEY.CURR_STATE_ID_SOURCE.toString());
assertTrue(Long.parseLong(tblLastReplId) > Long.parseLong(lastDbReplDumpId));
assertTrue(Long.parseLong(tblLastReplId) <= Long.parseLong(lastReplDumpId));
return lastReplDumpId;
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
index b3ddbf5..2612cbf 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
@@ -934,13 +934,6 @@ public class TestReplicationScenariosAcidTables extends BaseReplicationScenarios
assert currentEventId == lastEventId + 1;
primary.run("ALTER DATABASE " + primaryDbName +
- " SET DBPROPERTIES('" + ReplConst.TARGET_OF_REPLICATION + "'='true')");
- lastEventId = primary.getCurrentNotificationEventId().getEventId();
- primary.dumpFailure(primaryDbName);
- currentEventId = primary.getCurrentNotificationEventId().getEventId();
- assert lastEventId == currentEventId;
-
- primary.run("ALTER DATABASE " + primaryDbName +
" SET DBPROPERTIES('" + ReplConst.TARGET_OF_REPLICATION + "'='')");
primary.dump(primaryDbName);
replica.run("DROP DATABASE " + replicatedDbName);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/OptimisedBootstrapUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/OptimisedBootstrapUtils.java
index 85bbbec..8221a51 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/OptimisedBootstrapUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/OptimisedBootstrapUtils.java
@@ -50,6 +50,7 @@ import java.util.List;
import java.util.stream.Collectors;
import static org.apache.hadoop.hive.ql.parse.ReplicationSpec.getLastReplicatedStateFromParameters;
+import static org.apache.hadoop.hive.ql.parse.ReplicationSpec.getTargetLastReplicatedStateFromParameters;
/**
* Utility class for handling operations regarding optimised bootstrap in case of replication.
@@ -58,7 +59,7 @@ public class OptimisedBootstrapUtils {
/** Separator used to separate entries in the listing. */
public static final String FILE_ENTRY_SEPARATOR = "#";
- private static Logger LOG = LoggerFactory.getLogger(OptimisedBootstrapUtils.class);
+ private static final Logger LOG = LoggerFactory.getLogger(OptimisedBootstrapUtils.class);
/** table diff directory when in progress */
public static final String TABLE_DIFF_INPROGRESS_DIRECTORY = "table_diff";
@@ -87,20 +88,20 @@ public class OptimisedBootstrapUtils {
}
/**
- * Gets the event id from the event ack file
+ * Gets the source & target event ids from the event ack file
* @param dumpPath the dump path
* @param conf the hive configuration
* @return the event id from file.
* @throws IOException
*/
- public static String getEventIdFromFile(Path dumpPath, HiveConf conf) throws IOException {
+ public static String[] getEventIdFromFile(Path dumpPath, HiveConf conf) throws IOException {
String lastEventId;
Path eventAckFilePath = new Path(dumpPath, EVENT_ACK_FILE);
FileSystem fs = eventAckFilePath.getFileSystem(conf);
try (FSDataInputStream stream = fs.open(eventAckFilePath);) {
lastEventId = IOUtils.toString(stream, Charset.defaultCharset());
}
- return lastEventId.replaceAll(System.lineSeparator(),"").trim();
+ return lastEventId.replaceAll(System.lineSeparator(), "").trim().split(FILE_ENTRY_SEPARATOR);
}
/**
@@ -176,19 +177,20 @@ public class OptimisedBootstrapUtils {
* @param dmd the dump metadata
* @param cmRoot the cmRoot
* @param dbEventId the database event id to which we have to write in the file.
- * @param conf the hive configuraiton
+ * @param targetDbEventId the database event id with respect to the target cluster.
+ * @param conf the hive configuration
* @param work the repldump work
* @return the lastReplId denoting a fake dump(-1) always
* @throws SemanticException
*/
public static Long createAndGetEventAckFile(Path currentDumpPath, DumpMetaData dmd, Path cmRoot, String dbEventId,
- HiveConf conf, ReplDumpWork work)
- throws SemanticException {
+ String targetDbEventId, HiveConf conf, ReplDumpWork work) throws Exception {
// Keep an invalid value for lastReplId, to denote it isn't a actual dump.
Long lastReplId = -1L;
Path filePath = new Path(currentDumpPath, EVENT_ACK_FILE);
- Utils.writeOutput(dbEventId, filePath, conf);
- LOG.info("Created event_ack file at {} with eventId {}", filePath, dbEventId);
+ Utils.writeOutput(dbEventId + FILE_ENTRY_SEPARATOR + targetDbEventId, filePath, conf);
+ LOG.info("Created event_ack file at {} with source eventId {} and target eventId {}", filePath, dbEventId,
+ targetDbEventId);
work.setResultValues(Arrays.asList(currentDumpPath.toUri().toString(), String.valueOf(lastReplId)));
dmd.setDump(DumpType.INCREMENTAL, work.eventFrom, lastReplId, cmRoot, -1L, false);
dmd.write(true);
@@ -245,6 +247,24 @@ public class OptimisedBootstrapUtils {
LOG.info("Completed renaming table diff progress file to table diff complete file.");
}
+ /**
+ * Fetches the notification id from the database with respect to the target database.
+ * @param dbName name of database
+ * @param hiveDb the hive object
+ * @return the corresponding notification event id from the target database
+ * @throws Exception
+ */
+ public static String getTargetEventId(String dbName, Hive hiveDb) throws Exception {
+ Database database = hiveDb.getDatabase(dbName);
+ String targetLastEventId = getTargetLastReplicatedStateFromParameters(database.getParameters());
+ List<NotificationEvent> events =
+ hiveDb.getMSC().getNextNotification(Long.parseLong(targetLastEventId) - 1, 1, null).getEvents();
+ if (events == null || events.isEmpty() || events.get(0).getEventId() != Long.parseLong(targetLastEventId)) {
+ throw new IllegalStateException("Notification events are missing in the meta store.");
+ }
+ return targetLastEventId;
+ }
+
private static ArrayList<String> getListing(String dbName, String tableName, Hive hiveDb, HiveConf conf)
throws HiveException, IOException {
ArrayList<String> paths = new ArrayList<>();
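Since getEventIdFromFile now returns both entries, callers index into the array: the ReplDumpTask and ReplLoadTask hunks below read the source id at index 0, and the new tests above read the target id at index 1. A hedged usage sketch, where the EventAckIds wrapper is a hypothetical helper rather than part of the commit:

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hive.conf.HiveConf;

    import static org.apache.hadoop.hive.ql.exec.repl.OptimisedBootstrapUtils.getEventIdFromFile;

    // Hypothetical wrapper, for illustration only.
    public class EventAckIds {
      public static String sourceId(Path dumpPath, HiveConf conf) throws Exception {
        return getEventIdFromFile(dumpPath, conf)[0];  // source-side last replicated event id
      }
      public static String targetId(Path dumpPath, HiveConf conf) throws Exception {
        return getEventIdFromFile(dumpPath, conf)[1];  // target-side notification event id
      }
    }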
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
index a9d728d..1fcce45 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
@@ -131,6 +131,7 @@ import static org.apache.hadoop.hive.ql.exec.repl.OptimisedBootstrapUtils.checkF
import static org.apache.hadoop.hive.ql.exec.repl.OptimisedBootstrapUtils.createAndGetEventAckFile;
import static org.apache.hadoop.hive.ql.exec.repl.OptimisedBootstrapUtils.getEventIdFromFile;
import static org.apache.hadoop.hive.ql.exec.repl.OptimisedBootstrapUtils.getReplEventIdFromDatabase;
+import static org.apache.hadoop.hive.ql.exec.repl.OptimisedBootstrapUtils.getTargetEventId;
import static org.apache.hadoop.hive.ql.exec.repl.OptimisedBootstrapUtils.isFailover;
import static org.apache.hadoop.hive.ql.exec.repl.OptimisedBootstrapUtils.isFirstIncrementalPending;
import static org.apache.hadoop.hive.ql.exec.repl.ReplAck.LOAD_ACKNOWLEDGEMENT;
@@ -229,11 +230,14 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
isFirstIncrementalPending(work.dbNameOrPattern, getHive());
// Get the last replicated event id from the database.
String dbEventId = getReplEventIdFromDatabase(work.dbNameOrPattern, getHive());
+ // Get the last replicated event id from the database with respect to the target.
+ String targetDbEventId = getTargetEventId(work.dbNameOrPattern, getHive());
// Check if the tableDiff directory is present or not.
boolean isTableDiffDirectoryPresent = checkFileExists(currentDumpPath, conf, TABLE_DIFF_COMPLETE_DIRECTORY);
if (createEventMarker) {
LOG.info("Creating event_ack file for database {} with event id {}.", work.dbNameOrPattern, dbEventId);
- lastReplId = createAndGetEventAckFile(currentDumpPath, dmd, cmRoot, dbEventId, conf, work);
+ lastReplId =
+ createAndGetEventAckFile(currentDumpPath, dmd, cmRoot, dbEventId, targetDbEventId, conf, work);
finishRemainingTasks();
} else {
// We should be here only if TableDiff is Present.
@@ -548,7 +552,7 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
return true;
}
// Event_ACK file is present check if it contains correct value or not.
- String fileEventId = getEventIdFromFile(previousDumpPath.getParent(), conf);
+ String fileEventId = getEventIdFromFile(previousDumpPath.getParent(), conf)[0];
String dbEventId = getReplEventIdFromDatabase(work.dbNameOrPattern, getHive()).trim();
if (!dbEventId.equalsIgnoreCase(fileEventId)) {
// In case the database event id changed post table_diff_complete generation, that means both forward &
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
index 3a2dda9..2f249aa7 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
@@ -598,7 +598,7 @@ public class ReplLoadTask extends Task<ReplLoadWork> implements Serializable {
Map<String, String> dbProps;
if (work.isIncrementalLoad()) {
dbProps = new HashMap<>();
- dbProps.put(ReplicationSpec.KEY.CURR_STATE_ID.toString(),
+ dbProps.put(ReplicationSpec.KEY.CURR_STATE_ID_SOURCE.toString(),
work.incrementalLoadTasksBuilder().eventTo().toString());
} else {
Database dbInMetadata = work.databaseEvent(context.hiveConf).dbInMetadata(work.dbNameToLoadIn);
@@ -699,7 +699,7 @@ public class ReplLoadTask extends Task<ReplLoadWork> implements Serializable {
}
boolean isTableDiffPresent =
checkFileExists(new Path(work.dumpDirectory).getParent(), conf, TABLE_DIFF_COMPLETE_DIRECTORY);
- Long eventId = Long.parseLong(getEventIdFromFile(new Path(work.dumpDirectory).getParent(), conf));
+ Long eventId = Long.parseLong(getEventIdFromFile(new Path(work.dumpDirectory).getParent(), conf)[0]);
if (!isTableDiffPresent) {
prepareTableDiffFile(eventId, getHive(), work, conf);
if (this.childTasks == null) {
@@ -761,7 +761,7 @@ public class ReplLoadTask extends Task<ReplLoadWork> implements Serializable {
if (StringUtils.isNotBlank(dbName)) {
String lastEventid = builder.eventTo().toString();
Map<String, String> mapProp = new HashMap<>();
- mapProp.put(ReplicationSpec.KEY.CURR_STATE_ID.toString(), lastEventid);
+ mapProp.put(ReplicationSpec.KEY.CURR_STATE_ID_SOURCE.toString(), lastEventid);
AlterDatabaseSetPropertiesDesc alterDbDesc =
new AlterDatabaseSetPropertiesDesc(dbName, mapProp,
new ReplicationSpec(lastEventid, lastEventid));
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadDatabase.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadDatabase.java
index ba7979d..06264f5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadDatabase.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadDatabase.java
@@ -44,7 +44,6 @@ import org.slf4j.LoggerFactory;
import java.util.HashMap;
import java.util.HashSet;
-import java.util.List;
import java.util.Map;
public class LoadDatabase {
@@ -141,8 +140,8 @@ public class LoadDatabase {
private boolean isDbAlreadyBootstrapped(Database db) {
Map<String, String> props = db.getParameters();
return ((props != null)
- && props.containsKey(ReplicationSpec.KEY.CURR_STATE_ID.toString())
- && !props.get(ReplicationSpec.KEY.CURR_STATE_ID.toString()).isEmpty());
+ && props.containsKey(ReplicationSpec.KEY.CURR_STATE_ID_SOURCE.toString())
+ && !props.get(ReplicationSpec.KEY.CURR_STATE_ID_SOURCE.toString()).isEmpty());
}
private boolean isDbEmpty(String dbName) throws HiveException {
@@ -181,7 +180,9 @@ public class LoadDatabase {
last repl id is set and we create a AlterDatabaseTask at the end of processing a database.
*/
Map<String, String> parameters = new HashMap<>(dbObj.getParameters());
- parameters.remove(ReplicationSpec.KEY.CURR_STATE_ID.toString());
+ parameters.remove(ReplicationSpec.KEY.CURR_STATE_ID_SOURCE.toString());
+
+ parameters.remove(ReplicationSpec.KEY.CURR_STATE_ID_TARGET.toString());
parameters.remove(ReplUtils.REPL_IS_CUSTOM_DB_LOC);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadTasksBuilder.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadTasksBuilder.java
index 0eabc1c..c9f0da4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadTasksBuilder.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadTasksBuilder.java
@@ -50,7 +50,6 @@ import org.apache.hadoop.hive.ql.parse.repl.load.message.MessageHandler;
import org.apache.hadoop.hive.ql.parse.repl.metric.ReplicationMetricCollector;
import org.apache.hadoop.hive.ql.plan.DependencyCollectionWork;
import org.slf4j.Logger;
-import org.stringtemplate.v4.ST;
import java.util.ArrayList;
import java.util.HashMap;
@@ -177,7 +176,7 @@ public class IncrementalLoadTasksBuilder {
taskChainTail = updateIncPendTask;
Map<String, String> dbProps = new HashMap<>();
- dbProps.put(ReplicationSpec.KEY.CURR_STATE_ID.toString(), String.valueOf(lastReplayedEvent));
+ dbProps.put(ReplicationSpec.KEY.CURR_STATE_ID_SOURCE.toString(), String.valueOf(lastReplayedEvent));
ReplStateLogWork replStateLogWork = new ReplStateLogWork(replLogger, dbProps, dumpDirectory,
metricCollector, shouldFailover);
Task<?> barrierTask = TaskFactory.get(replStateLogWork, conf);
@@ -196,8 +195,8 @@ public class IncrementalLoadTasksBuilder {
}
private boolean isEventNotReplayed(Map<String, String> params, FileStatus dir, DumpType dumpType) {
- if (params != null && (params.containsKey(ReplicationSpec.KEY.CURR_STATE_ID.toString()))) {
- String replLastId = params.get(ReplicationSpec.KEY.CURR_STATE_ID.toString());
+ if (params != null && (params.containsKey(ReplicationSpec.KEY.CURR_STATE_ID_SOURCE.toString()))) {
+ String replLastId = params.get(ReplicationSpec.KEY.CURR_STATE_ID_SOURCE.toString());
if (Long.parseLong(replLastId) >= Long.parseLong(dir.getPath().getName())) {
log.debug("Event " + dumpType + " with replId " + Long.parseLong(dir.getPath().getName())
+ " is already replayed. LastReplId - " + Long.parseLong(replLastId));
@@ -242,7 +241,7 @@ public class IncrementalLoadTasksBuilder {
private Task<?> dbUpdateReplStateTask(String dbName, String replState, Task<?> preCursor) {
HashMap<String, String> mapProp = new HashMap<>();
- mapProp.put(ReplicationSpec.KEY.CURR_STATE_ID.toString(), replState);
+ mapProp.put(ReplicationSpec.KEY.CURR_STATE_ID_SOURCE.toString(), replState);
AlterDatabaseSetPropertiesDesc alterDbDesc = new AlterDatabaseSetPropertiesDesc(dbName, mapProp,
new ReplicationSpec(replState, replState));
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
index 18cdab5..816153e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
@@ -841,7 +841,7 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
} else {
// verify if table has been the target of replication, and if so, check HiveConf if we're allowed
// to override. If not, fail.
- if (table.getParameters().containsKey(ReplicationSpec.KEY.CURR_STATE_ID.toString())
+ if (table.getParameters().containsKey(ReplicationSpec.KEY.CURR_STATE_ID_SOURCE.toString())
&& conf.getBoolVar(HiveConf.ConfVars.HIVE_EXIM_RESTRICT_IMPORTS_INTO_REPLICATED_TABLES)){
throw new SemanticException(ErrorMsg.IMPORT_INTO_STRICT_REPL_TABLE.getMsg(
"Table "+table.getTableName()+" has repl.last.id parameter set." ));
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
index 0b9eb57..17472c4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
@@ -55,7 +55,6 @@ import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVEQUERYID;
import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY;
import static org.apache.hadoop.hive.ql.exec.repl.ReplAck.LOAD_ACKNOWLEDGEMENT;
import static org.apache.hadoop.hive.ql.parse.HiveParser.TOK_DBNAME;
-import static org.apache.hadoop.hive.ql.parse.HiveParser.TOK_REPLACE;
import static org.apache.hadoop.hive.ql.parse.HiveParser.TOK_REPL_CONFIG;
import static org.apache.hadoop.hive.ql.parse.HiveParser.TOK_REPL_DUMP;
import static org.apache.hadoop.hive.ql.parse.HiveParser.TOK_REPL_LOAD;
@@ -445,8 +444,8 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
if (database != null) {
inputs.add(new ReadEntity(database));
Map<String, String> params = database.getParameters();
- if (params != null && (params.containsKey(ReplicationSpec.KEY.CURR_STATE_ID.toString()))) {
- return params.get(ReplicationSpec.KEY.CURR_STATE_ID.toString());
+ if (params != null && (params.containsKey(ReplicationSpec.KEY.CURR_STATE_ID_SOURCE.toString()))) {
+ return params.get(ReplicationSpec.KEY.CURR_STATE_ID_SOURCE.toString());
}
}
} catch (HiveException e) {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java
index 37039c4..dd2224c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java
@@ -18,14 +18,13 @@
package org.apache.hadoop.hive.ql.parse;
import com.google.common.base.Function;
-import com.google.common.base.Predicate;
import org.apache.hadoop.hive.common.repl.ReplConst;
-import org.apache.hadoop.hive.ql.metadata.Partition;
import org.apache.hadoop.hive.ql.plan.PlanUtils;
-import javax.annotation.Nullable;
import java.util.Map;
+import static org.apache.hadoop.hive.common.repl.ReplConst.REPL_TARGET_DATABASE_PROPERTY;
+
/**
* Statements executed to handle replication have some additional
* information relevant to the replication subsystem - this class
@@ -59,11 +58,12 @@ public class ReplicationSpec {
public enum KEY {
REPL_SCOPE("repl.scope"),
EVENT_ID("repl.event.id"),
- CURR_STATE_ID(ReplConst.REPL_TARGET_TABLE_PROPERTY),
+ CURR_STATE_ID_SOURCE(ReplConst.REPL_TARGET_TABLE_PROPERTY),
NOOP("repl.noop"),
IS_REPLACE("repl.is.replace"),
VALID_WRITEID_LIST("repl.valid.writeid.list"),
- VALID_TXN_LIST("repl.valid.txnid.list")
+ VALID_TXN_LIST("repl.valid.txnid.list"),
+ CURR_STATE_ID_TARGET(REPL_TARGET_DATABASE_PROPERTY),
;
private final String keyName;
@@ -146,7 +146,7 @@ public class ReplicationSpec {
}
}
this.eventId = keyFetcher.apply(ReplicationSpec.KEY.EVENT_ID.toString());
- this.currStateId = keyFetcher.apply(ReplicationSpec.KEY.CURR_STATE_ID.toString());
+ this.currStateId = keyFetcher.apply(ReplicationSpec.KEY.CURR_STATE_ID_SOURCE.toString());
this.isNoop = Boolean.parseBoolean(keyFetcher.apply(ReplicationSpec.KEY.NOOP.toString()));
this.isReplace = Boolean.parseBoolean(keyFetcher.apply(ReplicationSpec.KEY.IS_REPLACE.toString()));
this.validWriteIdList = keyFetcher.apply(ReplicationSpec.KEY.VALID_WRITEID_LIST.toString());
@@ -230,8 +230,15 @@ public class ReplicationSpec {
}
public static String getLastReplicatedStateFromParameters(Map<String, String> m) {
- if ((m != null) && (m.containsKey(KEY.CURR_STATE_ID.toString()))){
- return m.get(KEY.CURR_STATE_ID.toString());
+ if ((m != null) && (m.containsKey(KEY.CURR_STATE_ID_SOURCE.toString()))){
+ return m.get(KEY.CURR_STATE_ID_SOURCE.toString());
+ }
+ return null;
+ }
+
+ public static String getTargetLastReplicatedStateFromParameters(Map<String, String> m) {
+ if ((m != null) && (m.containsKey(KEY.CURR_STATE_ID_TARGET.toString()))){
+ return m.get(KEY.CURR_STATE_ID_TARGET.toString());
}
return null;
}
@@ -360,7 +367,7 @@ public class ReplicationSpec {
}
case EVENT_ID:
return getReplicationState();
- case CURR_STATE_ID:
+ case CURR_STATE_ID_SOURCE:
return getCurrentReplicationState();
case NOOP:
return String.valueOf(isNoop());
@@ -388,9 +395,9 @@ public class ReplicationSpec {
public static void copyLastReplId(Map<String, String> srcParameter, Map<String, String> destParameter) {
- String lastReplId = srcParameter.get(ReplicationSpec.KEY.CURR_STATE_ID.toString());
+ String lastReplId = srcParameter.get(ReplicationSpec.KEY.CURR_STATE_ID_SOURCE.toString());
if (lastReplId != null) {
- destParameter.put(ReplicationSpec.KEY.CURR_STATE_ID.toString(), lastReplId);
+ destParameter.put(ReplicationSpec.KEY.CURR_STATE_ID_SOURCE.toString(), lastReplId);
}
}
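With the KEY enum split, the long-standing repl.last.id database property is addressed as CURR_STATE_ID_SOURCE, while the new repl.target.last.id property (added to ReplConst below) is addressed as CURR_STATE_ID_TARGET. A minimal sketch of reading both states off a database parameter map, with hypothetical values:

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.hadoop.hive.ql.parse.ReplicationSpec;

    // Minimal sketch, not part of this commit; the parameter values are hypothetical.
    public class ReplStateLookupSketch {
      public static void main(String[] args) {
        Map<String, String> params = new HashMap<>();
        params.put(ReplicationSpec.KEY.CURR_STATE_ID_SOURCE.toString(), "100");  // repl.last.id
        params.put(ReplicationSpec.KEY.CURR_STATE_ID_TARGET.toString(), "348");  // repl.target.last.id
        // Both getters return null when their key is absent.
        System.out.println(ReplicationSpec.getLastReplicatedStateFromParameters(params));        // prints 100
        System.out.println(ReplicationSpec.getTargetLastReplicatedStateFromParameters(params));  // prints 348
      }
    }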
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/DBSerializer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/DBSerializer.java
index 06b8fd5..429b9d7 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/DBSerializer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/DBSerializer.java
@@ -39,7 +39,7 @@ public class DBSerializer implements JsonWriter.Serializer {
public void writeTo(JsonWriter writer, ReplicationSpec additionalPropertiesProvider)
throws SemanticException, IOException {
dbObject.putToParameters(
- ReplicationSpec.KEY.CURR_STATE_ID.toString(),
+ ReplicationSpec.KEY.CURR_STATE_ID_SOURCE.toString(),
additionalPropertiesProvider.getCurrentReplicationState()
);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FunctionSerializer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FunctionSerializer.java
index c340df7..fc01906 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FunctionSerializer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FunctionSerializer.java
@@ -94,7 +94,7 @@ public class FunctionSerializer implements JsonWriter.Serializer {
//This is required otherwise correct work object on repl load wont be created.
writer.jsonGenerator.writeStringField(ReplicationSpec.KEY.REPL_SCOPE.toString(),
"all");
- writer.jsonGenerator.writeStringField(ReplicationSpec.KEY.CURR_STATE_ID.toString(),
+ writer.jsonGenerator.writeStringField(ReplicationSpec.KEY.CURR_STATE_ID_SOURCE.toString(),
additionalPropertiesProvider.getCurrentReplicationState());
writer.jsonGenerator
.writeStringField(FIELD_NAME, serializer.toString(copyObj));
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/PartitionSerializer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/PartitionSerializer.java
index 12c183e..d198612 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/PartitionSerializer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/PartitionSerializer.java
@@ -57,7 +57,7 @@ public class PartitionSerializer implements JsonWriter.Serializer {
if (additionalPropertiesProvider.getReplSpecType()
!= ReplicationSpec.Type.INCREMENTAL_DUMP) {
partition.putToParameters(
- ReplicationSpec.KEY.CURR_STATE_ID.toString(),
+ ReplicationSpec.KEY.CURR_STATE_ID_SOURCE.toString(),
additionalPropertiesProvider.getCurrentReplicationState());
}
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/TableSerializer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/TableSerializer.java
index 16c371a..8c52194 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/TableSerializer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/TableSerializer.java
@@ -86,7 +86,7 @@ public class TableSerializer implements JsonWriter.Serializer {
if (additionalPropertiesProvider.getReplSpecType()
!= ReplicationSpec.Type.INCREMENTAL_DUMP) {
table.putToParameters(
- ReplicationSpec.KEY.CURR_STATE_ID.toString(),
+ ReplicationSpec.KEY.CURR_STATE_ID_SOURCE.toString(),
additionalPropertiesProvider.getCurrentReplicationState());
}
} else {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AlterDatabaseHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AlterDatabaseHandler.java
index 65841c3..2194f39 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AlterDatabaseHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AlterDatabaseHandler.java
@@ -64,7 +64,8 @@ public class AlterDatabaseHandler extends AbstractMessageHandler {
String key = entry.getKey();
// Ignore the keys which are local to source warehouse
if (key.startsWith(Utils.BOOTSTRAP_DUMP_STATE_KEY_PREFIX)
- || key.equals(ReplicationSpec.KEY.CURR_STATE_ID.toString())
+ || key.equals(ReplicationSpec.KEY.CURR_STATE_ID_SOURCE.toString())
+ || key.equals(ReplicationSpec.KEY.CURR_STATE_ID_TARGET.toString())
|| key.equals(ReplUtils.REPL_CHECKPOINT_KEY)
|| key.equals(ReplChangeManager.SOURCE_OF_REPLICATION)
|| key.equals(ReplUtils.REPL_FIRST_INC_PENDING_FLAG)
diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/common/repl/ReplConst.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/common/repl/ReplConst.java
index 46dbc3e..f5131ca 100644
--- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/common/repl/ReplConst.java
+++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/common/repl/ReplConst.java
@@ -53,4 +53,9 @@ public class ReplConst {
public static final String TARGET_OF_REPLICATION = "repl.target.for";
public static final String REPL_INCOMPATIBLE = "repl.incompatible";
+
+ /**
+ * Tracks the event id with respect to the target cluster.
+ */
+ public static final String REPL_TARGET_DATABASE_PROPERTY = "repl.target.last.id";
}
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HMSHandler.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HMSHandler.java
index d186c4f..d8a3da4 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HMSHandler.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HMSHandler.java
@@ -110,6 +110,7 @@ import static org.apache.hadoop.hive.common.AcidConstants.SOFT_DELETE_PATH_SUFFI
import static org.apache.hadoop.hive.common.AcidConstants.SOFT_DELETE_TABLE;
import static org.apache.hadoop.hive.common.AcidConstants.DELTA_DIGITS;
+import static org.apache.hadoop.hive.common.repl.ReplConst.REPL_TARGET_DATABASE_PROPERTY;
import static org.apache.hadoop.hive.metastore.HiveMetaStoreClient.TRUNCATE_SKIP_DATA_DELETION;
import static org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.TABLE_IS_CTAS;
import static org.apache.hadoop.hive.metastore.ExceptionHandler.handleException;
@@ -1581,6 +1582,16 @@ public class HMSHandler extends FacebookBase implements IHMSHandler {
throw new MetaException("Could not alter database \"" + parsedDbName[DB_NAME] +
"\". Could not retrieve old definition.");
}
+
+ // Add replication target event id.
+ if (isReplicationEventIdUpdate(oldDB, newDB)) {
+ Map<String, String> oldParams = new LinkedHashMap<>(newDB.getParameters());
+ String currentNotificationLogID = Long.toString(ms.getCurrentNotificationEventId().getEventId());
+ oldParams.put(REPL_TARGET_DATABASE_PROPERTY, currentNotificationLogID);
+ LOG.debug("Adding the {} property for database {} with event id {}", REPL_TARGET_DATABASE_PROPERTY,
+ newDB.getName(), currentNotificationLogID);
+ newDB.setParameters(oldParams);
+ }
firePreEvent(new PreAlterDatabaseEvent(oldDB, newDB, this));
ms.openTransaction();
@@ -1613,6 +1624,23 @@ public class HMSHandler extends FacebookBase implements IHMSHandler {
}
}
+ /**
+ * Checks whether the repl.last.id is being updated.
+ * @param oldDb the old db object
+ * @param newDb the new db object
+ * @return true if repl.last.id is being changed.
+ */
+ private boolean isReplicationEventIdUpdate(Database oldDb, Database newDb) {
+ Map<String, String> oldDbProp = oldDb.getParameters();
+ Map<String, String> newDbProp = newDb.getParameters();
+ if (newDbProp == null || newDbProp.isEmpty()) {
+ return false;
+ }
+ String newReplId = newDbProp.get(ReplConst.REPL_TARGET_TABLE_PROPERTY);
+ String oldReplId = oldDbProp != null ? oldDbProp.get(ReplConst.REPL_TARGET_TABLE_PROPERTY) : null;
+ return newReplId != null && !newReplId.equalsIgnoreCase(oldReplId);
+ }
+
private void drop_database_core(RawStore ms, String catName,
final String name, final boolean deleteData, final boolean cascade)
throws NoSuchObjectException, InvalidOperationException, MetaException,
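Net effect of the HMSHandler change: whenever an alter_database call changes repl.last.id (typically as a replication load advances the database's state), the metastore stamps the database's current notification event id into repl.target.last.id, and the reverse dump later reads that value back via getTargetEventId. A distilled sketch of the update check, assuming plain string keys in place of the ReplConst constants:

    import java.util.Map;

    // Distilled sketch of isReplicationEventIdUpdate above, not part of this commit.
    public class ReplIdUpdateCheckSketch {
      static boolean isReplLastIdUpdate(Map<String, String> oldParams, Map<String, String> newParams) {
        if (newParams == null || newParams.isEmpty()) {
          return false;
        }
        String newReplId = newParams.get("repl.last.id");  // ReplConst.REPL_TARGET_TABLE_PROPERTY
        String oldReplId = (oldParams != null) ? oldParams.get("repl.last.id") : null;
        // Stamp repl.target.last.id only when repl.last.id actually changes.
        return newReplId != null && !newReplId.equalsIgnoreCase(oldReplId);
      }
    }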