Posted to gitbox@hive.apache.org by GitBox <gi...@apache.org> on 2021/02/22 06:34:00 UTC

[GitHub] [hive] pkumarsinha commented on a change in pull request #1936: HIVE-24718: Moving to file based iteration for copying data.

pkumarsinha commented on a change in pull request #1936:
URL: https://github.com/apache/hive/pull/1936#discussion_r579994523



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/DirCopyWork.java
##########
@@ -75,7 +77,9 @@ public String convertToString() {
     StringBuilder objInStr = new StringBuilder();
     objInStr.append(fullyQualifiedSourcePath)
             .append(URI_SEPARATOR)
-            .append(fullyQualifiedTargetPath);
+            .append(fullyQualifiedTargetPath)
+            .append(URI_SEPARATOR)
+            .append(tableName);

Review comment:
       Any specific reason for moving the tableName to the end?
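
      A plausible motivation (an assumption, not stated in this thread) is keeping the serialized form backward compatible: appending the new field last leaves the existing fields at their old indices when the string is split on the separator. A minimal standalone sketch, with an assumed URI_SEPARATOR value:

        // Illustrative only: the actual URI_SEPARATOR value is not shown in the diff.
        public class DirCopyWorkOrderSketch {
            private static final String URI_SEPARATOR = "#";

            public static void main(String[] args) {
                String serialized = "hdfs://src/t1" + URI_SEPARATOR
                        + "hdfs://tgt/t1" + URI_SEPARATOR + "t1";
                String[] fields = serialized.split(URI_SEPARATOR);
                String source = fields[0];                            // old index preserved
                String target = fields[1];                            // old index preserved
                String table  = fields.length > 2 ? fields[2] : null; // new, optional field
                System.out.println(source + " -> " + target + " (" + table + ")");
            }
        }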

##########
File path: itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
##########
@@ -2225,17 +2224,11 @@ private void setupUDFJarOnHDFS(Path identityUdfLocalPath, Path identityUdfHdfsPa
   /*
    * Method used from TestReplicationScenariosExclusiveReplica
    */
-  private void assertExternalFileInfo(List<String> expected, String dumplocation, boolean isIncremental,
+  private void assertExternalFileList(List<String> expected, String dumplocation,
                                       WarehouseInstance warehouseInstance)
           throws IOException {
     Path hivePath = new Path(dumplocation, ReplUtils.REPL_HIVE_BASE_DIR);
-    Path metadataPath = new Path(hivePath, EximUtil.METADATA_PATH_NAME);
-    Path externalTableInfoFile;
-    if (isIncremental) {
-      externalTableInfoFile = new Path(hivePath, FILE_NAME);
-    } else {
-      externalTableInfoFile = new Path(metadataPath, primaryDbName.toLowerCase() + File.separator + FILE_NAME);
-    }
-    ReplicationTestUtils.assertExternalFileInfo(warehouseInstance, expected, externalTableInfoFile);
+    Path externalTblFileList = new Path(hivePath, EximUtil.FILE_LIST_EXTERNAL);

Review comment:
       Given that FILE_LIST_EXTERNAL now has a fixed location for both incremental and bootstrap, do you still need this method, TestReplicationScenariosAcrossInstances.assertExternalFileList?

##########
File path: itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTablesMetaDataOnly.java
##########
@@ -103,9 +102,9 @@ public void replicationWithoutExternalTables() throws Throwable {
         .run("insert into table t2 partition(country='france') values ('paris')")
         .dump(primaryDbName, dumpWithClause);
 
-    // the _external_tables_file info only should be created if external tables are to be replicated not otherwise
-    assertFalse(primary.miniDFSCluster.getFileSystem()
-        .exists(new Path(new Path(tuple.dumpLocation, primaryDbName.toLowerCase()), FILE_NAME)));
+    // the _file_list_external only should be created if external tables are to be replicated not otherwise
+    assertFalseExternalFileList(new Path(new Path(tuple.dumpLocation,

Review comment:
       Given that the location of FILE_LIST_EXTERNAL is now fixed for both bootstrap and incremental, several methods (e.g. assertExternalFileList) are no longer required. Could you please review the tests from this perspective and remove them with a bit of refactoring?

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/FileList.java
##########
@@ -18,158 +18,176 @@
 
 package org.apache.hadoop.hive.ql.exec.repl.util;
 
-import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.exec.util.Retryable;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.BufferedReader;
 import java.io.IOException;
 import java.io.InputStreamReader;
+import java.io.StringWriter;
+import java.io.UncheckedIOException;
 import java.util.Iterator;
 import java.util.NoSuchElementException;
-import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.Callable;
 
-
-/**
- * A file backed list of Strings which is in-memory till the threshold.
- */
 public class FileList implements AutoCloseable, Iterator<String> {
   private static final Logger LOG = LoggerFactory.getLogger(FileList.class);
-  private static int fileListStreamerID = 0;
-  private static final String  FILE_LIST_STREAMER_PREFIX = "file-list-streamer-";
-
-  private LinkedBlockingQueue<String> cache;
-  private volatile boolean thresholdHit = false;
-  private int thresholdPoint;
-  private float thresholdFactor = 0.9f;
-  private Path backingFile;
-  private FileListStreamer fileListStreamer;
-  private String nextElement;
-  private boolean noMoreElement;
+  private final Path backingFile;
+  private String nextElement = null;
   private HiveConf conf;
+  private volatile boolean retryMode;
   private BufferedReader backingFileReader;
+  private volatile FSDataOutputStream backingFileWriter;
 
-
-  public FileList(Path backingFile, int cacheSize, HiveConf conf) {
+  public FileList(Path backingFile, HiveConf conf) {
     this.backingFile = backingFile;
     this.conf = conf;
-    if (cacheSize > 0) {
-      // Cache size must be > 0 for this list to be used for the write operation.
-      this.cache = new LinkedBlockingQueue<>(cacheSize);
-      fileListStreamer = new FileListStreamer(cache, backingFile, conf);
-      thresholdPoint = getThreshold(cacheSize);
-      LOG.debug("File list backed by {} can be used for write operation.", backingFile);
-    } else {
-      thresholdHit = true;
+    this.retryMode = false;
+  }
+
+  public void add(String entry) throws IOException {
+    Retryable retryable = buildRetryable();
+    try {
+      retryable.executeCallable((Callable<Void>) ()-> {

Review comment:
       While one thread is about to retry, are you allowing a new thread to write? And if so, does a failure in one thread also fail the other FileList.add call?
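
      For context, a toy standalone model (an illustration, not the Hive code) of the interleaving being asked about: threads serialize on one lock around a shared writer, and a failing thread closes and nulls that writer before its retry, so a concurrent add() may re-create it in between:

        import java.io.ByteArrayOutputStream;
        import java.io.IOException;
        import java.io.OutputStream;

        // Toy model of FileList.add's locking; all names are illustrative.
        public class SharedWriterSketch {
            private final Object lock = new Object();
            private volatile OutputStream out;

            void add(String entry) throws IOException {
                synchronized (lock) {
                    if (out == null) {
                        out = new ByteArrayOutputStream(); // re-init, as initWriter() does
                    }
                    out.write(entry.getBytes());
                }
            }

            // Models the catch block: the failing thread closes the shared writer
            // before its retry, so another thread's add() can slip in and re-create it.
            void onWriteFailure() throws IOException {
                synchronized (lock) {
                    if (out != null) {
                        out.close();
                        out = null;
                    }
                }
            }
        }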

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/FileList.java
##########
@@ -18,158 +18,176 @@
 
 package org.apache.hadoop.hive.ql.exec.repl.util;
 
-import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.exec.util.Retryable;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.BufferedReader;
 import java.io.IOException;
 import java.io.InputStreamReader;
+import java.io.StringWriter;
+import java.io.UncheckedIOException;
 import java.util.Iterator;
 import java.util.NoSuchElementException;
-import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.Callable;
 
-
-/**
- * A file backed list of Strings which is in-memory till the threshold.
- */
 public class FileList implements AutoCloseable, Iterator<String> {
   private static final Logger LOG = LoggerFactory.getLogger(FileList.class);
-  private static int fileListStreamerID = 0;
-  private static final String  FILE_LIST_STREAMER_PREFIX = "file-list-streamer-";
-
-  private LinkedBlockingQueue<String> cache;
-  private volatile boolean thresholdHit = false;
-  private int thresholdPoint;
-  private float thresholdFactor = 0.9f;
-  private Path backingFile;
-  private FileListStreamer fileListStreamer;
-  private String nextElement;
-  private boolean noMoreElement;
+  private final Path backingFile;
+  private String nextElement = null;
   private HiveConf conf;
+  private volatile boolean retryMode;
   private BufferedReader backingFileReader;
+  private volatile FSDataOutputStream backingFileWriter;
 
-
-  public FileList(Path backingFile, int cacheSize, HiveConf conf) {
+  public FileList(Path backingFile, HiveConf conf) {
     this.backingFile = backingFile;
     this.conf = conf;
-    if (cacheSize > 0) {
-      // Cache size must be > 0 for this list to be used for the write operation.
-      this.cache = new LinkedBlockingQueue<>(cacheSize);
-      fileListStreamer = new FileListStreamer(cache, backingFile, conf);
-      thresholdPoint = getThreshold(cacheSize);
-      LOG.debug("File list backed by {} can be used for write operation.", backingFile);
-    } else {
-      thresholdHit = true;
+    this.retryMode = false;
+  }
+
+  public void add(String entry) throws IOException {
+    Retryable retryable = buildRetryable();
+    try {
+      retryable.executeCallable((Callable<Void>) ()-> {
+        synchronized (backingFile) {
+          try{
+            if (backingFileWriter == null) {
+              backingFileWriter = initWriter();
+            }
+            backingFileWriter.writeBytes(getEntryWithNewline(entry));
+            backingFileWriter.hflush();
+            LOG.info("Writing entry {} to file list backed by {}", entry, backingFile);
+          } catch (IOException e) {
+            LOG.error("Writing entry {} to file list {} failed, attempting retry.", entry, backingFile, e);
+            this.retryMode = true;
+            close();
+            throw e;
+          }
+        }
+        return null;
+      });
+    } catch (Exception e) {
+      throw new IOException(ErrorMsg.REPL_RETRY_EXHAUSTED.format(e.getMessage()));
     }
   }
 
-  @VisibleForTesting
-  FileList(Path backingFile, FileListStreamer fileListStreamer, LinkedBlockingQueue<String> cache, HiveConf conf) {
-    this.backingFile = backingFile;
-    this.fileListStreamer = fileListStreamer;
-    this.cache = cache;
-    this.conf = conf;
-    thresholdPoint = getThreshold(cache.remainingCapacity());
+  Retryable buildRetryable() {
+    return Retryable.builder()
+            .withHiveConf(conf)
+            .withRetryOnException(IOException.class).build();
   }
 
-  /**
-   * Only add operation is safe for concurrent operations.
-   */
-  public void add(String entry) throws SemanticException {
-    if (thresholdHit && !fileListStreamer.isAlive()) {
-      throw new SemanticException("List is not getting saved anymore to file " + backingFile.toString());
+  // Return the entry ensuring it ends with newline.
+  private String getEntryWithNewline(String entry) {
+    return new StringWriter()
+            .append(entry)
+            .append(System.lineSeparator())
+            .toString();
+  }
+
+  FSDataOutputStream initWriter() throws IOException {
+    if(shouldAppend()) {
+      return getWriterAppendMode(); // append in retry-mode if file has been created already
+    }
+    else {
+      return getWriterCreateMode();
     }
+  }
+
+  boolean shouldAppend() throws IOException {

Review comment:
       private?

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/FileList.java
##########
@@ -18,158 +18,176 @@
 
 package org.apache.hadoop.hive.ql.exec.repl.util;
 
-import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.exec.util.Retryable;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.BufferedReader;
 import java.io.IOException;
 import java.io.InputStreamReader;
+import java.io.StringWriter;
+import java.io.UncheckedIOException;
 import java.util.Iterator;
 import java.util.NoSuchElementException;
-import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.Callable;
 
-
-/**
- * A file backed list of Strings which is in-memory till the threshold.
- */
 public class FileList implements AutoCloseable, Iterator<String> {
   private static final Logger LOG = LoggerFactory.getLogger(FileList.class);
-  private static int fileListStreamerID = 0;
-  private static final String  FILE_LIST_STREAMER_PREFIX = "file-list-streamer-";
-
-  private LinkedBlockingQueue<String> cache;
-  private volatile boolean thresholdHit = false;
-  private int thresholdPoint;
-  private float thresholdFactor = 0.9f;
-  private Path backingFile;
-  private FileListStreamer fileListStreamer;
-  private String nextElement;
-  private boolean noMoreElement;
+  private final Path backingFile;
+  private String nextElement = null;
   private HiveConf conf;
+  private volatile boolean retryMode;
   private BufferedReader backingFileReader;
+  private volatile FSDataOutputStream backingFileWriter;
 
-
-  public FileList(Path backingFile, int cacheSize, HiveConf conf) {
+  public FileList(Path backingFile, HiveConf conf) {
     this.backingFile = backingFile;
     this.conf = conf;
-    if (cacheSize > 0) {
-      // Cache size must be > 0 for this list to be used for the write operation.
-      this.cache = new LinkedBlockingQueue<>(cacheSize);
-      fileListStreamer = new FileListStreamer(cache, backingFile, conf);
-      thresholdPoint = getThreshold(cacheSize);
-      LOG.debug("File list backed by {} can be used for write operation.", backingFile);
-    } else {
-      thresholdHit = true;
+    this.retryMode = false;
+  }
+
+  public void add(String entry) throws IOException {
+    Retryable retryable = buildRetryable();
+    try {
+      retryable.executeCallable((Callable<Void>) ()-> {
+        synchronized (backingFile) {
+          try{
+            if (backingFileWriter == null) {
+              backingFileWriter = initWriter();
+            }
+            backingFileWriter.writeBytes(getEntryWithNewline(entry));
+            backingFileWriter.hflush();
+            LOG.info("Writing entry {} to file list backed by {}", entry, backingFile);
+          } catch (IOException e) {
+            LOG.error("Writing entry {} to file list {} failed, attempting retry.", entry, backingFile, e);
+            this.retryMode = true;
+            close();
+            throw e;
+          }
+        }
+        return null;
+      });
+    } catch (Exception e) {
+      throw new IOException(ErrorMsg.REPL_RETRY_EXHAUSTED.format(e.getMessage()));
     }
   }
 
-  @VisibleForTesting
-  FileList(Path backingFile, FileListStreamer fileListStreamer, LinkedBlockingQueue<String> cache, HiveConf conf) {
-    this.backingFile = backingFile;
-    this.fileListStreamer = fileListStreamer;
-    this.cache = cache;
-    this.conf = conf;
-    thresholdPoint = getThreshold(cache.remainingCapacity());
+  Retryable buildRetryable() {
+    return Retryable.builder()
+            .withHiveConf(conf)
+            .withRetryOnException(IOException.class).build();
   }
 
-  /**
-   * Only add operation is safe for concurrent operations.
-   */
-  public void add(String entry) throws SemanticException {
-    if (thresholdHit && !fileListStreamer.isAlive()) {
-      throw new SemanticException("List is not getting saved anymore to file " + backingFile.toString());
+  // Return the entry ensuring it ends with newline.
+  private String getEntryWithNewline(String entry) {
+    return new StringWriter()
+            .append(entry)
+            .append(System.lineSeparator())
+            .toString();
+  }
+
+  FSDataOutputStream initWriter() throws IOException {
+    if(shouldAppend()) {
+      return getWriterAppendMode(); // append in retry-mode if file has been created already
+    }
+    else {
+      return getWriterCreateMode();
     }
+  }
+
+  boolean shouldAppend() throws IOException {
+    return backingFile.getFileSystem(conf).exists(backingFile) && this.retryMode;

Review comment:
       When do you expect backingFile.getFileSystem(conf).exists(backingFile) to be true while this.retryMode is false, and what would that state mean?
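
      One reading of the condition (an interpretation, not confirmed in this thread): the file can pre-exist while retryMode is false when an earlier, separate run left a file behind, and create mode then overwrites it; append mode is reserved for a write failure within the current process. A standalone restatement:

        // Restates shouldAppend() = exists && retryMode; the case comments are
        // an interpretation of the patch, not the author's confirmed intent.
        public class ShouldAppendSketch {
            // exists=true,  retryMode=true  -> append: keep the partial file from this run's failed write
            // exists=true,  retryMode=false -> create: a file left by an earlier run gets overwritten
            // exists=false, any retryMode   -> create: nothing to append to
            static boolean shouldAppend(boolean exists, boolean retryMode) {
                return exists && retryMode;
            }

            public static void main(String[] args) {
                System.out.println(shouldAppend(true, false)); // false: the case the review asks about
            }
        }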

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpWork.java
##########
@@ -184,42 +190,86 @@ public void setResultValues(List<String> resultValues) {
     this.resultValues = resultValues;
   }
 
-  public List<Task<?>> externalTableCopyTasks(TaskTracker tracker, HiveConf conf) {
+  public List<Task<?>> externalTableCopyTasks(TaskTracker tracker, HiveConf conf) throws IOException {
     if (conf.getBoolVar(HiveConf.ConfVars.REPL_DUMP_SKIP_IMMUTABLE_DATA_COPY)) {
       return Collections.emptyList();
     }
     List<Task<?>> tasks = new ArrayList<>();
-    while (externalTblCopyPathIterator.hasNext() && tracker.canAddMoreTasks()) {
-      DirCopyWork dirCopyWork = new DirCopyWork(metricCollector, currentDumpPath.toString());
-      dirCopyWork.loadFromString(externalTblCopyPathIterator.next());
-      Task<DirCopyWork> task = TaskFactory.get(dirCopyWork, conf);
-      tasks.add(task);
-      tracker.addTask(task);
-      LOG.debug("added task for {}", dirCopyWork);
+    Retryable retryable = Retryable.builder()
+            .withHiveConf(conf)
+            .withRetryOnException(UncheckedIOException.class).build();
+    try {
+      retryable.executeCallable((Callable<Void>) ()-> {
+        try{
+          int numEntriesToSkip = tasks == null ? 0 : tasks.size();
+          while (externalTblCopyPathIterator.hasNext() &&  tracker.canAddMoreTasks()) {
+            if(numEntriesToSkip > 0) {
+              //skip tasks added in previous attempts of this retryable block
+              externalTblCopyPathIterator.next();
+              numEntriesToSkip--;
+              continue;
+            }
+            DirCopyWork dirCopyWork = new DirCopyWork(metricCollector, currentDumpPath.toString());
+            dirCopyWork.loadFromString(externalTblCopyPathIterator.next());
+            Task<DirCopyWork> task = TaskFactory.get(dirCopyWork, conf);
+            tasks.add(task);
+            tracker.addTask(task);

Review comment:
       What happens to the tasks added prior to the retry?
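
      For reference, the duplicate-avoidance pattern the diff uses, restated standalone: each retry counts the tasks already collected and fast-forwards past that many entries. The sketch assumes every attempt re-reads the entries from the beginning; whether the real externalTblCopyPathIterator behaves that way across a retry is part of what this question probes.

        import java.util.ArrayList;
        import java.util.Iterator;
        import java.util.List;

        // Skip-on-retry sketch with illustrative names; not the Hive classes.
        public class SkipOnRetrySketch {
            static void attempt(Iterator<String> entries, List<String> tasks, boolean fail) {
                int numEntriesToSkip = tasks.size();
                while (entries.hasNext()) {
                    String entry = entries.next();
                    if (numEntriesToSkip > 0) {
                        numEntriesToSkip--; // already became a task in an earlier attempt
                        continue;
                    }
                    tasks.add("task:" + entry);
                    if (fail && tasks.size() == 2) {
                        throw new IllegalStateException("simulated transient failure");
                    }
                }
            }

            public static void main(String[] args) {
                List<String> source = List.of("a", "b", "c");
                List<String> tasks = new ArrayList<>();
                try {
                    attempt(source.iterator(), tasks, true);
                } catch (IllegalStateException e) {
                    attempt(source.iterator(), tasks, false); // retry skips "a" and "b"
                }
                System.out.println(tasks); // [task:a, task:b, task:c]
            }
        }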




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: gitbox-unsubscribe@hive.apache.org
For additional commands, e-mail: gitbox-help@hive.apache.org