Posted to gitbox@hive.apache.org by GitBox <gi...@apache.org> on 2020/03/03 10:11:48 UTC

[GitHub] [hive] aasha opened a new pull request #932: HIVE-22954 Repl Load using scheduler

aasha opened a new pull request #932: HIVE-22954 Repl Load using scheduler
URL: https://github.com/apache/hive/pull/932
 
 
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: gitbox-unsubscribe@hive.apache.org
For additional commands, e-mail: gitbox-help@hive.apache.org


[GitHub] [hive] aasha commented on a change in pull request #932: HIVE-22954 Repl Load using scheduler

Posted by GitBox <gi...@apache.org>.
aasha commented on a change in pull request #932: HIVE-22954 Repl Load using scheduler
URL: https://github.com/apache/hive/pull/932#discussion_r388459135
 
 

 ##########
 File path: ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
 ##########
 @@ -466,29 +520,30 @@ private void initReplStatus(ASTNode ast) throws SemanticException{
 
   private void analyzeReplStatus(ASTNode ast) throws SemanticException {
     initReplStatus(ast);
-
     String dbNameOrPattern = replScope.getDbName();
-    String replLastId = null;
+    String replLastId = getReplStatus(dbNameOrPattern);
+    prepareReturnValues(Collections.singletonList(replLastId), "last_repl_id#string");
+    setFetchTask(createFetchTask("last_repl_id#string"));
+    LOG.debug("ReplicationSemanticAnalyzer.analyzeReplStatus: writing repl.last.id={} out to {}",
 
 Review comment:
  Yes, printing the conf in debug mode. toString() is already implemented in Configuration; I missed that.



[GitHub] [hive] aasha commented on a change in pull request #932: HIVE-22954 Repl Load using scheduler

Posted by GitBox <gi...@apache.org>.
aasha commented on a change in pull request #932: HIVE-22954 Repl Load using scheduler
URL: https://github.com/apache/hive/pull/932#discussion_r388220563
 
 

 ##########
 File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
 ##########
 @@ -121,7 +121,9 @@ public String getName() {
   public int execute() {
     try {
       Hive hiveDb = getHive();
-      Path dumpRoot = new Path(conf.getVar(HiveConf.ConfVars.REPLDIR), work.dbNameOrPattern.toLowerCase());
+      Path dumpRoot = new Path(conf.getVar(HiveConf.ConfVars.REPLDIR),
+              Base64.getEncoder().encodeToString(work.dbNameOrPattern.toLowerCase()
 
 Review comment:
  We will still need it for repl dump *.
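A minimal JDK-only sketch of the directory naming in the diff: the lower-cased db name or pattern is Base64-encoded, so the pattern * becomes the path component Kg== rather than a literal *, which is a glob character in Hadoop path handling. The class and method names are hypothetical. Unlike the diff, the sketch calls getBytes(StandardCharsets.UTF_8), which avoids the checked UnsupportedEncodingException of the name()-based overload, and toLowerCase(Locale.ROOT) so the result does not depend on the JVM's default locale.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.Locale;

public class DumpDirNameSketch {

  // Lower-case the db name or pattern, then Base64-encode its UTF-8 bytes
  // so characters such as '*' never appear literally in the directory name.
  static String encodeDumpDirName(String dbNameOrPattern) {
    byte[] bytes = dbNameOrPattern.toLowerCase(Locale.ROOT).getBytes(StandardCharsets.UTF_8);
    return Base64.getEncoder().encodeToString(bytes);
  }

  // Reverse mapping, useful when inspecting an existing REPLDIR listing.
  static String decodeDumpDirName(String dirName) {
    return new String(Base64.getDecoder().decode(dirName), StandardCharsets.UTF_8);
  }

  public static void main(String[] args) {
    System.out.println(encodeDumpDirName("*"));                            // Kg==
    System.out.println(decodeDumpDirName(encodeDumpDirName("SalesDB")));   // salesdb
  }
}
```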



[GitHub] [hive] aasha commented on a change in pull request #932: HIVE-22954 Repl Load using scheduler

Posted by GitBox <gi...@apache.org>.
aasha commented on a change in pull request #932: HIVE-22954 Repl Load using scheduler
URL: https://github.com/apache/hive/pull/932#discussion_r388220832
 
 

 ##########
 File path: ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
 ##########
 @@ -387,27 +388,80 @@ private void analyzeReplLoad(ASTNode ast) throws SemanticException {
       // At this point, all dump dirs should contain a _dumpmetadata file that
       // tells us what is inside that dumpdir.
 
-      DumpMetaData dmd = new DumpMetaData(loadPath, conf);
-
-      boolean evDump = false;
-      // we will decide what hdfs locations needs to be copied over here as well.
-      if (dmd.isIncrementalDump()) {
-        LOG.debug("{} contains an incremental dump", loadPath);
-        evDump = true;
+      //If repl status of target is greater than dumps, don't do anything as the load for the latest dump is done
+      if (isTargetAlreadyLoaded) {
+        return;
       } else {
-        LOG.debug("{} contains an bootstrap dump", loadPath);
+        DumpMetaData dmd = new DumpMetaData(loadPath, conf);
+
+        boolean evDump = false;
+        // we will decide what hdfs locations needs to be copied over here as well.
+        if (dmd.isIncrementalDump()) {
+          LOG.debug("{} contains an incremental dump", loadPath);
+          evDump = true;
+        } else {
+          LOG.debug("{} contains an bootstrap dump", loadPath);
+        }
+        ReplLoadWork replLoadWork = new ReplLoadWork(conf, loadPath.toString(), replScope.getDbName(),
+                dmd.getReplScope(),
+                queryState.getLineageState(), evDump, dmd.getEventTo(),
+                dirLocationsToCopy(loadPath, evDump));
+        rootTasks.add(TaskFactory.get(replLoadWork, conf));
       }
-      ReplLoadWork replLoadWork = new ReplLoadWork(conf, loadPath.toString(), replScope.getDbName(),
-              dmd.getReplScope(),
-              queryState.getLineageState(), evDump, dmd.getEventTo(),
-          dirLocationsToCopy(loadPath, evDump));
-      rootTasks.add(TaskFactory.get(replLoadWork, conf));
     } catch (Exception e) {
       // TODO : simple wrap & rethrow for now, clean up with error codes
       throw new SemanticException(e.getMessage(), e);
     }
   }
 
+  private Path getCurrentLoadPath() throws IOException, SemanticException {
+    Path loadPathBase = new Path(conf.getVar(HiveConf.ConfVars.REPLDIR),
+            Base64.getEncoder().encodeToString(sourceDbNameOrPattern.toLowerCase()
+            .getBytes(StandardCharsets.UTF_8.name())));
+    final FileSystem fs = loadPathBase.getFileSystem(conf);
+
+    // Make fully qualified path for further use.
+    loadPathBase = fs.makeQualified(loadPathBase);
+
+    if (!fs.exists(loadPathBase)) {
+      // supposed dump path does not exist.
+      LOG.error("File not found " + loadPathBase.toUri().toString());
+      throw new FileNotFoundException(ErrorMsg.REPL_LOAD_PATH_NOT_FOUND.getMsg());
+    }
+    FileStatus[] statuses = loadPathBase.getFileSystem(conf).listStatus(loadPathBase);
+    if (statuses.length > 0) {
+      //sort based on last modified. Recent one is at the end
+      Arrays.sort(statuses, new Comparator<FileStatus>() {
+        public int compare(FileStatus f1, FileStatus f2) {
+          return Long.compare(f1.getModificationTime(), f2.getModificationTime());
+        }
+      });
+      if (replScope.getDbName() != null) {
+        String currentReplStatusOfTarget
+                = getReplStatus(replScope.getDbName());
+        if (currentReplStatusOfTarget == null) { //bootstrap
+          return statuses[0].getPath();
+        } else {
+          DumpMetaData latestDump = new DumpMetaData(statuses[statuses.length - 1].getPath(), conf);
+          if (Long.parseLong(currentReplStatusOfTarget) >= latestDump.getEventTo().longValue()) {
+            isTargetAlreadyLoaded = true;
+          } else {
+            for (FileStatus status : statuses) {
+              DumpMetaData dmd = new DumpMetaData(status.getPath(), conf);
+              if (dmd.isIncrementalDump()
+                      && Long.parseLong(currentReplStatusOfTarget) < dmd.getEventTo().longValue()) {
 
 Review comment:
  There are test cases where an incremental dump covers, say, events 3 to 15, and the current status of the target is 10 because of a previous partial incremental load.
  For example, TestReplicationWithTableMigration#testIncrementalLoadMigrationManagedToAcidFailurePart.
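The scenario above can be sketched with a hypothetical Dump class standing in for a dump directory and its _dumpmetadata (all names are illustrative, not Hive's). It contrasts the diff's check (target status < eventTo) with an eventFrom-based check, read here as "status equals eventFrom - 1 or eventFrom": with a dump covering events 3 to 15 and the target left at 10, only the eventTo check selects that dump.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class LoadPathSelectionSketch {

  // Hypothetical stand-in for one dump directory plus fields read from its _dumpmetadata.
  static final class Dump {
    final String path;
    final long modificationTime;
    final boolean incremental;
    final long eventFrom;
    final long eventTo;

    Dump(String path, long modificationTime, boolean incremental, long eventFrom, long eventTo) {
      this.path = path;
      this.modificationTime = modificationTime;
      this.incremental = incremental;
      this.eventFrom = eventFrom;
      this.eventTo = eventTo;
    }
  }

  // Oldest dump first, like the modification-time sort in getCurrentLoadPath().
  private static List<Dump> oldestFirst(List<Dump> dumps) {
    List<Dump> sorted = new ArrayList<>(dumps);
    sorted.sort(Comparator.comparingLong((Dump d) -> d.modificationTime));
    return sorted;
  }

  // Check from the diff: first incremental dump whose eventTo is beyond the target status.
  static String selectByEventTo(List<Dump> dumps, long targetStatus) {
    for (Dump d : oldestFirst(dumps)) {
      if (d.incremental && targetStatus < d.eventTo) {
        return d.path;
      }
    }
    return null;
  }

  // eventFrom-based check: target status must be eventFrom - 1 or eventFrom.
  static String selectByEventFrom(List<Dump> dumps, long targetStatus) {
    for (Dump d : oldestFirst(dumps)) {
      if (d.incremental && (targetStatus == d.eventFrom - 1 || targetStatus == d.eventFrom)) {
        return d.path;
      }
    }
    return null;
  }

  public static void main(String[] args) {
    List<Dump> dumps = List.of(
        new Dump("/repl/db/dump1", 100L, false, 0L, 2L),   // bootstrap
        new Dump("/repl/db/dump2", 200L, true, 3L, 15L));  // incremental, events 3..15
    long status = 10L; // target left at 10 by a partial incremental load
    System.out.println(selectByEventTo(dumps, status));   // /repl/db/dump2
    System.out.println(selectByEventFrom(dumps, status)); // null
  }
}
```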



[GitHub] [hive] anishek commented on a change in pull request #932: HIVE-22954 Repl Load using scheduler

Posted by GitBox <gi...@apache.org>.
anishek commented on a change in pull request #932: HIVE-22954 Repl Load using scheduler
URL: https://github.com/apache/hive/pull/932#discussion_r388373650
 
 

 ##########
 File path: ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
 ##########
 @@ -466,29 +520,30 @@ private void initReplStatus(ASTNode ast) throws SemanticException{
 
   private void analyzeReplStatus(ASTNode ast) throws SemanticException {
     initReplStatus(ast);
-
     String dbNameOrPattern = replScope.getDbName();
-    String replLastId = null;
+    String replLastId = getReplStatus(dbNameOrPattern);
+    prepareReturnValues(Collections.singletonList(replLastId), "last_repl_id#string");
+    setFetchTask(createFetchTask("last_repl_id#string"));
+    LOG.debug("ReplicationSemanticAnalyzer.analyzeReplStatus: writing repl.last.id={} out to {}",
 
 Review comment:
  Printing the configuration file names might not be useful; it would be good to additionally print the full configs in debug, or maybe trace, mode.
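A minimal sketch of dumping full config values rather than the resource-file list that Configuration.toString() returns. It relies only on Hadoop's Configuration implementing Iterable<Map.Entry<String, String>>, so a Configuration could be passed in directly; the demo uses a plain Map instead so the sketch runs with the JDK alone. The helper name is hypothetical.

```java
import java.util.Map;
import java.util.TreeMap;

public class ConfDumpSketch {

  // Renders every key=value pair sorted by key, one per line.
  static String dumpConf(Iterable<Map.Entry<String, String>> conf) {
    TreeMap<String, String> sorted = new TreeMap<>();
    for (Map.Entry<String, String> e : conf) {
      sorted.put(e.getKey(), e.getValue());
    }
    StringBuilder sb = new StringBuilder();
    for (Map.Entry<String, String> e : sorted.entrySet()) {
      sb.append(e.getKey()).append('=').append(e.getValue()).append('\n');
    }
    return sb.toString();
  }

  public static void main(String[] args) {
    Map<String, String> fakeConf = Map.of("b.key", "2", "a.key", "1");
    System.out.print(dumpConf(fakeConf.entrySet()));
  }
}
```

Because building this string walks the whole configuration, it would make sense to compute it only when trace (or debug) logging is enabled. The values may also include credentials, which is another reason to keep it at a low log level.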



[GitHub] [hive] anishek commented on a change in pull request #932: HIVE-22954 Repl Load using scheduler

Posted by GitBox <gi...@apache.org>.
anishek commented on a change in pull request #932: HIVE-22954 Repl Load using scheduler
URL: https://github.com/apache/hive/pull/932#discussion_r388187987
 
 

 ##########
 File path: ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
 ##########
 @@ -466,29 +520,30 @@ private void initReplStatus(ASTNode ast) throws SemanticException{
 
   private void analyzeReplStatus(ASTNode ast) throws SemanticException {
     initReplStatus(ast);
-
     String dbNameOrPattern = replScope.getDbName();
-    String replLastId = null;
+    String replLastId = getReplStatus(dbNameOrPattern);
+    prepareReturnValues(Collections.singletonList(replLastId), "last_repl_id#string");
+    setFetchTask(createFetchTask("last_repl_id#string"));
+    LOG.debug("ReplicationSemanticAnalyzer.analyzeReplStatus: writing repl.last.id={} out to {}",
 
 Review comment:
  Do you want to print the full conf here? If so, you might want to provide a String representation of it, along with an additional {} placeholder in the message for the value.
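To illustrate the placeholder point, here is a tiny stand-in that mimics how SLF4J-style {} placeholders are filled in order. It is not SLF4J's formatter and ignores its escaping rules. A placeholder with no matching argument stays as a literal {}, so logging the conf needs both an extra {} and an extra argument.

```java
public class PlaceholderSketch {

  // Replaces each "{}" with the next argument, in order; placeholders without
  // a matching argument are left as literal "{}".
  static String format(String pattern, Object... args) {
    StringBuilder sb = new StringBuilder();
    int from = 0;
    int argIdx = 0;
    while (argIdx < args.length) {
      int at = pattern.indexOf("{}", from);
      if (at < 0) {
        break; // more arguments than placeholders: extras are ignored
      }
      sb.append(pattern, from, at).append(args[argIdx++]);
      from = at + 2;
    }
    sb.append(pattern.substring(from));
    return sb.toString();
  }

  public static void main(String[] args) {
    String msg = "writing repl.last.id={} out to {}, conf={}";
    // Two arguments: the third placeholder stays unfilled.
    System.out.println(format(msg, "42", "/tmp/out"));
    // Three arguments: the conf string lands in the extra placeholder.
    System.out.println(format(msg, "42", "/tmp/out", "a.key=1"));
  }
}
```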



[GitHub] [hive] aasha commented on a change in pull request #932: HIVE-22954 Repl Load using scheduler

Posted by GitBox <gi...@apache.org>.
aasha commented on a change in pull request #932: HIVE-22954 Repl Load using scheduler
URL: https://github.com/apache/hive/pull/932#discussion_r388208876
 
 

 ##########
 File path: ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
 ##########
 @@ -466,29 +520,30 @@ private void initReplStatus(ASTNode ast) throws SemanticException{
 
   private void analyzeReplStatus(ASTNode ast) throws SemanticException {
     initReplStatus(ast);
-
     String dbNameOrPattern = replScope.getDbName();
-    String replLastId = null;
+    String replLastId = getReplStatus(dbNameOrPattern);
+    prepareReturnValues(Collections.singletonList(replLastId), "last_repl_id#string");
+    setFetchTask(createFetchTask("last_repl_id#string"));
+    LOG.debug("ReplicationSemanticAnalyzer.analyzeReplStatus: writing repl.last.id={} out to {}",
 
 Review comment:
  This conf only prints the configuration file names, not all the configs. It was part of the older repl status code.



[GitHub] [hive] anishek commented on a change in pull request #932: HIVE-22954 Repl Load using scheduler

Posted by GitBox <gi...@apache.org>.
anishek commented on a change in pull request #932: HIVE-22954 Repl Load using scheduler
URL: https://github.com/apache/hive/pull/932#discussion_r388185380
 
 

 ##########
 File path: ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
 ##########
 @@ -387,27 +388,80 @@ private void analyzeReplLoad(ASTNode ast) throws SemanticException {
       // At this point, all dump dirs should contain a _dumpmetadata file that
       // tells us what is inside that dumpdir.
 
-      DumpMetaData dmd = new DumpMetaData(loadPath, conf);
-
-      boolean evDump = false;
-      // we will decide what hdfs locations needs to be copied over here as well.
-      if (dmd.isIncrementalDump()) {
-        LOG.debug("{} contains an incremental dump", loadPath);
-        evDump = true;
+      //If repl status of target is greater than dumps, don't do anything as the load for the latest dump is done
+      if (isTargetAlreadyLoaded) {
+        return;
       } else {
-        LOG.debug("{} contains an bootstrap dump", loadPath);
+        DumpMetaData dmd = new DumpMetaData(loadPath, conf);
+
+        boolean evDump = false;
+        // we will decide what hdfs locations needs to be copied over here as well.
+        if (dmd.isIncrementalDump()) {
+          LOG.debug("{} contains an incremental dump", loadPath);
+          evDump = true;
+        } else {
+          LOG.debug("{} contains an bootstrap dump", loadPath);
+        }
+        ReplLoadWork replLoadWork = new ReplLoadWork(conf, loadPath.toString(), replScope.getDbName(),
+                dmd.getReplScope(),
+                queryState.getLineageState(), evDump, dmd.getEventTo(),
+                dirLocationsToCopy(loadPath, evDump));
+        rootTasks.add(TaskFactory.get(replLoadWork, conf));
       }
-      ReplLoadWork replLoadWork = new ReplLoadWork(conf, loadPath.toString(), replScope.getDbName(),
-              dmd.getReplScope(),
-              queryState.getLineageState(), evDump, dmd.getEventTo(),
-          dirLocationsToCopy(loadPath, evDump));
-      rootTasks.add(TaskFactory.get(replLoadWork, conf));
     } catch (Exception e) {
       // TODO : simple wrap & rethrow for now, clean up with error codes
       throw new SemanticException(e.getMessage(), e);
     }
   }
 
+  private Path getCurrentLoadPath() throws IOException, SemanticException {
+    Path loadPathBase = new Path(conf.getVar(HiveConf.ConfVars.REPLDIR),
+            Base64.getEncoder().encodeToString(sourceDbNameOrPattern.toLowerCase()
+            .getBytes(StandardCharsets.UTF_8.name())));
+    final FileSystem fs = loadPathBase.getFileSystem(conf);
+
+    // Make fully qualified path for further use.
+    loadPathBase = fs.makeQualified(loadPathBase);
+
+    if (!fs.exists(loadPathBase)) {
+      // supposed dump path does not exist.
+      LOG.error("File not found " + loadPathBase.toUri().toString());
+      throw new FileNotFoundException(ErrorMsg.REPL_LOAD_PATH_NOT_FOUND.getMsg());
+    }
+    FileStatus[] statuses = loadPathBase.getFileSystem(conf).listStatus(loadPathBase);
+    if (statuses.length > 0) {
+      //sort based on last modified. Recent one is at the end
+      Arrays.sort(statuses, new Comparator<FileStatus>() {
+        public int compare(FileStatus f1, FileStatus f2) {
+          return Long.compare(f1.getModificationTime(), f2.getModificationTime());
+        }
+      });
+      if (replScope.getDbName() != null) {
+        String currentReplStatusOfTarget
+                = getReplStatus(replScope.getDbName());
+        if (currentReplStatusOfTarget == null) { //bootstrap
+          return statuses[0].getPath();
+        } else {
+          DumpMetaData latestDump = new DumpMetaData(statuses[statuses.length - 1].getPath(), conf);
+          if (Long.parseLong(currentReplStatusOfTarget) >= latestDump.getEventTo().longValue()) {
+            isTargetAlreadyLoaded = true;
+          } else {
+            for (FileStatus status : statuses) {
+              DumpMetaData dmd = new DumpMetaData(status.getPath(), conf);
+              if (dmd.isIncrementalDump()
+                      && Long.parseLong(currentReplStatusOfTarget) < dmd.getEventTo().longValue()) {
 
 Review comment:
  The .longValue() call is not required.
  
  Also, why not compare with EventFrom? That would be easier to reason about in the code, since the repl status should be either 1 less than or equal to the from event.
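A small sketch of why .longValue() is redundant, assuming getEventTo() returns a boxed Long (as the .longValue() call in the diff suggests): the < operator unboxes the Long automatically. Both forms throw NullPointerException for a null Long, so dropping the call does not change null handling. Method names are hypothetical.

```java
public class UnboxingSketch {

  // Form used in the diff: explicit unboxing.
  static boolean isBehindExplicit(String replStatus, Long eventTo) {
    return Long.parseLong(replStatus) < eventTo.longValue();
  }

  // Equivalent form: numeric comparison unboxes eventTo automatically.
  static boolean isBehind(String replStatus, Long eventTo) {
    return Long.parseLong(replStatus) < eventTo;
  }

  public static void main(String[] args) {
    System.out.println(isBehind("10", 15L));         // true
    System.out.println(isBehindExplicit("10", 15L)); // true
    System.out.println(isBehind("15", 15L));         // false
  }
}
```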



[GitHub] [hive] anishek commented on a change in pull request #932: HIVE-22954 Repl Load using scheduler

Posted by GitBox <gi...@apache.org>.
anishek commented on a change in pull request #932: HIVE-22954 Repl Load using scheduler
URL: https://github.com/apache/hive/pull/932#discussion_r388186231
 
 

 ##########
 File path: ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
 ##########
 @@ -387,27 +388,80 @@ private void analyzeReplLoad(ASTNode ast) throws SemanticException {
       // At this point, all dump dirs should contain a _dumpmetadata file that
       // tells us what is inside that dumpdir.
 
-      DumpMetaData dmd = new DumpMetaData(loadPath, conf);
-
-      boolean evDump = false;
-      // we will decide what hdfs locations needs to be copied over here as well.
-      if (dmd.isIncrementalDump()) {
-        LOG.debug("{} contains an incremental dump", loadPath);
-        evDump = true;
+      //If repl status of target is greater than dumps, don't do anything as the load for the latest dump is done
+      if (isTargetAlreadyLoaded) {
+        return;
       } else {
-        LOG.debug("{} contains an bootstrap dump", loadPath);
+        DumpMetaData dmd = new DumpMetaData(loadPath, conf);
+
+        boolean evDump = false;
+        // we will decide what hdfs locations needs to be copied over here as well.
+        if (dmd.isIncrementalDump()) {
+          LOG.debug("{} contains an incremental dump", loadPath);
+          evDump = true;
+        } else {
+          LOG.debug("{} contains an bootstrap dump", loadPath);
+        }
+        ReplLoadWork replLoadWork = new ReplLoadWork(conf, loadPath.toString(), replScope.getDbName(),
+                dmd.getReplScope(),
+                queryState.getLineageState(), evDump, dmd.getEventTo(),
+                dirLocationsToCopy(loadPath, evDump));
+        rootTasks.add(TaskFactory.get(replLoadWork, conf));
       }
-      ReplLoadWork replLoadWork = new ReplLoadWork(conf, loadPath.toString(), replScope.getDbName(),
-              dmd.getReplScope(),
-              queryState.getLineageState(), evDump, dmd.getEventTo(),
-          dirLocationsToCopy(loadPath, evDump));
-      rootTasks.add(TaskFactory.get(replLoadWork, conf));
     } catch (Exception e) {
       // TODO : simple wrap & rethrow for now, clean up with error codes
       throw new SemanticException(e.getMessage(), e);
     }
   }
 
+  private Path getCurrentLoadPath() throws IOException, SemanticException {
+    Path loadPathBase = new Path(conf.getVar(HiveConf.ConfVars.REPLDIR),
+            Base64.getEncoder().encodeToString(sourceDbNameOrPattern.toLowerCase()
+            .getBytes(StandardCharsets.UTF_8.name())));
+    final FileSystem fs = loadPathBase.getFileSystem(conf);
+
+    // Make fully qualified path for further use.
+    loadPathBase = fs.makeQualified(loadPathBase);
+
+    if (!fs.exists(loadPathBase)) {
+      // supposed dump path does not exist.
+      LOG.error("File not found " + loadPathBase.toUri().toString());
+      throw new FileNotFoundException(ErrorMsg.REPL_LOAD_PATH_NOT_FOUND.getMsg());
+    }
+    FileStatus[] statuses = loadPathBase.getFileSystem(conf).listStatus(loadPathBase);
+    if (statuses.length > 0) {
+      //sort based on last modified. Recent one is at the end
+      Arrays.sort(statuses, new Comparator<FileStatus>() {
+        public int compare(FileStatus f1, FileStatus f2) {
+          return Long.compare(f1.getModificationTime(), f2.getModificationTime());
+        }
+      });
+      if (replScope.getDbName() != null) {
+        String currentReplStatusOfTarget
+                = getReplStatus(replScope.getDbName());
+        if (currentReplStatusOfTarget == null) { //bootstrap
+          return statuses[0].getPath();
+        } else {
+          DumpMetaData latestDump = new DumpMetaData(statuses[statuses.length - 1].getPath(), conf);
+          if (Long.parseLong(currentReplStatusOfTarget) >= latestDump.getEventTo().longValue()) {
+            isTargetAlreadyLoaded = true;
+          } else {
+            for (FileStatus status : statuses) {
+              DumpMetaData dmd = new DumpMetaData(status.getPath(), conf);
+              if (dmd.isIncrementalDump()
+                      && Long.parseLong(currentReplStatusOfTarget) < dmd.getEventTo().longValue()) {
+                return status.getPath();
+              }
+            }
+          }
+        }
+      }
+      //If dbname is null(in case of repl load *), can't get repl status of target, return the latest dump
+      return statuses[statuses.length - 1].getPath();
 
 Review comment:
  This use case is not supported; it would be better to throw an UnsupportedOperationException here.
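A minimal sketch of the suggested fail-fast guard, with a hypothetical helper name and message. When no target db name is given (the repl load * case from the diff comment), the target's repl status cannot be looked up, so the guard throws instead of silently returning the latest dump. UnsupportedOperationException is unchecked, so the throws clause of getCurrentLoadPath() would not need to change.

```java
public class ReplLoadGuardSketch {

  // Hypothetical guard: reject a load that has no target db name.
  static String requireTargetDbName(String targetDbName) {
    if (targetDbName == null) {
      throw new UnsupportedOperationException(
          "REPL LOAD without a target database name is not supported");
    }
    return targetDbName;
  }

  public static void main(String[] args) {
    System.out.println(requireTargetDbName("salesdb"));
    try {
      requireTargetDbName(null);
    } catch (UnsupportedOperationException e) {
      System.out.println("rejected: " + e.getMessage());
    }
  }
}
```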



[GitHub] [hive] anishek commented on a change in pull request #932: HIVE-22954 Repl Load using scheduler

Posted by GitBox <gi...@apache.org>.
anishek commented on a change in pull request #932: HIVE-22954 Repl Load using scheduler
URL: https://github.com/apache/hive/pull/932#discussion_r388174308
 
 

 ##########
 File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
 ##########
 @@ -121,7 +121,9 @@ public String getName() {
   public int execute() {
     try {
       Hive hiveDb = getHive();
-      Path dumpRoot = new Path(conf.getVar(HiveConf.ConfVars.REPLDIR), work.dbNameOrPattern.toLowerCase());
+      Path dumpRoot = new Path(conf.getVar(HiveConf.ConfVars.REPLDIR),
+              Base64.getEncoder().encodeToString(work.dbNameOrPattern.toLowerCase()
 
 Review comment:
  Given that we are not going to have expressions (for example, for table-level replication), do we still need the Base64 encoding?



[GitHub] [hive] anishek commented on a change in pull request #932: HIVE-22954 Repl Load using scheduler

Posted by GitBox <gi...@apache.org>.
anishek commented on a change in pull request #932: HIVE-22954 Repl Load using scheduler
URL: https://github.com/apache/hive/pull/932#discussion_r388191499
 
 

 ##########
 File path: ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
 ##########
 @@ -387,27 +388,80 @@ private void analyzeReplLoad(ASTNode ast) throws SemanticException {
       // At this point, all dump dirs should contain a _dumpmetadata file that
       // tells us what is inside that dumpdir.
 
-      DumpMetaData dmd = new DumpMetaData(loadPath, conf);
-
-      boolean evDump = false;
-      // we will decide what hdfs locations needs to be copied over here as well.
-      if (dmd.isIncrementalDump()) {
-        LOG.debug("{} contains an incremental dump", loadPath);
-        evDump = true;
+      //If repl status of target is greater than dumps, don't do anything as the load for the latest dump is done
+      if (isTargetAlreadyLoaded) {
 
 Review comment:
  Maybe check !isTargetAlreadyLoaded instead and have just one conditional block.
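A sketch of the suggested shape, with strings standing in for the real ReplLoadWork and TaskFactory calls (the method and its parameters are hypothetical). Negating the flag leaves a single block and removes both the early return and the else branch.

```java
import java.util.ArrayList;
import java.util.List;

public class SingleConditionalSketch {

  // Hypothetical stand-in for the tail of analyzeReplLoad(): tasks are planned
  // only when the target has not already loaded the latest dump.
  static List<String> planLoad(boolean isTargetAlreadyLoaded, boolean incrementalDump, String loadPath) {
    List<String> rootTasks = new ArrayList<>();
    if (!isTargetAlreadyLoaded) {
      String kind = incrementalDump ? "incremental" : "bootstrap";
      rootTasks.add(kind + " load of " + loadPath);
    }
    return rootTasks;
  }

  public static void main(String[] args) {
    System.out.println(planLoad(false, true, "/repl/db/dump2")); // [incremental load of /repl/db/dump2]
    System.out.println(planLoad(true, true, "/repl/db/dump2"));  // []
  }
}
```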



[GitHub] [hive] anishek commented on a change in pull request #932: HIVE-22954 Repl Load using scheduler

Posted by GitBox <gi...@apache.org>.
anishek commented on a change in pull request #932: HIVE-22954 Repl Load using scheduler
URL: https://github.com/apache/hive/pull/932#discussion_r388181887
 
 

 ##########
 File path: ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
 ##########
 @@ -387,27 +388,80 @@ private void analyzeReplLoad(ASTNode ast) throws SemanticException {
       // At this point, all dump dirs should contain a _dumpmetadata file that
       // tells us what is inside that dumpdir.
 
-      DumpMetaData dmd = new DumpMetaData(loadPath, conf);
-
-      boolean evDump = false;
-      // we will decide what hdfs locations needs to be copied over here as well.
-      if (dmd.isIncrementalDump()) {
-        LOG.debug("{} contains an incremental dump", loadPath);
-        evDump = true;
+      //If repl status of target is greater than dumps, don't do anything as the load for the latest dump is done
+      if (isTargetAlreadyLoaded) {
+        return;
       } else {
-        LOG.debug("{} contains an bootstrap dump", loadPath);
+        DumpMetaData dmd = new DumpMetaData(loadPath, conf);
+
+        boolean evDump = false;
+        // we will decide what hdfs locations needs to be copied over here as well.
+        if (dmd.isIncrementalDump()) {
+          LOG.debug("{} contains an incremental dump", loadPath);
+          evDump = true;
+        } else {
+          LOG.debug("{} contains an bootstrap dump", loadPath);
+        }
+        ReplLoadWork replLoadWork = new ReplLoadWork(conf, loadPath.toString(), replScope.getDbName(),
+                dmd.getReplScope(),
+                queryState.getLineageState(), evDump, dmd.getEventTo(),
+                dirLocationsToCopy(loadPath, evDump));
+        rootTasks.add(TaskFactory.get(replLoadWork, conf));
       }
-      ReplLoadWork replLoadWork = new ReplLoadWork(conf, loadPath.toString(), replScope.getDbName(),
-              dmd.getReplScope(),
-              queryState.getLineageState(), evDump, dmd.getEventTo(),
-          dirLocationsToCopy(loadPath, evDump));
-      rootTasks.add(TaskFactory.get(replLoadWork, conf));
     } catch (Exception e) {
       // TODO : simple wrap & rethrow for now, clean up with error codes
       throw new SemanticException(e.getMessage(), e);
     }
   }
 
+  private Path getCurrentLoadPath() throws IOException, SemanticException {
+    Path loadPathBase = new Path(conf.getVar(HiveConf.ConfVars.REPLDIR),
+            Base64.getEncoder().encodeToString(sourceDbNameOrPattern.toLowerCase()
+            .getBytes(StandardCharsets.UTF_8.name())));
+    final FileSystem fs = loadPathBase.getFileSystem(conf);
+
+    // Make fully qualified path for further use.
+    loadPathBase = fs.makeQualified(loadPathBase);
+
+    if (!fs.exists(loadPathBase)) {
+      // supposed dump path does not exist.
+      LOG.error("File not found " + loadPathBase.toUri().toString());
+      throw new FileNotFoundException(ErrorMsg.REPL_LOAD_PATH_NOT_FOUND.getMsg());
+    }
+    FileStatus[] statuses = loadPathBase.getFileSystem(conf).listStatus(loadPathBase);
+    if (statuses.length > 0) {
+      //sort based on last modified. Recent one is at the end
+      Arrays.sort(statuses, new Comparator<FileStatus>() {
+        public int compare(FileStatus f1, FileStatus f2) {
+          return Long.compare(f1.getModificationTime(), f2.getModificationTime());
+        }
+      });
+      if (replScope.getDbName() != null) {
+        String currentReplStatusOfTarget
+                = getReplStatus(replScope.getDbName());
+        if (currentReplStatusOfTarget == null) { //bootstrap
+          return statuses[0].getPath();
+        } else {
+          DumpMetaData latestDump = new DumpMetaData(statuses[statuses.length - 1].getPath(), conf);
+          if (Long.parseLong(currentReplStatusOfTarget) >= latestDump.getEventTo().longValue()) {
 
 Review comment:
  You might want to trim currentReplStatusOfTarget here; I don't think Long.parseLong does it.
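A minimal sketch of the trim suggestion (helper name hypothetical). Long.parseLong does not strip whitespace, so a value such as " 10" or "10\n" throws NumberFormatException, while trimming first parses it. A null status is passed through, matching the diff's use of null to mean bootstrap.

```java
public class ReplStatusParseSketch {

  // Parses a repl status string, tolerating surrounding whitespace.
  // Returns null when there is no status yet (the bootstrap case).
  static Long parseReplStatus(String raw) {
    if (raw == null) {
      return null;
    }
    return Long.parseLong(raw.trim());
  }

  public static void main(String[] args) {
    System.out.println(parseReplStatus(" 10\n")); // 10
    try {
      Long.parseLong(" 10");
    } catch (NumberFormatException e) {
      System.out.println("untrimmed value rejected");
    }
  }
}
```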
