Posted to gitbox@hive.apache.org by GitBox <gi...@apache.org> on 2021/02/23 05:57:46 UTC

[GitHub] [hive] hmangla98 opened a new pull request #2005: HIVE-24783: Store currentNotificationID on target during repl load op.

hmangla98 opened a new pull request #2005:
URL: https://github.com/apache/hive/pull/2005


   What changes were proposed in this pull request?
   Store the last notification ID of the target cluster while executing the repl load command.
   
   Why are the changes needed?
   Does this PR introduce any user-facing change?
   No
   
   How was this patch tested?
   Added one test => TestReplicationScenariosAcidTables#testAcidNotification


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: gitbox-unsubscribe@hive.apache.org
For additional commands, e-mail: gitbox-help@hive.apache.org


[GitHub] [hive] pkumarsinha commented on a change in pull request #2005: HIVE-24783: Store currentNotificationID on target during repl load op.

Posted by GitBox <gi...@apache.org>.
pkumarsinha commented on a change in pull request #2005:
URL: https://github.com/apache/hive/pull/2005#discussion_r583218805



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
##########
@@ -500,11 +503,25 @@ private void dropTablesExcludedInReplScope(ReplScope replScope) throws HiveExcep
   }
 
   private void createReplLoadCompleteAckTask() {
-    if ((work.isIncrementalLoad() && !work.incrementalLoadTasksBuilder().hasMoreWork() && !work.hasBootstrapLoadTasks())
-        || (!work.isIncrementalLoad() && !work.hasBootstrapLoadTasks())) {
+    if (!work.hasBootstrapLoadTasks() && (work.isIncrementalLoad() ? !work.incrementalLoadTasksBuilder().hasMoreWork() : true)){
       //All repl load tasks are executed and status is 0, create the task to add the acknowledgement
+      List<Runnable> listOfPreAckTasks = new LinkedList<>();
+      listOfPreAckTasks.add(new Runnable() {
+        @Override
+        public void run() {
+          try{
+            HiveMetaStoreClient metaStoreClient = new HiveMetaStoreClient(conf);
+            long currentNotificationID = metaStoreClient.getCurrentNotificationEventId().getEventId();
+            Path notificationFilePath = new Path(work.dumpDirectory, LOAD_METADATA.toString());
+            Utils.writeOutput(String.valueOf(currentNotificationID), notificationFilePath, conf);
+            LOG.info("Created NotificationACK file : {} with NotificationID : {}", notificationFilePath, currentNotificationID);

Review comment:
       NotificationACK file  -> Load metadata file

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
##########
@@ -500,11 +504,25 @@ private void dropTablesExcludedInReplScope(ReplScope replScope) throws HiveExcep
   }
 
   private void createReplLoadCompleteAckTask() {
-    if ((work.isIncrementalLoad() && !work.incrementalLoadTasksBuilder().hasMoreWork() && !work.hasBootstrapLoadTasks())
-        || (!work.isIncrementalLoad() && !work.hasBootstrapLoadTasks())) {
+    if (!work.hasBootstrapLoadTasks() && (work.isIncrementalLoad() ? !work.incrementalLoadTasksBuilder().hasMoreWork() : true)){
       //All repl load tasks are executed and status is 0, create the task to add the acknowledgement
+      List<Runnable> listOfPreAckTasks = new LinkedList<>();
+      listOfPreAckTasks.add(new Runnable() {
+        @Override
+        public void run() {
+          try{
+            HiveMetaStoreClient metaStoreClient = new HiveMetaStoreClient(conf);
+            long currentNotificationID = metaStoreClient.getCurrentNotificationEventId().getEventId();
+            Path notificationFilePath = new Path(work.dumpDirectory, NOTIFICATION_FILE.toString());
+            Utils.writeOutput(String.valueOf(currentNotificationID), notificationFilePath, conf);
+            LOG.info("Created NotificationACK file : {} with NotificationID : {}", notificationFilePath, currentNotificationID);
+          }catch (Exception e) {
+            e.printStackTrace();

Review comment:
       Use a variant of RuntimeException and rethrow it.
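
A minimal sketch of what this comment asks for, assuming the pre-ack step is modeled as a `Runnable` as in the diff above. `PreAckFailureSketch`, `PreAckTaskException`, and `rethrowing` are hypothetical names used only for illustration; they are not part of Hive.

```java
import java.util.concurrent.Callable;

// Hypothetical names throughout; this is not Hive code.
final class PreAckFailureSketch {

  /** Hypothetical unchecked exception used to surface a failed pre-ack step. */
  static final class PreAckTaskException extends RuntimeException {
    PreAckTaskException(String message, Throwable cause) {
      super(message, cause);
    }
  }

  /** Wraps a step that may throw a checked exception in a Runnable that rethrows it unchecked. */
  static Runnable rethrowing(String description, Callable<?> step) {
    return () -> {
      try {
        step.call();
      } catch (RuntimeException e) {
        throw e; // already unchecked, pass it through untouched
      } catch (Exception e) {
        // keep the original exception as the cause instead of printing and swallowing it
        throw new PreAckTaskException("Pre-ack step failed: " + description, e);
      }
    };
  }

  public static void main(String[] args) {
    Runnable failing = rethrowing("write notification id", () -> {
      throw new java.io.IOException("disk full");
    });
    try {
      failing.run();
    } catch (PreAckTaskException e) {
      System.out.println(e.getMessage() + " (cause: " + e.getCause() + ")");
    }
  }
}
```

Unlike `e.printStackTrace()`, the failure reaches the caller, which can then decide whether the load should fail.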

##########
File path: itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
##########
@@ -414,6 +418,24 @@ WarehouseInstance verifyResults(List data) throws IOException {
     return this;
   }
 
+  long verifyNotificationAck(String dumpLocation, long prevNotificationID) throws Exception {

Review comment:
       Move it to TestReplicationScenariosAcidTables

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
##########
@@ -500,11 +503,25 @@ private void dropTablesExcludedInReplScope(ReplScope replScope) throws HiveExcep
   }
 
   private void createReplLoadCompleteAckTask() {
-    if ((work.isIncrementalLoad() && !work.incrementalLoadTasksBuilder().hasMoreWork() && !work.hasBootstrapLoadTasks())
-        || (!work.isIncrementalLoad() && !work.hasBootstrapLoadTasks())) {
+    if (!work.hasBootstrapLoadTasks() && (work.isIncrementalLoad() ? !work.incrementalLoadTasksBuilder().hasMoreWork() : true)){

Review comment:
       A few lines like this exceed the default maximum line length for checkstyle (120, I think). You may want to reformat them.

##########
File path: itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
##########
@@ -414,6 +418,24 @@ WarehouseInstance verifyResults(List data) throws IOException {
     return this;
   }
 
+  long verifyNotificationAck(String dumpLocation, long prevNotificationID) throws Exception {
+    FileSystem fs = new Path(dumpLocation).getFileSystem(hiveConf);
+    Path notificationAckFile = new Path(dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR + "/" + ReplAck.NOTIFICATION_FILE);
+    assertTrue(fs.exists(notificationAckFile));
+    long currentNotificationID = getCurrentNotificationEventId().getEventId();
+    long previousLoadNotificationID = fetchNotificationIDFromDump(notificationAckFile, fs);
+    assertTrue(previousLoadNotificationID > prevNotificationID && currentNotificationID > previousLoadNotificationID);
+    return previousLoadNotificationID;
+  }
+
+  long fetchNotificationIDFromDump(Path notificationAckFile, FileSystem fs) throws Exception{
+    InputStream inputstream = fs.open(notificationAckFile);
+    BufferedReader reader = new BufferedReader(new InputStreamReader(inputstream));
+    String line = reader.readLine();
+    assertTrue(line!=null && reader.readLine()==null);
+    return Long.parseLong(line);

Review comment:
       close the reader/stream
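
One way to address this is try-with-resources, sketched below with plain `java.io` types so that it runs on its own. `SingleLineIdReader` and `readSingleId` are hypothetical names; in the test helper above, the stream would come from `fs.open(notificationAckFile)` instead of a byte array.

```java
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;

// Hypothetical helper; not part of the pull request.
final class SingleLineIdReader {

  /** Reads exactly one line holding a long; the reader and the stream are closed on every path. */
  static long readSingleId(InputStream in) throws IOException {
    try (BufferedReader reader =
             new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8))) {
      String line = reader.readLine();
      if (line == null) {
        throw new IOException("file is empty, expected one notification id");
      }
      if (reader.readLine() != null) {
        throw new IOException("expected exactly one line");
      }
      return Long.parseLong(line.trim());
    }
  }

  public static void main(String[] args) throws IOException {
    InputStream in = new ByteArrayInputStream("42\n".getBytes(StandardCharsets.UTF_8));
    System.out.println(readSingleId(in)); // prints 42
  }
}
```

Closing the `BufferedReader` also closes the wrapped `InputStream`, so a single resource declaration is enough here.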

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
##########
@@ -500,11 +503,25 @@ private void dropTablesExcludedInReplScope(ReplScope replScope) throws HiveExcep
   }
 
   private void createReplLoadCompleteAckTask() {
-    if ((work.isIncrementalLoad() && !work.incrementalLoadTasksBuilder().hasMoreWork() && !work.hasBootstrapLoadTasks())
-        || (!work.isIncrementalLoad() && !work.hasBootstrapLoadTasks())) {
+    if (!work.hasBootstrapLoadTasks() && (work.isIncrementalLoad() ? !work.incrementalLoadTasksBuilder().hasMoreWork() : true)){
       //All repl load tasks are executed and status is 0, create the task to add the acknowledgement
+      List<Runnable> listOfPreAckTasks = new LinkedList<>();
+      listOfPreAckTasks.add(new Runnable() {
+        @Override
+        public void run() {
+          try{
+            HiveMetaStoreClient metaStoreClient = new HiveMetaStoreClient(conf);
+            long currentNotificationID = metaStoreClient.getCurrentNotificationEventId().getEventId();
+            Path notificationFilePath = new Path(work.dumpDirectory, LOAD_METADATA.toString());
+            Utils.writeOutput(String.valueOf(currentNotificationID), notificationFilePath, conf);
+            LOG.info("Created NotificationACK file : {} with NotificationID : {}", notificationFilePath, currentNotificationID);
+          }catch (Exception e) {
+            throw new RuntimeException(e);

Review comment:
       Does this become a non-recoverable error or a recoverable one?

##########
File path: itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
##########
@@ -51,8 +51,12 @@
 
 import javax.annotation.Nullable;
 
+import java.io.Closeable;

Review comment:
       Remove unused import

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AckWork.java
##########
@@ -35,6 +36,7 @@
   private static final long serialVersionUID = 1L;
   private Path ackFilePath;
   private transient ReplicationMetricCollector metricCollector;
+  private List<Runnable> tasks;

Review comment:
       nit: How about renaming to preAckTasks to avoid confusion?






[GitHub] [hive] pkumarsinha commented on a change in pull request #2005: HIVE-24783: Store currentNotificationID on target during repl load op.

Posted by GitBox <gi...@apache.org>.
pkumarsinha commented on a change in pull request #2005:
URL: https://github.com/apache/hive/pull/2005#discussion_r580814932



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
##########
@@ -499,6 +500,20 @@ private void dropTablesExcludedInReplScope(ReplScope replScope) throws HiveExcep
         dbName);
   }
 
+  private void createNotificationFileTask() {
+    if ((work.isIncrementalLoad() && !work.incrementalLoadTasksBuilder().hasMoreWork() && !work.hasBootstrapLoadTasks())

Review comment:
       The condition can be simplified:
   !work.hasBootstrapLoadTasks() is required on both sides of ||.
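
The simplification can be checked exhaustively. The sketch below uses plain booleans in place of the `work.*` calls (the class and method names are hypothetical) and compares the original condition with the factored one over all eight input combinations.

```java
// Sketch only: plain booleans stand in for the work.* calls quoted in the diff.
final class AckConditionEquivalence {

  /** Original form: the bootstrap check appears on both sides of ||. */
  static boolean original(boolean incremental, boolean hasMoreWork, boolean hasBootstrapTasks) {
    return (incremental && !hasMoreWork && !hasBootstrapTasks)
        || (!incremental && !hasBootstrapTasks);
  }

  /** Factored form: the shared term is pulled out of both branches. */
  static boolean simplified(boolean incremental, boolean hasMoreWork, boolean hasBootstrapTasks) {
    return !hasBootstrapTasks && (!incremental || !hasMoreWork);
  }

  /** Compares both forms over every combination of the three inputs. */
  static boolean equivalentForAllInputs() {
    boolean[] values = {false, true};
    for (boolean incremental : values) {
      for (boolean hasMoreWork : values) {
        for (boolean hasBootstrapTasks : values) {
          if (original(incremental, hasMoreWork, hasBootstrapTasks)
              != simplified(incremental, hasMoreWork, hasBootstrapTasks)) {
            return false;
          }
        }
      }
    }
    return true;
  }

  public static void main(String[] args) {
    System.out.println(equivalentForAllInputs()); // prints true
  }
}
```

Written with real method calls, `!incremental || !hasMoreWork` still short-circuits, so `hasMoreWork()` is evaluated only for incremental loads, as in the original. The ternary form `isIncrementalLoad() ? !hasMoreWork() : true` is the same expression.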

##########
File path: itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
##########
@@ -162,6 +162,23 @@ public void testAcidTablesBootstrap() throws Throwable {
     verifyInc2Load(replicatedDbName, inc2Dump.lastReplicationId);
   }
 
+  @Test
+  public void testAcidNotification() throws Throwable{
+    // Bootstrap
+    WarehouseInstance.Tuple bootstrapDump = prepareDataAndDump(primaryDbName, null);
+    replica.load(replicatedDbName, primaryDbName);
+    verifyBootLoadNotification(replicatedDbName, bootstrapDump.lastReplicationId, bootstrapDump.dumpLocation, true);
+

Review comment:
       nit: Remove this extra line.

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/NotificationFileTask.java
##########
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.repl;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.parse.repl.dump.Utils;
+import org.apache.hadoop.hive.ql.plan.api.StageType;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.io.Serializable;
+
+public class NotificationFileTask extends Task<NotificationFileWork> implements Serializable {

Review comment:
       Did you explore whether we can do this in LoadAckTask itself?

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
##########
@@ -352,6 +352,7 @@ a database ( directory )
     context.getFsScratchDirs().putAll(loadContext.pathInfo.getFsScratchDirs());
     if (!HiveConf.getBoolVar(conf, REPL_DUMP_SKIP_IMMUTABLE_DATA_COPY)) {
       createReplLoadCompleteAckTask();
+      createNotificationFileTask();

Review comment:
       Shouldn't ack creation be the last thing?






[GitHub] [hive] hmangla98 commented on a change in pull request #2005: HIVE-24783: Store currentNotificationID on target during repl load op.

Posted by GitBox <gi...@apache.org>.
hmangla98 commented on a change in pull request #2005:
URL: https://github.com/apache/hive/pull/2005#discussion_r580820651



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/NotificationFileTask.java
##########
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.repl;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.parse.repl.dump.Utils;
+import org.apache.hadoop.hive.ql.plan.api.StageType;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.io.Serializable;
+
+public class NotificationFileTask extends Task<NotificationFileWork> implements Serializable {

Review comment:
       Yes, we can do this in that task as well, but I preferred keeping the ACK task separate from other tasks.






[GitHub] [hive] hmangla98 commented on a change in pull request #2005: HIVE-24783: Store currentNotificationID on target during repl load op.

Posted by GitBox <gi...@apache.org>.
hmangla98 commented on a change in pull request #2005:
URL: https://github.com/apache/hive/pull/2005#discussion_r580819308



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
##########
@@ -352,6 +352,7 @@ a database ( directory )
     context.getFsScratchDirs().putAll(loadContext.pathInfo.getFsScratchDirs());
     if (!HiveConf.getBoolVar(conf, REPL_DUMP_SKIP_IMMUTABLE_DATA_COPY)) {
       createReplLoadCompleteAckTask();
+      createNotificationFileTask();

Review comment:
       Yes, it should be. I'll change it.






[GitHub] [hive] pkumarsinha commented on a change in pull request #2005: HIVE-24783: Store currentNotificationID on target during repl load op.

Posted by GitBox <gi...@apache.org>.
pkumarsinha commented on a change in pull request #2005:
URL: https://github.com/apache/hive/pull/2005#discussion_r581672838



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
##########
@@ -499,6 +500,20 @@ private void dropTablesExcludedInReplScope(ReplScope replScope) throws HiveExcep
         dbName);
   }
 
+  private void createNotificationFileTask() {
+    if (((work.isIncrementalLoad() && !work.incrementalLoadTasksBuilder().hasMoreWork() )
+            || (!work.isIncrementalLoad()))  && !work.hasBootstrapLoadTasks()) {
+      //All repl load tasks are executed and status is 0, create the task to add the acknowledgement
+      NotificationFileWork notification_Work = new NotificationFileWork(new Path(work.dumpDirectory, "_notification_id"), work.getMetricCollector());

Review comment:
       The file name should be defined as a constant.

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/NotificationFileTask.java
##########
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.repl;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.parse.repl.dump.Utils;
+import org.apache.hadoop.hive.ql.plan.api.StageType;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.io.Serializable;
+
+public class NotificationFileTask extends Task<NotificationFileWork> implements Serializable {
+    private static final long serialVersionUID = 1L;
+    private Logger LOG = LoggerFactory.getLogger(AckTask.class);
+
+    @Override
+    public int execute() {
+        try {
+            HiveMetaStoreClient metaStoreClient = new HiveMetaStoreClient(conf);
+            long currentNotificationID = metaStoreClient.getCurrentNotificationEventId().getEventId();

Review comment:
       Check whether this is a retryable call.
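
If the call does turn out to be safe to retry (that is the open question here, and this sketch does not answer it), a generic retry wrapper could look like the following. `RetrySketch` and `withRetries` are hypothetical names, not a Hive API, and the attempt count and delays are placeholders.

```java
import java.util.concurrent.Callable;

// Hypothetical helper; not a Hive API.
final class RetrySketch {

  /** Invokes the call up to maxAttempts times, doubling the delay after each failure. */
  static <T> T withRetries(int maxAttempts, long initialDelayMillis, Callable<T> call)
      throws Exception {
    if (maxAttempts < 1) {
      throw new IllegalArgumentException("maxAttempts must be at least 1");
    }
    Exception last = null;
    long delay = initialDelayMillis;
    for (int attempt = 1; attempt <= maxAttempts; attempt++) {
      try {
        return call.call();
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // do not retry an interrupted call
        throw e;
      } catch (Exception e) {
        last = e;
        if (attempt < maxAttempts) {
          Thread.sleep(delay);
          delay *= 2;
        }
      }
    }
    throw last; // every attempt failed, surface the most recent failure
  }

  public static void main(String[] args) throws Exception {
    int[] calls = {0};
    long id = withRetries(3, 10L, () -> {
      if (++calls[0] < 2) {
        throw new java.io.IOException("transient failure");
      }
      return 100L; // stands in for the fetched notification id
    });
    System.out.println(id + " after " + calls[0] + " attempts");
  }
}
```

A wrapper like this is only appropriate for read-only or idempotent calls; retrying a call with side effects could repeat them.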

##########
File path: itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
##########
@@ -263,6 +266,21 @@ WarehouseInstance runFailure(String command, int errorCode) throws Throwable {
     }
   }
 
+  WarehouseInstance verifyNotificationID(String dumpLocation) throws Exception {
+    FileSystem fs = new Path(dumpLocation).getFileSystem(hiveConf);
+    long currentNotificationID = getCurrentNotificationEventId().getEventId();
+    Path notifyFilePath = new Path(dumpLocation + "/hive/", "_notification_id");
+    assertTrue(fs.exists(notifyFilePath));
+    InputStream inputStream = fs.open(notifyFilePath);
+
+    BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream));
+    for (String line = reader.readLine(); line != null; line = reader.readLine()) {

Review comment:
       If line == null, the assertion is skipped. We should make sure the value is present.

##########
File path: itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/BaseReplicationScenariosAcidTables.java
##########
@@ -224,6 +224,38 @@ void verifyLoadExecution(String replicatedDbName, String lastReplId, boolean inc
     }
   }
 
+  void verifyBootLoadNotification(String replicatedDbName, String lastReplId, String dumpLocation, boolean includeAcid)
+          throws Throwable {
+    List<String> tableNames = new LinkedList<>(nonAcidTableNames);
+    if (includeAcid) {

Review comment:
       Do we really need this distinction?

##########
File path: itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
##########
@@ -263,6 +266,21 @@ WarehouseInstance runFailure(String command, int errorCode) throws Throwable {
     }
   }
 
+  WarehouseInstance verifyNotificationID(String dumpLocation) throws Exception {
+    FileSystem fs = new Path(dumpLocation).getFileSystem(hiveConf);
+    long currentNotificationID = getCurrentNotificationEventId().getEventId();
+    Path notifyFilePath = new Path(dumpLocation + "/hive/", "_notification_id");

Review comment:
       Use constants for the file name and the hive dir. Prefer File.separator whenever possible.

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/NotificationFileTask.java
##########
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.repl;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.parse.repl.dump.Utils;
+import org.apache.hadoop.hive.ql.plan.api.StageType;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.io.Serializable;
+
+public class NotificationFileTask extends Task<NotificationFileWork> implements Serializable {
+    private static final long serialVersionUID = 1L;
+    private Logger LOG = LoggerFactory.getLogger(AckTask.class);
+
+    @Override
+    public int execute() {
+        try {
+            HiveMetaStoreClient metaStoreClient = new HiveMetaStoreClient(conf);
+            long currentNotificationID = metaStoreClient.getCurrentNotificationEventId().getEventId();
+            Path notificationPath = work.getNotificationFilePath();
+            Utils.writeOutput(String.valueOf(currentNotificationID), notificationPath, conf);
+            LOG.info("Created Notification file : {} ", notificationPath);

Review comment:
       Also add currentNotificationID to the log statement.

##########
File path: itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
##########
@@ -263,6 +266,21 @@ WarehouseInstance runFailure(String command, int errorCode) throws Throwable {
     }
   }
 
+  WarehouseInstance verifyNotificationID(String dumpLocation) throws Exception {
+    FileSystem fs = new Path(dumpLocation).getFileSystem(hiveConf);
+    long currentNotificationID = getCurrentNotificationEventId().getEventId();
+    Path notifyFilePath = new Path(dumpLocation + "/hive/", "_notification_id");
+    assertTrue(fs.exists(notifyFilePath));
+    InputStream inputStream = fs.open(notifyFilePath);
+
+    BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream));
+    for (String line = reader.readLine(); line != null; line = reader.readLine()) {
+      assertTrue(currentNotificationID > Long.parseLong(line));

Review comment:
       This may pass for 0 as well. We need to make sure that each dump-load iteration makes progress.
   Keep the previously stored notification ID in a variable, and add an assertion that the newly stored ID is greater than the previous one.
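
The check described above can be sketched as a small stateful helper. `NotificationProgressCheck` is a hypothetical name, and the IDs in `main` are made-up values used only to exercise it.

```java
// Hypothetical test helper; the numeric ids in main are invented for illustration.
final class NotificationProgressCheck {

  private long previousStoredId;

  NotificationProgressCheck(long initialId) {
    this.previousStoredId = initialId;
  }

  /**
   * Verifies previous < stored < current, then remembers the stored id
   * so the next dump-load iteration is compared against it.
   */
  long verify(long storedId, long currentId) {
    if (storedId <= previousStoredId) {
      throw new AssertionError(
          "no progress: stored id " + storedId + " <= previous id " + previousStoredId);
    }
    if (currentId <= storedId) {
      throw new AssertionError(
          "stored id " + storedId + " is not below current id " + currentId);
    }
    previousStoredId = storedId;
    return storedId;
  }

  public static void main(String[] args) {
    NotificationProgressCheck check = new NotificationProgressCheck(0L);
    check.verify(5L, 9L);   // after the bootstrap load
    check.verify(12L, 15L); // after an incremental load
    System.out.println("progress verified");
  }
}
```

Starting from 0 means a stored value of 0 fails the first check, which is the case the comment is worried about.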






[GitHub] [hive] pkumarsinha commented on a change in pull request #2005: HIVE-24783: Store currentNotificationID on target during repl load op.

Posted by GitBox <gi...@apache.org>.
pkumarsinha commented on a change in pull request #2005:
URL: https://github.com/apache/hive/pull/2005#discussion_r584438654



##########
File path: itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
##########
@@ -162,6 +164,37 @@ public void testAcidTablesBootstrap() throws Throwable {
     verifyInc2Load(replicatedDbName, inc2Dump.lastReplicationId);
   }
 
+  @Test
+  public void testNotificationFromLoadMetadataAck() throws Throwable{
+    long previousLoadNotificationID = 0, currentLoadNotificationID, currentNotificationID;
+    WarehouseInstance.Tuple bootstrapDump = primary.run("use " + primaryDbName)
+            .run("CREATE TABLE t1(a string) STORED AS TEXTFILE")
+            .dump(primaryDbName);
+    replica.load(replicatedDbName, primaryDbName)
+            .verifyResults(new String[] {});
+    currentLoadNotificationID = fetchNotificationIDFromDump(new Path(bootstrapDump.dumpLocation));
+    currentNotificationID = replica.getCurrentNotificationEventId().getEventId();
+    assertTrue(currentLoadNotificationID > previousLoadNotificationID && currentNotificationID > currentLoadNotificationID);
+    previousLoadNotificationID = currentLoadNotificationID;
+    WarehouseInstance.Tuple incrementalDump1 = primary.run("insert into t1 values (1)")
+            .dump(primaryDbName);
+    replica.load(replicatedDbName, primaryDbName)
+            .verifyResults(new String[] {});
+    currentLoadNotificationID = fetchNotificationIDFromDump(new Path(incrementalDump1.dumpLocation));
+    currentNotificationID = replica.getCurrentNotificationEventId().getEventId();
+    assertTrue(currentLoadNotificationID > previousLoadNotificationID && currentNotificationID > currentLoadNotificationID);
+  }
+
+  private long fetchNotificationIDFromDump(Path dumpLocation) throws Exception{
+    Path loadMetadataFilePath = new Path(dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR + "/" + ReplAck.LOAD_METADATA);

Review comment:
       Use File.separator instead of /

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AckTask.java
##########
@@ -43,6 +43,9 @@
   @Override
   public int execute() {
     try {
+      for( preAckTask task : work.getPreAckTasks() ){

Review comment:
       Format line:   for( preAckTask task : work.getPreAckTasks() ){
          => for (preAckTask task : work.getPreAckTasks()) { ?
    

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
##########
@@ -500,11 +503,25 @@ private void dropTablesExcludedInReplScope(ReplScope replScope) throws HiveExcep
   }
 
   private void createReplLoadCompleteAckTask() {
-    if ((work.isIncrementalLoad() && !work.incrementalLoadTasksBuilder().hasMoreWork() && !work.hasBootstrapLoadTasks())
-        || (!work.isIncrementalLoad() && !work.hasBootstrapLoadTasks())) {
+    if (!work.hasBootstrapLoadTasks() && (work.isIncrementalLoad() ? !work.incrementalLoadTasksBuilder().hasMoreWork() : true)){
       //All repl load tasks are executed and status is 0, create the task to add the acknowledgement
+      List<Runnable> listOfPreAckTasks = new LinkedList<>();
+      listOfPreAckTasks.add(new Runnable() {
+        @Override
+        public void run() {
+          try{
+            HiveMetaStoreClient metaStoreClient = new HiveMetaStoreClient(conf);
+            long currentNotificationID = metaStoreClient.getCurrentNotificationEventId().getEventId();
+            Path notificationFilePath = new Path(work.dumpDirectory, LOAD_METADATA.toString());
+            Utils.writeOutput(String.valueOf(currentNotificationID), notificationFilePath, conf);
+            LOG.info("Created NotificationACK file : {} with NotificationID : {}", notificationFilePath, currentNotificationID);
+          }catch (Exception e) {

Review comment:
       nit: format the code, it would fix the check-style issues like the above

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
##########
@@ -664,3 +678,7 @@ private int executeIncrementalLoad() throws Exception {
     return 0;
   }
 }
+
+interface preAckTask{

Review comment:
       Move this to AckTask and fix the interface name.
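
A rough sketch of the suggested shape: the interface is nested in the ack task and named in UpperCamelCase, and the pre-ack steps run before the acknowledgement is written. `AckTaskSketch` is a simplified stand-in that does not extend Hive's `Task` class; the `writeAck` parameter and the return codes are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Simplified stand-in for AckTask; it does not extend Hive's Task class.
final class AckTaskSketch {

  /** Nested and renamed from preAckTask to follow Java type naming conventions. */
  @FunctionalInterface
  interface PreAckTask {
    void run() throws Exception;
  }

  private final List<PreAckTask> preAckTasks;
  private final Runnable writeAck; // hypothetical hook standing in for the ack file write

  AckTaskSketch(List<PreAckTask> preAckTasks, Runnable writeAck) {
    this.preAckTasks = new ArrayList<>(preAckTasks);
    this.writeAck = writeAck;
  }

  /** Returns 0 on success and 1 on failure; both codes are placeholders. */
  int execute() {
    try {
      for (PreAckTask task : preAckTasks) {
        task.run();
      }
      writeAck.run(); // the acknowledgement is written only after every pre-ack step succeeded
      return 0;
    } catch (Exception e) {
      return 1;
    }
  }

  public static void main(String[] args) {
    List<String> events = new ArrayList<>();
    PreAckTask storeId = () -> events.add("notification id stored");
    AckTaskSketch task =
        new AckTaskSketch(Arrays.asList(storeId), () -> events.add("ack written"));
    System.out.println(task.execute() + " " + events);
  }
}
```

Because `PreAckTask.run()` declares `throws Exception`, a step can throw a checked exception without wrapping it, and a failing step prevents the acknowledgement from being written.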

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
##########
@@ -500,11 +503,22 @@ private void dropTablesExcludedInReplScope(ReplScope replScope) throws HiveExcep
   }
 
   private void createReplLoadCompleteAckTask() {
-    if ((work.isIncrementalLoad() && !work.incrementalLoadTasksBuilder().hasMoreWork() && !work.hasBootstrapLoadTasks())
-        || (!work.isIncrementalLoad() && !work.hasBootstrapLoadTasks())) {
+    if ( !work.hasBootstrapLoadTasks() &&
+            ( work.isIncrementalLoad() ? !work.incrementalLoadTasksBuilder().hasMoreWork() : true ) ) {
       //All repl load tasks are executed and status is 0, create the task to add the acknowledgement
+      List<preAckTask> listOfPreAckTasks = new LinkedList<>();
+      listOfPreAckTasks.add(new preAckTask() {
+        @Override
+        public void run() throws Exception {

Review comment:
       Does it result in a non-recoverable error?

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
##########
@@ -500,11 +503,22 @@ private void dropTablesExcludedInReplScope(ReplScope replScope) throws HiveExcep
   }
 
   private void createReplLoadCompleteAckTask() {
-    if ((work.isIncrementalLoad() && !work.incrementalLoadTasksBuilder().hasMoreWork() && !work.hasBootstrapLoadTasks())
-        || (!work.isIncrementalLoad() && !work.hasBootstrapLoadTasks())) {
+    if ( !work.hasBootstrapLoadTasks() &&

Review comment:
       nit: format the line






[GitHub] [hive] pkumarsinha merged pull request #2005: HIVE-24783: Store currentNotificationID on target during repl load op.

Posted by GitBox <gi...@apache.org>.
pkumarsinha merged pull request #2005:
URL: https://github.com/apache/hive/pull/2005


   

