You are viewing a plain text version of this content. The canonical link for it is here.
Posted to gitbox@hive.apache.org by GitBox <gi...@apache.org> on 2020/07/10 12:22:07 UTC

[GitHub] [hive] pkumarsinha commented on a change in pull request #1225: HIVE-23069 Memory efficient iterator should be used during replicatio…

pkumarsinha commented on a change in pull request #1225:
URL: https://github.com/apache/hive/pull/1225#discussion_r452808706



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FileOperations.java
##########
@@ -165,4 +175,92 @@ private void validateSrcPathListExists() throws IOException, LoginException {
       throw new FileNotFoundException(FILE_NOT_FOUND.format(e.getMessage()));
     }
   }
+
+  /**
+   * This needs the root data directory to which the data needs to be exported to.
+   * The data export here is a list of files either in table/partition that are written to the _files
+   * in the exportRootDataDir provided.
+   */
+  private void exportFilesAsList() throws SemanticException, IOException, LoginException {
+    if (dataPathList.isEmpty()) {
+      return;
+    }
+    boolean done = false;
+    int repeat = 0;
+    while (!done) {
+      // This is only called for replication that handles MM tables; no need for mmCtx.
+      try (BufferedWriter writer = writer()) {
+        for (Path dataPath : dataPathList) {
+          writeFilesList(listFilesInDir(dataPath), writer, AcidUtils.getAcidSubDir(dataPath));
+        }
+        done = true;
+      } catch (IOException e) {
+        if (e instanceof FileNotFoundException) {
+          logger.error("exporting data files in dir : " + dataPathList + " to " + exportRootDataDir + " failed");
+          throw new FileNotFoundException(FILE_NOT_FOUND.format(e.getMessage()));
+        }
+        repeat++;
+        logger.info("writeFilesList failed", e);
+        if (repeat >= FileUtils.MAX_IO_ERROR_RETRY) {
+          logger.error("exporting data files in dir : " + dataPathList + " to " + exportRootDataDir + " failed");
+          throw new IOException(ErrorMsg.REPL_FILE_SYSTEM_OPERATION_RETRY.getMsg());
+        }
+
+        int sleepTime = FileUtils.getSleepTime(repeat - 1);
+        logger.info(" sleep for {} milliseconds for retry num {} ", sleepTime , repeat);
+        try {
+          Thread.sleep(sleepTime);
+        } catch (InterruptedException timerEx) {
+          logger.info("thread sleep interrupted", timerEx.getMessage());
+        }
+
+        // in case of io error, reset the file system object
+        FileSystem.closeAllForUGI(Utils.getUGI());
+        dataFileSystem = dataPathList.get(0).getFileSystem(hiveConf);
+        exportFileSystem = exportRootDataDir.getFileSystem(hiveConf);
+        Path exportPath = new Path(exportRootDataDir, EximUtil.FILES_NAME);
+        if (exportFileSystem.exists(exportPath)) {
+          exportFileSystem.delete(exportPath, true);
+        }
+      }
+    }
+  }
+
+  private void writeFilesList(FileStatus[] fileStatuses, BufferedWriter writer, String encodedSubDirs)
+          throws IOException {
+    ReplChangeManager replChangeManager = ReplChangeManager.getInstance();
+    for (FileStatus fileStatus : fileStatuses) {
+      if (fileStatus.isDirectory()) {
+        // Write files inside the sub-directory.
+        Path subDir = fileStatus.getPath();
+        writeFilesList(listFilesInDir(subDir), writer, encodedSubDir(encodedSubDirs, subDir));
+      } else {
+        writer.write(encodedUri(replChangeManager, fileStatus, encodedSubDirs));
+        writer.newLine();
+      }
+    }
+  }
+
+  private BufferedWriter writer() throws IOException {
+    Path exportToFile = new Path(exportRootDataDir, EximUtil.FILES_NAME);
+    logger.debug("exporting data files in dir : " + dataPathList + " to " + exportToFile);
+    return new BufferedWriter(
+            new OutputStreamWriter(exportFileSystem.create(exportToFile))
+    );
+  }
+
+  private String encodedSubDir(String encodedParentDirs, Path subDir) {
+    if (null == encodedParentDirs) {
+      return subDir.getName();
+    } else {
+      return encodedParentDirs + Path.SEPARATOR + subDir.getName();
+    }
+  }
+
+  private String encodedUri(ReplChangeManager replChangeManager, FileStatus fileStatus, String encodedSubDir)
+          throws IOException {
+    Path currentDataFilePath = fileStatus.getPath();
+    String checkSum = ReplChangeManager.checksumFor(currentDataFilePath, dataFileSystem);

Review comment:
       Which method?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: gitbox-unsubscribe@hive.apache.org
For additional commands, e-mail: gitbox-help@hive.apache.org