You are viewing a plain text version of this content. The canonical link for it is here.
Posted to gitbox@hive.apache.org by GitBox <gi...@apache.org> on 2021/03/01 04:58:18 UTC

[GitHub] [hive] pkumarsinha commented on a change in pull request #2005: HIVE-24783: Store currentNotificationID on target during repl load op.

pkumarsinha commented on a change in pull request #2005:
URL: https://github.com/apache/hive/pull/2005#discussion_r584438654



##########
File path: itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
##########
@@ -162,6 +164,37 @@ public void testAcidTablesBootstrap() throws Throwable {
     verifyInc2Load(replicatedDbName, inc2Dump.lastReplicationId);
   }
 
+  @Test
+  public void testNotificationFromLoadMetadataAck() throws Throwable{
+    long previousLoadNotificationID = 0, currentLoadNotificationID, currentNotificationID;
+    WarehouseInstance.Tuple bootstrapDump = primary.run("use " + primaryDbName)
+            .run("CREATE TABLE t1(a string) STORED AS TEXTFILE")
+            .dump(primaryDbName);
+    replica.load(replicatedDbName, primaryDbName)
+            .verifyResults(new String[] {});
+    currentLoadNotificationID = fetchNotificationIDFromDump(new Path(bootstrapDump.dumpLocation));
+    currentNotificationID = replica.getCurrentNotificationEventId().getEventId();
+    assertTrue(currentLoadNotificationID > previousLoadNotificationID && currentNotificationID > currentLoadNotificationID);
+    previousLoadNotificationID = currentLoadNotificationID;
+    WarehouseInstance.Tuple incrementalDump1 = primary.run("insert into t1 values (1)")
+            .dump(primaryDbName);
+    replica.load(replicatedDbName, primaryDbName)
+            .verifyResults(new String[] {});
+    currentLoadNotificationID = fetchNotificationIDFromDump(new Path(incrementalDump1.dumpLocation));
+    currentNotificationID = replica.getCurrentNotificationEventId().getEventId();
+    assertTrue(currentLoadNotificationID > previousLoadNotificationID && currentNotificationID > currentLoadNotificationID);
+  }
+
+  private long fetchNotificationIDFromDump(Path dumpLocation) throws Exception{
+    Path loadMetadataFilePath = new Path(dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR + "/" + ReplAck.LOAD_METADATA);

Review comment:
       Use File.separator instead of "/".

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AckTask.java
##########
@@ -43,6 +43,9 @@
   @Override
   public int execute() {
     try {
+      for( preAckTask task : work.getPreAckTasks() ){

Review comment:
       Format the line:   for( preAckTask task : work.getPreAckTasks() ){
          => for (preAckTask task : work.getPreAckTasks()) { ?
    

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
##########
@@ -500,11 +503,25 @@ private void dropTablesExcludedInReplScope(ReplScope replScope) throws HiveExcep
   }
 
   private void createReplLoadCompleteAckTask() {
-    if ((work.isIncrementalLoad() && !work.incrementalLoadTasksBuilder().hasMoreWork() && !work.hasBootstrapLoadTasks())
-        || (!work.isIncrementalLoad() && !work.hasBootstrapLoadTasks())) {
+    if (!work.hasBootstrapLoadTasks() && (work.isIncrementalLoad() ? !work.incrementalLoadTasksBuilder().hasMoreWork() : true)){
       //All repl load tasks are executed and status is 0, create the task to add the acknowledgement
+      List<Runnable> listOfPreAckTasks = new LinkedList<>();
+      listOfPreAckTasks.add(new Runnable() {
+        @Override
+        public void run() {
+          try{
+            HiveMetaStoreClient metaStoreClient = new HiveMetaStoreClient(conf);
+            long currentNotificationID = metaStoreClient.getCurrentNotificationEventId().getEventId();
+            Path notificationFilePath = new Path(work.dumpDirectory, LOAD_METADATA.toString());
+            Utils.writeOutput(String.valueOf(currentNotificationID), notificationFilePath, conf);
+            LOG.info("Created NotificationACK file : {} with NotificationID : {}", notificationFilePath, currentNotificationID);
+          }catch (Exception e) {

Review comment:
       nit: format the code; that would fix check-style issues like the one above.

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
##########
@@ -664,3 +678,7 @@ private int executeIncrementalLoad() throws Exception {
     return 0;
   }
 }
+
+interface preAckTask{

Review comment:
       Move this to AckTask and fix the interface name (Java type names should be UpperCamelCase, e.g. PreAckTask).

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
##########
@@ -500,11 +503,22 @@ private void dropTablesExcludedInReplScope(ReplScope replScope) throws HiveExcep
   }
 
   private void createReplLoadCompleteAckTask() {
-    if ((work.isIncrementalLoad() && !work.incrementalLoadTasksBuilder().hasMoreWork() && !work.hasBootstrapLoadTasks())
-        || (!work.isIncrementalLoad() && !work.hasBootstrapLoadTasks())) {
+    if ( !work.hasBootstrapLoadTasks() &&
+            ( work.isIncrementalLoad() ? !work.incrementalLoadTasksBuilder().hasMoreWork() : true ) ) {
       //All repl load tasks are executed and status is 0, create the task to add the acknowledgement
+      List<preAckTask> listOfPreAckTasks = new LinkedList<>();
+      listOfPreAckTasks.add(new preAckTask() {
+        @Override
+        public void run() throws Exception {

Review comment:
       Does it result in a non-recoverable error?

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
##########
@@ -500,11 +503,22 @@ private void dropTablesExcludedInReplScope(ReplScope replScope) throws HiveExcep
   }
 
   private void createReplLoadCompleteAckTask() {
-    if ((work.isIncrementalLoad() && !work.incrementalLoadTasksBuilder().hasMoreWork() && !work.hasBootstrapLoadTasks())
-        || (!work.isIncrementalLoad() && !work.hasBootstrapLoadTasks())) {
+    if ( !work.hasBootstrapLoadTasks() &&

Review comment:
       nit: format the line




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: gitbox-unsubscribe@hive.apache.org
For additional commands, e-mail: gitbox-help@hive.apache.org