You are viewing a plain text version of this content. The canonical link for it is here.
Posted to gitbox@hive.apache.org by GitBox <gi...@apache.org> on 2020/04/20 08:51:13 UTC

[GitHub] [hive] aasha commented on a change in pull request #977: HIVE-23040 : Checkpointing for repl dump incremental phase

aasha commented on a change in pull request #977:
URL: https://github.com/apache/hive/pull/977#discussion_r411194373



##########
File path: itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
##########
@@ -663,6 +669,322 @@ public void testMultiDBTxn() throws Throwable {
     }
   }
 
+  @Test
+  public void testIncrementalDumpCheckpointing() throws Throwable {
+    WarehouseInstance.Tuple bootstrapDump = primary.run("use " + primaryDbName)
+            .run("CREATE TABLE t1(a string) STORED AS TEXTFILE")
+            .run("CREATE TABLE t2(a string) STORED AS TEXTFILE")
+            .dump(primaryDbName);
+
+    replica.load(replicatedDbName, primaryDbName)
+            .run("select * from " + replicatedDbName + ".t1")
+            .verifyResults(new String[] {})
+            .run("select * from " + replicatedDbName + ".t2")
+            .verifyResults(new String[] {});
+
+
+    //Case 1: When the last dump finished all the events and
+    //only  _finished_dump file at the hiveDumpRoot was about to be written when it failed.
+    ReplDumpWork.testDeletePreviousDumpMetaPath(true);

Review comment:
       Why is this needed?

##########
File path: itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
##########
@@ -663,6 +669,322 @@ public void testMultiDBTxn() throws Throwable {
     }
   }
 
