Posted to gitbox@hive.apache.org by GitBox <gi...@apache.org> on 2020/04/20 14:45:09 UTC

[GitHub] [hive] pkumarsinha commented on a change in pull request #977: HIVE-23040 : Checkpointing for repl dump incremental phase

pkumarsinha commented on a change in pull request #977:
URL: https://github.com/apache/hive/pull/977#discussion_r411249656



##########
File path: itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
##########
@@ -33,6 +33,8 @@
 import org.apache.hadoop.hive.ql.DriverFactory;

Review comment:
       Cannot add the regular copy one until HIVE-23235 is fixed.

##########
File path: itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
##########
@@ -33,6 +33,8 @@
 import org.apache.hadoop.hive.ql.DriverFactory;

Review comment:
       testIncrementalDumpCheckpointing() checks for partial failure cases.

##########
File path: itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
##########
@@ -663,6 +669,322 @@ public void testMultiDBTxn() throws Throwable {
     }
   }
 
+  @Test
+  public void testIncrementalDumpCheckpointing() throws Throwable {
+    WarehouseInstance.Tuple bootstrapDump = primary.run("use " + primaryDbName)
+            .run("CREATE TABLE t1(a string) STORED AS TEXTFILE")
+            .run("CREATE TABLE t2(a string) STORED AS TEXTFILE")
+            .dump(primaryDbName);
+
+    replica.load(replicatedDbName, primaryDbName)
+            .run("select * from " + replicatedDbName + ".t1")
+            .verifyResults(new String[] {})
+            .run("select * from " + replicatedDbName + ".t2")
+            .verifyResults(new String[] {});
+
+
+    //Case 1: When the last dump finished all the events and
+    //only  _finished_dump file at the hiveDumpRoot was about to be written when it failed.
+    ReplDumpWork.testDeletePreviousDumpMetaPath(true);

Review comment:
       This prevents the older valid dump directory from getting deleted, to mimic the case where the current dump fails in between.
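
       A minimal sketch of how such a test-only toggle could be wired inside ReplDumpWork; apart from the testDeletePreviousDumpMetaPath() entry point used by the test, the field and helper names below are assumptions, not the patch's actual code:

       // Hypothetical sketch: a static, test-only switch the dump task checks before cleaning
       // up the previous dump directory. When true, the older valid dump directory is left in
       // place, simulating a dump that failed part-way through.
       public class ReplDumpWork {

         private static boolean skipPreviousDumpCleanup = false;   // field name assumed

         public static void testDeletePreviousDumpMetaPath(boolean skip) {
           skipPreviousDumpCleanup = skip;
         }

         static boolean shouldSkipPreviousDumpCleanup() {          // helper name assumed
           return skipPreviousDumpCleanup;
         }
       }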

##########
File path: itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
##########
@@ -663,6 +669,322 @@ public void testMultiDBTxn() throws Throwable {
     }
   }
 
+  @Test
+  public void testIncrementalDumpCheckpointing() throws Throwable {
+    WarehouseInstance.Tuple bootstrapDump = primary.run("use " + primaryDbName)
+            .run("CREATE TABLE t1(a string) STORED AS TEXTFILE")
+            .run("CREATE TABLE t2(a string) STORED AS TEXTFILE")
+            .dump(primaryDbName);
+
+    replica.load(replicatedDbName, primaryDbName)
+            .run("select * from " + replicatedDbName + ".t1")
+            .verifyResults(new String[] {})
+            .run("select * from " + replicatedDbName + ".t2")
+            .verifyResults(new String[] {});
+
+
+    //Case 1: When the last dump finished all the events and
+    //only  _finished_dump file at the hiveDumpRoot was about to be written when it failed.
+    ReplDumpWork.testDeletePreviousDumpMetaPath(true);
+
+    WarehouseInstance.Tuple incrementalDump1 = primary.run("use " + primaryDbName)
+            .run("insert into t1 values (1)")
+            .run("insert into t2 values (2)")
+            .dump(primaryDbName);
+
+    Path hiveDumpDir = new Path(incrementalDump1.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR);
+    Path ackFile = new Path(hiveDumpDir, ReplAck.DUMP_ACKNOWLEDGEMENT.toString());
+    Path ackLastEventID = new Path(hiveDumpDir, ReplAck.EVENTS_DUMP.toString());
+    FileSystem fs = FileSystem.get(hiveDumpDir.toUri(), primary.hiveConf);
+    assertTrue(fs.exists(ackFile));
+    assertTrue(fs.exists(ackLastEventID));
+
+    fs.delete(ackFile, false);
+
+    Map<String, Long> eventModTimeMap = new HashMap<>();
+    long firstIncEventID = Long.parseLong(bootstrapDump.lastReplicationId) + 1;
+    long lastIncEventID = Long.parseLong(incrementalDump1.lastReplicationId);
+    assertTrue(lastIncEventID > (firstIncEventID + 1));
+
+    for (long eventId=firstIncEventID; eventId<=lastIncEventID; eventId++) {
+      Path eventRoot = new Path(hiveDumpDir, String.valueOf(eventId));
+      if (fs.exists(eventRoot)) {
+        eventModTimeMap.put(String.valueOf(eventId), fs.getFileStatus(eventRoot).getModificationTime());
+      }
+    }
+
+    ReplDumpWork.testDeletePreviousDumpMetaPath(false);
+    WarehouseInstance.Tuple incrementalDump2 = primary.run("use " + primaryDbName)
+            .dump(primaryDbName);
+    assertEquals(incrementalDump1.dumpLocation, incrementalDump2.dumpLocation);
+    assertTrue(fs.exists(ackFile));
+    //check events were not rewritten.
+    for(Map.Entry<String, Long> entry :eventModTimeMap.entrySet()) {
+      long oldModTime = entry.getValue();
+      long newModTime = fs.getFileStatus(new Path(hiveDumpDir, entry.getKey())).getModificationTime();
+      assertEquals(oldModTime, newModTime);
+    }
+
+    replica.load(replicatedDbName, primaryDbName)
+            .run("select * from " + replicatedDbName + ".t1")
+            .verifyResults(new String[] {"1"})
+            .run("select * from " + replicatedDbName + ".t2")
+            .verifyResults(new String[] {"2"});
+
+
+    //Case 2: When the last dump was half way through
+    ReplDumpWork.testDeletePreviousDumpMetaPath(true);
+
+    WarehouseInstance.Tuple incrementalDump3 = primary.run("use " + primaryDbName)
+            .run("insert into t1 values (3)")
+            .run("insert into t2 values (4)")
+            .dump(primaryDbName);
+
+    hiveDumpDir = new Path(incrementalDump3.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR);
+    ackFile = new Path(hiveDumpDir, ReplAck.DUMP_ACKNOWLEDGEMENT.toString());
+    ackLastEventID = new Path(hiveDumpDir, ReplAck.EVENTS_DUMP.toString());
+    fs = FileSystem.get(hiveDumpDir.toUri(), primary.hiveConf);
+    assertTrue(fs.exists(ackFile));
+    assertTrue(fs.exists(ackLastEventID));
+
+    fs.delete(ackFile, false);
+    //delete last three events and test if it recovers.
+    long lastEventID = Long.parseLong(incrementalDump3.lastReplicationId);
+    Path lastEvtRoot = new Path(hiveDumpDir + File.separator + String.valueOf(lastEventID));
+    Path secondLastEvtRoot = new Path(hiveDumpDir + File.separator + String.valueOf(lastEventID - 1));
+    Path thirdLastEvtRoot = new Path(hiveDumpDir + File.separator + String.valueOf(lastEventID - 2));
+    assertTrue(fs.exists(lastEvtRoot));
+    assertTrue(fs.exists(secondLastEvtRoot));
+    assertTrue(fs.exists(thirdLastEvtRoot));
+
+    long lastEvtModTimeOld = fs.getFileStatus(lastEvtRoot).getModificationTime();
+    long secondLastEvtModTimeOld = fs.getFileStatus(secondLastEvtRoot).getModificationTime();
+    long thirdLastEvtModTimeOld = fs.getFileStatus(thirdLastEvtRoot).getModificationTime();
+
+    fs.delete(lastEvtRoot, true);
+    fs.delete(secondLastEvtRoot, true);
+    fs.delete(thirdLastEvtRoot, true);
+    List<List<String>> listValues = new ArrayList<>();
+    listValues.add(
+            Arrays.asList(
+                    LAST_EVENT_ID_NAME,
+                    String.valueOf(lastEventID - 3)
+            )
+    );
+    org.apache.hadoop.hive.ql.parse.repl.dump.Utils.writeOutput(listValues, ackLastEventID, primary.hiveConf, true);
+    ReplDumpWork.testDeletePreviousDumpMetaPath(false);
+
+    WarehouseInstance.Tuple incrementalDump4 = primary.run("use " + primaryDbName)
+            .dump(primaryDbName);
+
+    assertEquals(incrementalDump3.dumpLocation, incrementalDump4.dumpLocation);
+
+    verifyPathExist(fs, ackFile);
+    verifyPathExist(fs, ackLastEventID);
+    verifyPathExist(fs, lastEvtRoot);
+    verifyPathExist(fs, secondLastEvtRoot);
+    verifyPathExist(fs, thirdLastEvtRoot);
+    assertTrue(fs.getFileStatus(lastEvtRoot).getModificationTime() > lastEvtModTimeOld);
+    assertTrue(fs.getFileStatus(secondLastEvtRoot).getModificationTime() > secondLastEvtModTimeOld);
+    assertTrue(fs.getFileStatus(thirdLastEvtRoot).getModificationTime() > thirdLastEvtModTimeOld);
+
+    replica.load(replicatedDbName, primaryDbName)
+            .run("select * from " + replicatedDbName + ".t1")
+            .verifyResults(new String[] {"1", "3"})
+            .run("select * from " + replicatedDbName + ".t2")
+            .verifyResults(new String[] {"2", "4"});
+  }
+
+  @Test

Review comment:
       Will add.
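
       The test quoted above rewrites the _events_dump file to an older event id and expects the next dump to redo only the missing events. A rough sketch of how a resuming dump could read that checkpoint; EVENTS_DUMP and LAST_EVENT_ID_NAME come from the patch, while the file layout and everything else here is assumed:

       // Hedged sketch, not the PR's implementation: read the last acknowledged event id from
       // the _events_dump checkpoint so an interrupted incremental dump can resume after it.
       import java.io.BufferedReader;
       import java.io.IOException;
       import java.io.InputStreamReader;
       import java.nio.charset.StandardCharsets;
       import org.apache.hadoop.conf.Configuration;
       import org.apache.hadoop.fs.FileSystem;
       import org.apache.hadoop.fs.Path;

       public final class EventCheckpointReader {

         // Returns the event id recorded in the checkpoint file, or defaultId if it is missing or empty.
         static long lastDumpedEventId(Path eventsDumpFile, Configuration conf, long defaultId)
             throws IOException {
           FileSystem fs = eventsDumpFile.getFileSystem(conf);
           if (!fs.exists(eventsDumpFile)) {
             return defaultId;
           }
           try (BufferedReader reader = new BufferedReader(
               new InputStreamReader(fs.open(eventsDumpFile), StandardCharsets.UTF_8))) {
             // Assumed layout: a single tab-separated line "LastEventID<TAB><event id>".
             String line = reader.readLine();
             if (line == null || line.trim().isEmpty()) {
               return defaultId;
             }
             String[] fields = line.trim().split("\t");
             return Long.parseLong(fields[fields.length - 1]);
           }
         }
       }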

##########
File path: itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
##########
@@ -663,6 +669,322 @@ public void testMultiDBTxn() throws Throwable {
     }
   }
 
+  @Test
+  public void testIncrementalDumpCheckpointing() throws Throwable {
+    WarehouseInstance.Tuple bootstrapDump = primary.run("use " + primaryDbName)
+            .run("CREATE TABLE t1(a string) STORED AS TEXTFILE")
+            .run("CREATE TABLE t2(a string) STORED AS TEXTFILE")
+            .dump(primaryDbName);
+
+    replica.load(replicatedDbName, primaryDbName)
+            .run("select * from " + replicatedDbName + ".t1")
+            .verifyResults(new String[] {})
+            .run("select * from " + replicatedDbName + ".t2")
+            .verifyResults(new String[] {});
+
+
+    //Case 1: When the last dump finished all the events and
+    //only  _finished_dump file at the hiveDumpRoot was about to be written when it failed.
+    ReplDumpWork.testDeletePreviousDumpMetaPath(true);
+
+    WarehouseInstance.Tuple incrementalDump1 = primary.run("use " + primaryDbName)
+            .run("insert into t1 values (1)")
+            .run("insert into t2 values (2)")
+            .dump(primaryDbName);
+
+    Path hiveDumpDir = new Path(incrementalDump1.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR);
+    Path ackFile = new Path(hiveDumpDir, ReplAck.DUMP_ACKNOWLEDGEMENT.toString());
+    Path ackLastEventID = new Path(hiveDumpDir, ReplAck.EVENTS_DUMP.toString());
+    FileSystem fs = FileSystem.get(hiveDumpDir.toUri(), primary.hiveConf);
+    assertTrue(fs.exists(ackFile));
+    assertTrue(fs.exists(ackLastEventID));
+
+    fs.delete(ackFile, false);
+
+    Map<String, Long> eventModTimeMap = new HashMap<>();
+    long firstIncEventID = Long.parseLong(bootstrapDump.lastReplicationId) + 1;
+    long lastIncEventID = Long.parseLong(incrementalDump1.lastReplicationId);
+    assertTrue(lastIncEventID > (firstIncEventID + 1));
+
+    for (long eventId=firstIncEventID; eventId<=lastIncEventID; eventId++) {
+      Path eventRoot = new Path(hiveDumpDir, String.valueOf(eventId));
+      if (fs.exists(eventRoot)) {
+        eventModTimeMap.put(String.valueOf(eventId), fs.getFileStatus(eventRoot).getModificationTime());

Review comment:
       The top-level directory also gets updated, as distcp preserves the modification time.
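
       For illustration only (not Hive code), a timestamp-preserving copy can be approximated with plain FileSystem calls; the helper below is made up and merely mirrors the distcp behaviour mentioned above:

       import java.io.IOException;
       import org.apache.hadoop.conf.Configuration;
       import org.apache.hadoop.fs.FileStatus;
       import org.apache.hadoop.fs.FileSystem;
       import org.apache.hadoop.fs.FileUtil;
       import org.apache.hadoop.fs.Path;

       public final class PreservingCopy {

         // Copy a file and re-apply the source's timestamps. The destination's parent directory
         // still gets a fresh modification time because a new child entry was created in it.
         static void copyPreservingMtime(FileSystem fs, Path src, Path dst, Configuration conf)
             throws IOException {
           FileStatus srcStatus = fs.getFileStatus(src);
           FileUtil.copy(fs, src, fs, dst, false /* deleteSource */, conf);
           fs.setTimes(dst, srcStatus.getModificationTime(), srcStatus.getAccessTime());
         }
       }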

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java
##########
@@ -94,6 +94,12 @@
   // Configuration to enable/disable dumping ACID tables. Used only for testing and shouldn't be
   // seen in production or in case of tests other than the ones where it's required.
   public static final String REPL_DUMP_INCLUDE_ACID_TABLES = "hive.repl.dump.include.acid.tables";
+
+  // HDFS Config to define the maximum number of items a directory may contain.
+  public static final String DFS_MAX_DIR_ITEMS_CONFIG = "dfs.namenode.fs-limits.max-directory-items";

Review comment:
       This is an HDFS config.
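
       A short sketch of how the limit could be checked before the dump creates one directory per event; the config key is the one quoted in the patch, while the default value and the surrounding method are assumptions:

       import org.apache.hadoop.conf.Configuration;

       public final class EventDirLimitCheck {

         // Fail fast if the planned number of per-event directories would exceed the HDFS
         // per-directory item limit. 1048576 is assumed here as the usual HDFS default.
         static void checkEventCount(Configuration conf, long firstEventId, long lastEventId) {
           long maxDirItems = conf.getLong("dfs.namenode.fs-limits.max-directory-items", 1048576L);
           long plannedDirs = lastEventId - firstEventId + 1;
           if (plannedDirs > maxDirItems) {
             throw new IllegalStateException("Incremental dump would create " + plannedDirs
                 + " event directories, above dfs.namenode.fs-limits.max-directory-items = "
                 + maxDirItems);
           }
         }
       }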

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
##########
@@ -742,7 +845,7 @@ long currentNotificationId(Hive hiveDb) throws TException {
 
   Path dumpDbMetadata(String dbName, Path metadataRoot, long lastReplId, Hive hiveDb) throws Exception {
     // TODO : instantiating FS objects are generally costly. Refactor
-    Path dbRoot = getBootstrapDbRoot(metadataRoot, dbName, false);

Review comment:
       Same as normal bootstrap, just that _bootstrap is the base instead of hive, i.e. replDir/hive/_bootstrap/meta/db/table
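
       Read as plain Path composition, the layout described is roughly the following; the literal path components are taken from the comment above, not from the patch:

       import org.apache.hadoop.fs.Path;

       public final class BootstrapLayoutSketch {

         // <replDir>/hive/_bootstrap/meta/<db>/<table>
         static Path bootstrapTableRoot(Path replDir, String dbName, String tableName) {
           Path hiveBase = new Path(replDir, "hive");
           Path bootstrapBase = new Path(hiveBase, "_bootstrap");
           Path metaRoot = new Path(bootstrapBase, "meta");
           return new Path(new Path(metaRoot, dbName), tableName);
         }
       }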

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java
##########
@@ -100,6 +105,20 @@ public Void execute() throws IOException {
     }
   }
 
+  public static boolean create(Path outputFile, HiveConf hiveConf, boolean replace)

Review comment:
       Will fix

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java
##########
@@ -64,10 +64,15 @@
 
   public static void writeOutput(List<List<String>> listValues, Path outputFile, HiveConf hiveConf)
       throws SemanticException {
+    writeOutput(listValues, outputFile, hiveConf, false);

Review comment:
       Yes, we can add it. Just that it will impact the existing uses of this method.
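
       The pattern being discussed is the usual backward-compatible overload: the existing 3-arg signature delegates to the new one with the flag defaulted to false, so current callers keep today's behaviour. A standalone sketch of that pattern (not the actual Utils.writeOutput body, and the flag semantics here are assumed):

       import java.io.IOException;
       import java.util.List;
       import org.apache.hadoop.conf.Configuration;
       import org.apache.hadoop.fs.FSDataOutputStream;
       import org.apache.hadoop.fs.FileSystem;
       import org.apache.hadoop.fs.Path;

       public final class OutputWriterSketch {

         // Existing signature: behaviour unchanged, delegates with the new flag set to false.
         static void writeOutput(List<List<String>> listValues, Path outputFile, Configuration conf)
             throws IOException {
           writeOutput(listValues, outputFile, conf, false);
         }

         // New signature: 'replace' (parameter name assumed) allows rewriting an existing file,
         // which the checkpointing code needs when it updates the last-dumped event id.
         static void writeOutput(List<List<String>> listValues, Path outputFile, Configuration conf,
             boolean replace) throws IOException {
           FileSystem fs = outputFile.getFileSystem(conf);
           try (FSDataOutputStream out = fs.create(outputFile, replace)) {
             for (List<String> row : listValues) {
               out.writeBytes(String.join("\t", row) + "\n");
             }
           }
         }
       }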

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
##########
@@ -731,9 +826,17 @@ private boolean shouldResumePreviousDump(DumpMetaData dumpMetaData) {
     }
   }
 
-  private boolean shouldResumePreviousDump(Path dumpPath) {
-    Path hiveDumpPath = new Path(dumpPath, ReplUtils.REPL_HIVE_BASE_DIR);
-    return shouldResumePreviousDump(new DumpMetaData(hiveDumpPath, conf));
+  private boolean shouldResumePreviousDump(Path lastDumpPath, Path previousValidHiveDumpPath) throws IOException {
+    if (validDump(lastDumpPath)) {
+      return false;
+    }
+    // In case of incremental _dumpmetadata may not be present, still we should resume.
+    if (previousValidHiveDumpPath != null) {

Review comment:
       If the previous dump path was not deleted, it would not qualify as lastDumpPath. However, I have done some refactoring to make it clearer.
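
       For readers following the hunk above, one plausible shape of the refactored decision; the branches are inferred from the snippet and this comment, and the sketch leans on ReplDumpTask's existing helpers (validDump(), DumpMetaData, ReplUtils.REPL_HIVE_BASE_DIR, conf) rather than being the final patch:

       // Hedged sketch: decide whether to reuse the latest, unfinished dump directory.
       private boolean shouldResumePreviousDump(Path lastDumpPath, Path previousValidHiveDumpPath)
           throws IOException {
         if (validDump(lastDumpPath)) {
           // The last dump already carries its _finished_dump ack; start a fresh dump instead.
           return false;
         }
         if (previousValidHiveDumpPath != null) {
           // Incremental case: _dumpmetadata may not have been written yet, but the per-event
           // directories and the _events_dump checkpoint can still be reused, so resume.
           return true;
         }
         // Bootstrap case: fall back to inspecting the partial dump's _dumpmetadata.
         return shouldResumePreviousDump(
             new DumpMetaData(new Path(lastDumpPath, ReplUtils.REPL_HIVE_BASE_DIR), conf));
       }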

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java
##########
@@ -76,6 +76,7 @@
   public static final String FILES_NAME = "_files";
   public static final String DATA_PATH_NAME = "data";
   public static final String METADATA_PATH_NAME = "metadata";
+  public static final String LAST_EVENT_ID_NAME = "LastEventID";

Review comment:
       Had 'lastEventId' initially, but later thought the dump type in _dumpmetadata is written in all upper case. This one is actually capitalized, which is not even that. Can change it to all upper case, lastEventId, or LastEventID.



