You are viewing a plain text version of this content. The canonical link for it is here.
Posted to gitbox@hive.apache.org by GitBox <gi...@apache.org> on 2020/07/10 08:17:35 UTC

[GitHub] [hive] aasha commented on a change in pull request #1225: HIVE-23069 Memory efficient iterator should be used during replicatio…

aasha commented on a change in pull request #1225:
URL: https://github.com/apache/hive/pull/1225#discussion_r451668879



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FileOperations.java
##########
@@ -165,4 +175,92 @@ private void validateSrcPathListExists() throws IOException, LoginException {
       throw new FileNotFoundException(FILE_NOT_FOUND.format(e.getMessage()));
     }
   }
+
+  /**
+   * This needs the root data directory to which the data needs to be exported to.
+   * The data export here is a list of files either in table/partition that are written to the _files
+   * in the exportRootDataDir provided.
+   */
+  private void exportFilesAsList() throws SemanticException, IOException, LoginException {
+    if (dataPathList.isEmpty()) {

Review comment:
       this check is added to different methods

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/InsertHandler.java
##########
@@ -76,16 +77,29 @@ public void handle(Context withinContext) throws Exception {
         withinContext.hiveConf);
     Iterable<String> files = eventMessage.getFiles();
 
+    boolean copyAtLoad = withinContext.hiveConf.getBoolVar(HiveConf.ConfVars.REPL_DATA_COPY_LAZY);
+
     /*
       * Insert into/overwrite operation shall operate on one or more partitions or even partitions from multiple tables.
       * But, Insert event is generated for each partition to which the data is inserted.
       * So, qlPtns list will have only one entry.
      */
     Partition ptn = (null == qlPtns || qlPtns.isEmpty()) ? null : qlPtns.get(0);
     if (files != null) {
-      // encoded filename/checksum of files, write into _files
-      for (String file : files) {
-        writeFileEntry(qlMdTable, ptn, file, withinContext);
+      if (copyAtLoad) {
+        // encoded filename/checksum of files, write into _files
+        Path dataPath = null;
+        if ((null == qlPtns) || qlPtns.isEmpty()) {
+          dataPath = new Path(withinContext.eventRoot, EximUtil.DATA_PATH_NAME);
+        } else {
+          dataPath = new Path(withinContext.eventRoot, EximUtil.DATA_PATH_NAME + File.separator

Review comment:
       Use the Path(parent, child) constructor instead of concatenating with File.separator

##########
File path: standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/ReplChangeManager.java
##########
@@ -148,6 +148,13 @@ public static synchronized ReplChangeManager getInstance(Configuration conf)
     return instance;
   }
 
+  public static synchronized ReplChangeManager getInstance() {

Review comment:
       why do you need this?

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FileOperations.java
##########
@@ -165,4 +175,92 @@ private void validateSrcPathListExists() throws IOException, LoginException {
       throw new FileNotFoundException(FILE_NOT_FOUND.format(e.getMessage()));
     }
   }
+
+  /**
+   * This needs the root data directory to which the data needs to be exported to.
+   * The data export here is a list of files either in table/partition that are written to the _files
+   * in the exportRootDataDir provided.
+   */
+  private void exportFilesAsList() throws SemanticException, IOException, LoginException {
+    if (dataPathList.isEmpty()) {
+      return;
+    }
+    boolean done = false;
+    int repeat = 0;
+    while (!done) {

Review comment:
       use existing retry interface

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FileOperations.java
##########
@@ -165,4 +175,92 @@ private void validateSrcPathListExists() throws IOException, LoginException {
       throw new FileNotFoundException(FILE_NOT_FOUND.format(e.getMessage()));
     }
   }
+
+  /**
+   * This needs the root data directory to which the data needs to be exported to.
+   * The data export here is a list of files either in table/partition that are written to the _files
+   * in the exportRootDataDir provided.
+   */
+  private void exportFilesAsList() throws SemanticException, IOException, LoginException {
+    if (dataPathList.isEmpty()) {
+      return;
+    }
+    boolean done = false;
+    int repeat = 0;
+    while (!done) {
+      // This is only called for replication that handles MM tables; no need for mmCtx.
+      try (BufferedWriter writer = writer()) {
+        for (Path dataPath : dataPathList) {
+          writeFilesList(listFilesInDir(dataPath), writer, AcidUtils.getAcidSubDir(dataPath));
+        }
+        done = true;
+      } catch (IOException e) {
+        if (e instanceof FileNotFoundException) {
+          logger.error("exporting data files in dir : " + dataPathList + " to " + exportRootDataDir + " failed");
+          throw new FileNotFoundException(FILE_NOT_FOUND.format(e.getMessage()));
+        }
+        repeat++;
+        logger.info("writeFilesList failed", e);
+        if (repeat >= FileUtils.MAX_IO_ERROR_RETRY) {
+          logger.error("exporting data files in dir : " + dataPathList + " to " + exportRootDataDir + " failed");
+          throw new IOException(ErrorMsg.REPL_FILE_SYSTEM_OPERATION_RETRY.getMsg());
+        }
+
+        int sleepTime = FileUtils.getSleepTime(repeat - 1);
+        logger.info(" sleep for {} milliseconds for retry num {} ", sleepTime , repeat);
+        try {
+          Thread.sleep(sleepTime);
+        } catch (InterruptedException timerEx) {
+          logger.info("thread sleep interrupted", timerEx.getMessage());
+        }
+
+        // in case of io error, reset the file system object
+        FileSystem.closeAllForUGI(Utils.getUGI());
+        dataFileSystem = dataPathList.get(0).getFileSystem(hiveConf);
+        exportFileSystem = exportRootDataDir.getFileSystem(hiveConf);
+        Path exportPath = new Path(exportRootDataDir, EximUtil.FILES_NAME);
+        if (exportFileSystem.exists(exportPath)) {
+          exportFileSystem.delete(exportPath, true);
+        }
+      }
+    }
+  }
+
+  private void writeFilesList(FileStatus[] fileStatuses, BufferedWriter writer, String encodedSubDirs)
+          throws IOException {
+    ReplChangeManager replChangeManager = ReplChangeManager.getInstance();

Review comment:
       No need to pass this as a param. It can be accessed directly by the encodedUri method

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
##########
@@ -465,9 +463,13 @@ private Long incrementalDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot, Hive
     String validTxnList = null;
     long waitUntilTime = 0;
     long bootDumpBeginReplId = -1;
-    List<EximUtil.ManagedTableCopyPath> managedTableCopyPaths = Collections.emptyList();
-    List<DirCopyWork> extTableCopyWorks = Collections.emptyList();
+
+    int cacheSize = conf.getIntVar(HiveConf.ConfVars.REPL_FILE_LIST_CACHE_SIZE);

Review comment:
       What happens to checkpointing in case of the cache? If it fails in between, is the cache rebuilt?

##########
File path: common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
##########
@@ -524,6 +524,11 @@ private static void populateLlapDaemonVarsSet(Set<String> llapDaemonVarsSetLocal
             + "task increment that would cross the specified limit."),
     REPL_PARTITIONS_DUMP_PARALLELISM("hive.repl.partitions.dump.parallelism",100,
         "Number of threads that will be used to dump partition data information during repl dump."),
+    REPL_DATA_COPY_LAZY("hive.repl.data.copy.lazy", false,

Review comment:
       What happens if this config is set at dump time but not at load time? Please add validations

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
##########
@@ -465,9 +463,13 @@ private Long incrementalDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot, Hive
     String validTxnList = null;
     long waitUntilTime = 0;
     long bootDumpBeginReplId = -1;
-    List<EximUtil.ManagedTableCopyPath> managedTableCopyPaths = Collections.emptyList();
-    List<DirCopyWork> extTableCopyWorks = Collections.emptyList();
+
+    int cacheSize = conf.getIntVar(HiveConf.ConfVars.REPL_FILE_LIST_CACHE_SIZE);
+    boolean dataCopyAtLoad = conf.getBoolVar(HiveConf.ConfVars.REPL_DATA_COPY_LAZY);
+    FileList managedTblList = dataCopyAtLoad ? null : createTableFileList(dumpRoot, EximUtil.FILES_NAME, cacheSize);
+    FileList extTableFileList = createTableFileList(dumpRoot, EximUtil.FILES_NAME_EXTERNAL, cacheSize);

Review comment:
       this check is only for managed tables?

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
##########
@@ -465,9 +463,13 @@ private Long incrementalDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot, Hive
     String validTxnList = null;
     long waitUntilTime = 0;
     long bootDumpBeginReplId = -1;
-    List<EximUtil.ManagedTableCopyPath> managedTableCopyPaths = Collections.emptyList();
-    List<DirCopyWork> extTableCopyWorks = Collections.emptyList();
+
+    int cacheSize = conf.getIntVar(HiveConf.ConfVars.REPL_FILE_LIST_CACHE_SIZE);
+    boolean dataCopyAtLoad = conf.getBoolVar(HiveConf.ConfVars.REPL_DATA_COPY_LAZY);
+    FileList managedTblList = dataCopyAtLoad ? null : createTableFileList(dumpRoot, EximUtil.FILES_NAME, cacheSize);
+    FileList extTableFileList = createTableFileList(dumpRoot, EximUtil.FILES_NAME_EXTERNAL, cacheSize);

Review comment:
       If the external table base path is on source, where will the distcp job run to copy data there? Will it be on source or target?

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
##########
@@ -330,6 +333,20 @@ a database ( directory )
     return 0;
   }
 
+  private void addLazyDataCopyTask(TaskTracker loadTaskTracker) {

Review comment:
       will this data copy be after metadata copy or before?

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/StringConvertibleObject.java
##########
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.repl.util;
+
+public interface StringConvertibleObject {

Review comment:
       Do you need this? Can be utility methods

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CommitTxnHandler.java
##########
@@ -81,10 +88,13 @@ private void createDumpFile(Context withinContext, org.apache.hadoop.hive.ql.met
             withinContext.hiveConf);
 
     if ((null == qlPtns) || qlPtns.isEmpty()) {
-      writeDumpFiles(qlMdTable, null, fileListArray.get(0), withinContext);
+      Path dataPath = new Path(withinContext.eventRoot, EximUtil.DATA_PATH_NAME);
+      writeDumpFiles(qlMdTable, null, fileListArray.get(0), withinContext, dataPath);
     } else {
       for (int idx = 0; idx < qlPtns.size(); idx++) {
-        writeDumpFiles(qlMdTable, qlPtns.get(idx), fileListArray.get(idx), withinContext);
+        Path dataPath = new Path(withinContext.eventRoot, EximUtil.DATA_PATH_NAME + File.separator

Review comment:
       nit: use the Path constructor instead of doing an append

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/DirCopyWork.java
##########
@@ -51,4 +56,20 @@ public Path getFullyQualifiedSourcePath() {
   public Path getFullyQualifiedTargetPath() {
     return fullyQualifiedTargetPath;
   }
+
+  @Override
+  public String convertToString() {
+    StringBuilder objInStr = new StringBuilder();
+    objInStr.append(fullyQualifiedSourcePath)
+            .append(URI_SEPARATOR)
+            .append(fullyQualifiedTargetPath);
+    return objInStr.toString();
+  }
+
+  @Override
+  public void loadFromString(String objectInStr) {

Review comment:
       The cm path is not needed here?

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
##########
@@ -465,9 +463,13 @@ private Long incrementalDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot, Hive
     String validTxnList = null;
     long waitUntilTime = 0;
     long bootDumpBeginReplId = -1;
-    List<EximUtil.ManagedTableCopyPath> managedTableCopyPaths = Collections.emptyList();
-    List<DirCopyWork> extTableCopyWorks = Collections.emptyList();
+
+    int cacheSize = conf.getIntVar(HiveConf.ConfVars.REPL_FILE_LIST_CACHE_SIZE);

Review comment:
       Or, if it fails after writing to the file, do we rewrite?

##########
File path: ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestReplDumpTask.java
##########
@@ -131,16 +131,15 @@ public void removeDBPropertyToPreventRenameWhenBootstrapDumpOfTableFails() throw
       private int tableDumpCount = 0;
 
       @Override
-      List<EximUtil.ManagedTableCopyPath> dumpTable(String dbName, String tblName, String validTxnList,
-                                                    Path dbRootMetadata, Path dbRootData,
-                                               long lastReplId, Hive hiveDb,
-                                               HiveWrapper.Tuple<Table> tuple)
+      void dumpTable(String dbName, String tblName, String validTxnList,
+                     Path dbRootMetadata, Path dbRootData,
+                     long lastReplId, Hive hiveDb,
+                     HiveWrapper.Tuple<Table> tuple, FileList managedTableDirFileList, boolean dataCopyAtLoad)

Review comment:
       unused params. Add tests

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/FileListStreamer.java
##########
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.repl.util;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedWriter;
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+public class FileListStreamer extends Thread implements Closeable {
+  private static final Logger LOG = LoggerFactory.getLogger(FileListStreamer.class);
+  private static final long TIMEOUT_IN_SECS = 5L;
+  private volatile boolean stop;
+  private final LinkedBlockingQueue<String> cache;
+  private Path backingFile;
+  private Configuration conf;
+  private BufferedWriter backingFileWriter;
+  private volatile boolean valid = true;
+  private volatile boolean asyncMode = false;
+  private final Object COMPLETION_LOCK = new Object();
+  private volatile boolean completed = false;
+
+
+
+  public FileListStreamer(LinkedBlockingQueue<String> cache, Path backingFile, Configuration conf) throws IOException {
+    this.cache = cache;
+    this.backingFile = backingFile;
+    this.conf = conf;
+    init();
+  }
+
+  private void init() throws IOException {
+    FileSystem fs = FileSystem.get(backingFile.toUri(), conf);
+    backingFileWriter = new BufferedWriter(new OutputStreamWriter(fs.create(backingFile, !asyncMode)));

Review comment:
       Rename asyncMode to overwrite. init() is called from the constructor, and asyncMode is always false there

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FileOperations.java
##########
@@ -165,4 +175,92 @@ private void validateSrcPathListExists() throws IOException, LoginException {
       throw new FileNotFoundException(FILE_NOT_FOUND.format(e.getMessage()));
     }
   }
+
+  /**
+   * This needs the root data directory to which the data needs to be exported to.
+   * The data export here is a list of files either in table/partition that are written to the _files
+   * in the exportRootDataDir provided.
+   */
+  private void exportFilesAsList() throws SemanticException, IOException, LoginException {
+    if (dataPathList.isEmpty()) {
+      return;
+    }
+    boolean done = false;
+    int repeat = 0;
+    while (!done) {
+      // This is only called for replication that handles MM tables; no need for mmCtx.
+      try (BufferedWriter writer = writer()) {
+        for (Path dataPath : dataPathList) {
+          writeFilesList(listFilesInDir(dataPath), writer, AcidUtils.getAcidSubDir(dataPath));
+        }
+        done = true;
+      } catch (IOException e) {
+        if (e instanceof FileNotFoundException) {
+          logger.error("exporting data files in dir : " + dataPathList + " to " + exportRootDataDir + " failed");
+          throw new FileNotFoundException(FILE_NOT_FOUND.format(e.getMessage()));
+        }
+        repeat++;
+        logger.info("writeFilesList failed", e);
+        if (repeat >= FileUtils.MAX_IO_ERROR_RETRY) {
+          logger.error("exporting data files in dir : " + dataPathList + " to " + exportRootDataDir + " failed");
+          throw new IOException(ErrorMsg.REPL_FILE_SYSTEM_OPERATION_RETRY.getMsg());
+        }
+
+        int sleepTime = FileUtils.getSleepTime(repeat - 1);
+        logger.info(" sleep for {} milliseconds for retry num {} ", sleepTime , repeat);
+        try {
+          Thread.sleep(sleepTime);
+        } catch (InterruptedException timerEx) {
+          logger.info("thread sleep interrupted", timerEx.getMessage());
+        }
+
+        // in case of io error, reset the file system object
+        FileSystem.closeAllForUGI(Utils.getUGI());
+        dataFileSystem = dataPathList.get(0).getFileSystem(hiveConf);
+        exportFileSystem = exportRootDataDir.getFileSystem(hiveConf);
+        Path exportPath = new Path(exportRootDataDir, EximUtil.FILES_NAME);
+        if (exportFileSystem.exists(exportPath)) {
+          exportFileSystem.delete(exportPath, true);
+        }
+      }
+    }
+  }
+
+  private void writeFilesList(FileStatus[] fileStatuses, BufferedWriter writer, String encodedSubDirs)
+          throws IOException {
+    ReplChangeManager replChangeManager = ReplChangeManager.getInstance();
+    for (FileStatus fileStatus : fileStatuses) {
+      if (fileStatus.isDirectory()) {
+        // Write files inside the sub-directory.
+        Path subDir = fileStatus.getPath();
+        writeFilesList(listFilesInDir(subDir), writer, encodedSubDir(encodedSubDirs, subDir));
+      } else {
+        writer.write(encodedUri(replChangeManager, fileStatus, encodedSubDirs));
+        writer.newLine();
+      }
+    }
+  }
+
+  private BufferedWriter writer() throws IOException {
+    Path exportToFile = new Path(exportRootDataDir, EximUtil.FILES_NAME);
+    logger.debug("exporting data files in dir : " + dataPathList + " to " + exportToFile);
+    return new BufferedWriter(
+            new OutputStreamWriter(exportFileSystem.create(exportToFile))
+    );
+  }
+
+  private String encodedSubDir(String encodedParentDirs, Path subDir) {
+    if (null == encodedParentDirs) {
+      return subDir.getName();
+    } else {
+      return encodedParentDirs + Path.SEPARATOR + subDir.getName();
+    }
+  }
+
+  private String encodedUri(ReplChangeManager replChangeManager, FileStatus fileStatus, String encodedSubDir)
+          throws IOException {
+    Path currentDataFilePath = fileStatus.getPath();
+    String checkSum = ReplChangeManager.checksumFor(currentDataFilePath, dataFileSystem);

Review comment:
       Does this method need to be static?

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/FileList.java
##########
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.repl.util;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.concurrent.LinkedBlockingQueue;
+
+
+/**
+ * A file backed list of Strings which is in-memory till the threshold.
+ */
+public class FileList implements Closeable, Iterator<String> {
+  private static final Logger LOG = LoggerFactory.getLogger(FileList.class);
+  private static int fileListStreamerID = 0;
+  private static final String  FILE_LIST_STREAMER_PREFIX = "file-list-streamer-";
+
+  private LinkedBlockingQueue<String> cache;
+  private volatile boolean thresholdHit = false;
+  private int thresholdPoint;

Review comment:
       why do you need a thresholdPoint and thresholdHit?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: gitbox-unsubscribe@hive.apache.org
For additional commands, e-mail: gitbox-help@hive.apache.org