+  @Test
+  public void testIncrementalDumpCheckpointing() throws Throwable {
+    WarehouseInstance.Tuple bootstrapDump = primary.run("use " + primaryDbName)
+            .run("CREATE TABLE t1(a string) STORED AS TEXTFILE")
+            .run("CREATE TABLE t2(a string) STORED AS TEXTFILE")
+            .dump(primaryDbName);
+
+    replica.load(replicatedDbName, primaryDbName)
+            .run("select * from " + replicatedDbName + ".t1")
+            .verifyResults(new String[] {})
+            .run("select * from " + replicatedDbName + ".t2")
+            .verifyResults(new String[] {});
+
+
+    //Case 1: When the last dump finished all the events and
+    //only  _finished_dump file at the hiveDumpRoot was about to be written when it failed.
+    ReplDumpWork.testDeletePreviousDumpMetaPath(true);
+
+    WarehouseInstance.Tuple incrementalDump1 = primary.run("use " + primaryDbName)
+            .run("insert into t1 values (1)")
+            .run("insert into t2 values (2)")
+            .dump(primaryDbName);
+
+    Path hiveDumpDir = new Path(incrementalDump1.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR);
+    Path ackFile = new Path(hiveDumpDir, ReplAck.DUMP_ACKNOWLEDGEMENT.toString());
+    Path ackLastEventID = new Path(hiveDumpDir, ReplAck.EVENTS_DUMP.toString());
+    FileSystem fs = FileSystem.get(hiveDumpDir.toUri(), primary.hiveConf);
+    assertTrue(fs.exists(ackFile));
+    assertTrue(fs.exists(ackLastEventID));
+
+    fs.delete(ackFile, false);
+
+    Map<String, Long> eventModTimeMap = new HashMap<>();
+    long firstIncEventID = Long.parseLong(bootstrapDump.lastReplicationId) + 1;
+    long lastIncEventID = Long.parseLong(incrementalDump1.lastReplicationId);
+    assertTrue(lastIncEventID > (firstIncEventID + 1));
+
+    for (long eventId=firstIncEventID; eventId<=lastIncEventID; eventId++) {
+      Path eventRoot = new Path(hiveDumpDir, String.valueOf(eventId));
+      if (fs.exists(eventRoot)) {
+        eventModTimeMap.put(String.valueOf(eventId), fs.getFileStatus(eventRoot).getModificationTime());

Review comment:
       We should check the individual files. The root-level directory's modification timestamp may not change even if its contents are changing.

##########
File path: itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
##########
@@ -663,6 +669,322 @@ public void testMultiDBTxn() throws Throwable {
     }
   }
 
+  @Test
+  public void testIncrementalDumpCheckpointing() throws Throwable {
+    WarehouseInstance.Tuple bootstrapDump = primary.run("use " + primaryDbName)
+            .run("CREATE TABLE t1(a string) STORED AS TEXTFILE")
+            .run("CREATE TABLE t2(a string) STORED AS TEXTFILE")
+            .dump(primaryDbName);
+
+    replica.load(replicatedDbName, primaryDbName)
+            .run("select * from " + replicatedDbName + ".t1")
+            .verifyResults(new String[] {})
+            .run("select * from " + replicatedDbName + ".t2")
+            .verifyResults(new String[] {});
+
+
+    //Case 1: When the last dump finished all the events and
+    //only  _finished_dump file at the hiveDumpRoot was about to be written when it failed.
+    ReplDumpWork.testDeletePreviousDumpMetaPath(true);
+
+    WarehouseInstance.Tuple incrementalDump1 = primary.run("use " + primaryDbName)
+            .run("insert into t1 values (1)")
+            .run("insert into t2 values (2)")
+            .dump(primaryDbName);
+
+    Path hiveDumpDir = new Path(incrementalDump1.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR);
+    Path ackFile = new Path(hiveDumpDir, ReplAck.DUMP_ACKNOWLEDGEMENT.toString());
+    Path ackLastEventID = new Path(hiveDumpDir, ReplAck.EVENTS_DUMP.toString());
+    FileSystem fs = FileSystem.get(hiveDumpDir.toUri(), primary.hiveConf);
+    assertTrue(fs.exists(ackFile));
+    assertTrue(fs.exists(ackLastEventID));
+
+    fs.delete(ackFile, false);
+
+    Map<String, Long> eventModTimeMap = new HashMap<>();
+    long firstIncEventID = Long.parseLong(bootstrapDump.lastReplicationId) + 1;
+    long lastIncEventID = Long.parseLong(incrementalDump1.lastReplicationId);
+    assertTrue(lastIncEventID > (firstIncEventID + 1));
+
+    for (long eventId=firstIncEventID; eventId<=lastIncEventID; eventId++) {
+      Path eventRoot = new Path(hiveDumpDir, String.valueOf(eventId));
+      if (fs.exists(eventRoot)) {
+        eventModTimeMap.put(String.valueOf(eventId), fs.getFileStatus(eventRoot).getModificationTime());
+      }
+    }
+
+    ReplDumpWork.testDeletePreviousDumpMetaPath(false);
+    WarehouseInstance.Tuple incrementalDump2 = primary.run("use " + primaryDbName)
+            .dump(primaryDbName);
+    assertEquals(incrementalDump1.dumpLocation, incrementalDump2.dumpLocation);
+    assertTrue(fs.exists(ackFile));
+    //check events were not rewritten.
+    for(Map.Entry<String, Long> entry :eventModTimeMap.entrySet()) {
+      long oldModTime = entry.getValue();
+      long newModTime = fs.getFileStatus(new Path(hiveDumpDir, entry.getKey())).getModificationTime();
+      assertEquals(oldModTime, newModTime);
+    }
+
+    replica.load(replicatedDbName, primaryDbName)
+            .run("select * from " + replicatedDbName + ".t1")
+            .verifyResults(new String[] {"1"})
+            .run("select * from " + replicatedDbName + ".t2")
+            .verifyResults(new String[] {"2"});
+
+
+    //Case 2: When the last dump was half way through
+    ReplDumpWork.testDeletePreviousDumpMetaPath(true);
+
+    WarehouseInstance.Tuple incrementalDump3 = primary.run("use " + primaryDbName)
+            .run("insert into t1 values (3)")
+            .run("insert into t2 values (4)")
+            .dump(primaryDbName);
+
+    hiveDumpDir = new Path(incrementalDump3.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR);
+    ackFile = new Path(hiveDumpDir, ReplAck.DUMP_ACKNOWLEDGEMENT.toString());
+    ackLastEventID = new Path(hiveDumpDir, ReplAck.EVENTS_DUMP.toString());
+    fs = FileSystem.get(hiveDumpDir.toUri(), primary.hiveConf);
+    assertTrue(fs.exists(ackFile));
+    assertTrue(fs.exists(ackLastEventID));
+
+    fs.delete(ackFile, false);
+    //delete last three events and test if it recovers.
+    long lastEventID = Long.parseLong(incrementalDump3.lastReplicationId);
+    Path lastEvtRoot = new Path(hiveDumpDir + File.separator + String.valueOf(lastEventID));
+    Path secondLastEvtRoot = new Path(hiveDumpDir + File.separator + String.valueOf(lastEventID - 1));
+    Path thirdLastEvtRoot = new Path(hiveDumpDir + File.separator + String.valueOf(lastEventID - 2));
+    assertTrue(fs.exists(lastEvtRoot));
+    assertTrue(fs.exists(secondLastEvtRoot));
+    assertTrue(fs.exists(thirdLastEvtRoot));
+
+    long lastEvtModTimeOld = fs.getFileStatus(lastEvtRoot).getModificationTime();
+    long secondLastEvtModTimeOld = fs.getFileStatus(secondLastEvtRoot).getModificationTime();
+    long thirdLastEvtModTimeOld = fs.getFileStatus(thirdLastEvtRoot).getModificationTime();
+
+    fs.delete(lastEvtRoot, true);
+    fs.delete(secondLastEvtRoot, true);
+    fs.delete(thirdLastEvtRoot, true);
+    List<List<String>> listValues = new ArrayList<>();
+    listValues.add(
+            Arrays.asList(
+                    LAST_EVENT_ID_NAME,
+                    String.valueOf(lastEventID - 3)
+            )
+    );
+    org.apache.hadoop.hive.ql.parse.repl.dump.Utils.writeOutput(listValues, ackLastEventID, primary.hiveConf, true);
+    ReplDumpWork.testDeletePreviousDumpMetaPath(false);
+
+    WarehouseInstance.Tuple incrementalDump4 = primary.run("use " + primaryDbName)
+            .dump(primaryDbName);
+
+    assertEquals(incrementalDump3.dumpLocation, incrementalDump4.dumpLocation);
+
+    verifyPathExist(fs, ackFile);
+    verifyPathExist(fs, ackLastEventID);
+    verifyPathExist(fs, lastEvtRoot);
+    verifyPathExist(fs, secondLastEvtRoot);
+    verifyPathExist(fs, thirdLastEvtRoot);
+    assertTrue(fs.getFileStatus(lastEvtRoot).getModificationTime() > lastEvtModTimeOld);
+    assertTrue(fs.getFileStatus(secondLastEvtRoot).getModificationTime() > secondLastEvtModTimeOld);
+    assertTrue(fs.getFileStatus(thirdLastEvtRoot).getModificationTime() > thirdLastEvtModTimeOld);
+
+    replica.load(replicatedDbName, primaryDbName)
+            .run("select * from " + replicatedDbName + ".t1")
+            .verifyResults(new String[] {"1", "3"})
+            .run("select * from " + replicatedDbName + ".t2")
+            .verifyResults(new String[] {"2", "4"});
+  }
+
+  @Test

Review comment:
       Add tests for delete/drop/add new table during checkpointing

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
##########
@@ -731,9 +826,17 @@ private boolean shouldResumePreviousDump(DumpMetaData dumpMetaData) {
     }
   }
 
-  private boolean shouldResumePreviousDump(Path dumpPath) {
-    Path hiveDumpPath = new Path(dumpPath, ReplUtils.REPL_HIVE_BASE_DIR);
-    return shouldResumePreviousDump(new DumpMetaData(hiveDumpPath, conf));
+  private boolean shouldResumePreviousDump(Path lastDumpPath, Path previousValidHiveDumpPath) throws IOException {
+    if (validDump(lastDumpPath)) {
+      return false;
+    }
+    // In case of incremental _dumpmetadata may not be present, still we should resume.
+    if (previousValidHiveDumpPath != null) {

Review comment:
       We should check whether a previous invalid hive dump exists. If you check only previousValidHiveDumpPath, that condition may be true even if the previous dump path was not deleted.

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
##########
@@ -731,9 +826,17 @@ private boolean shouldResumePreviousDump(DumpMetaData dumpMetaData) {
     }
   }
 
-  private boolean shouldResumePreviousDump(Path dumpPath) {
-    Path hiveDumpPath = new Path(dumpPath, ReplUtils.REPL_HIVE_BASE_DIR);
-    return shouldResumePreviousDump(new DumpMetaData(hiveDumpPath, conf));
+  private boolean shouldResumePreviousDump(Path lastDumpPath, Path previousValidHiveDumpPath) throws IOException {
+    if (validDump(lastDumpPath)) {
+      return false;
+    }
+    // In case of incremental _dumpmetadata may not be present, still we should resume.
+    if (previousValidHiveDumpPath != null) {

Review comment:
       Tests for this flow

##########
File path: itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
##########
@@ -33,6 +33,8 @@
 import org.apache.hadoop.hive.ql.DriverFactory;

Review comment:
       Add tests for both distcp and regular copy

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
##########
@@ -742,7 +845,7 @@ long currentNotificationId(Hive hiveDb) throws TException {
 
   Path dumpDbMetadata(String dbName, Path metadataRoot, long lastReplId, Hive hiveDb) throws Exception {
     // TODO : instantiating FS objects are generally costly. Refactor
-    Path dbRoot = getBootstrapDbRoot(metadataRoot, dbName, false);

Review comment:
       This was different for bootstrap during the incremental flow:
   `if (isIncrementalPhase) {
         dumpRoot = new Path(dumpRoot, ReplUtils.INC_BOOTSTRAP_ROOT_DIR_NAME);
       }`
   How is the path now?

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java
##########
@@ -94,6 +94,12 @@
   // Configuration to enable/disable dumping ACID tables. Used only for testing and shouldn't be
   // seen in production or in case of tests other than the ones where it's required.
   public static final String REPL_DUMP_INCLUDE_ACID_TABLES = "hive.repl.dump.include.acid.tables";
+
+  // HDFS Config to define the maximum number of items a directory may contain.
+  public static final String DFS_MAX_DIR_ITEMS_CONFIG = "dfs.namenode.fs-limits.max-directory-items";

Review comment:
       All other configs use `.` as the separator instead of `-`.

##########
File path: itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
##########
@@ -33,6 +33,8 @@
 import org.apache.hadoop.hive.ql.DriverFactory;

Review comment:
       Add tests for partial events not dumped

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java
##########
@@ -100,6 +105,20 @@ public Void execute() throws IOException {
     }
   }
 
+  public static boolean create(Path outputFile, HiveConf hiveConf, boolean replace)

Review comment:
       The replace param is unused

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java
##########
@@ -76,6 +76,7 @@
   public static final String FILES_NAME = "_files";
   public static final String DATA_PATH_NAME = "data";
   public static final String METADATA_PATH_NAME = "metadata";
+  public static final String LAST_EVENT_ID_NAME = "LastEventID";

Review comment:
       lastEventId

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java
##########
@@ -64,10 +64,15 @@
 
   public static void writeOutput(List<List<String>> listValues, Path outputFile, HiveConf hiveConf)
       throws SemanticException {
+    writeOutput(listValues, outputFile, hiveConf, false);

Review comment:
       Better to add FS Retry




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: gitbox-unsubscribe@hive.apache.org
For additional commands, e-mail: gitbox-help@hive.apache.org