You are viewing a plain text version of this content. The canonical link for it is here.
Posted to gitbox@hive.apache.org by GitBox <gi...@apache.org> on 2021/02/10 05:02:15 UTC

[GitHub] [hive] ArkoSharma commented on a change in pull request #1936: HIVE-24718: Cleanup of _external_table_info file

ArkoSharma commented on a change in pull request #1936:
URL: https://github.com/apache/hive/pull/1936#discussion_r573450950



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/FileList.java
##########
@@ -22,154 +22,118 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.exec.util.Retryable;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.BufferedReader;
+import java.io.BufferedWriter;
 import java.io.IOException;
 import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.io.UncheckedIOException;
 import java.util.Iterator;
 import java.util.NoSuchElementException;
+import java.util.concurrent.Callable;
 import java.util.concurrent.LinkedBlockingQueue;
 
-
-/**
- * A file backed list of Strings which is in-memory till the threshold.
- */
 public class FileList implements AutoCloseable, Iterator<String> {
   private static final Logger LOG = LoggerFactory.getLogger(FileList.class);
-  private static int fileListStreamerID = 0;
-  private static final String  FILE_LIST_STREAMER_PREFIX = "file-list-streamer-";
-
-  private LinkedBlockingQueue<String> cache;
-  private volatile boolean thresholdHit = false;
-  private int thresholdPoint;
-  private float thresholdFactor = 0.9f;
   private Path backingFile;
-  private FileListStreamer fileListStreamer;
-  private String nextElement;
-  private boolean noMoreElement;
+  private String nextElement = null;
   private HiveConf conf;
   private BufferedReader backingFileReader;
+  private BufferedWriter backingFileWriter;
 
-
-  public FileList(Path backingFile, int cacheSize, HiveConf conf) {
+  public FileList(Path backingFile, HiveConf conf) {
     this.backingFile = backingFile;
     this.conf = conf;
-    if (cacheSize > 0) {
-      // Cache size must be > 0 for this list to be used for the write operation.
-      this.cache = new LinkedBlockingQueue<>(cacheSize);
-      fileListStreamer = new FileListStreamer(cache, backingFile, conf);
-      thresholdPoint = getThreshold(cacheSize);
-      LOG.debug("File list backed by {} can be used for write operation.", backingFile);
-    } else {
-      thresholdHit = true;
-    }
   }
 
-  @VisibleForTesting
-  FileList(Path backingFile, FileListStreamer fileListStreamer, LinkedBlockingQueue<String> cache, HiveConf conf) {
-    this.backingFile = backingFile;
-    this.fileListStreamer = fileListStreamer;
-    this.cache = cache;
-    this.conf = conf;
-    thresholdPoint = getThreshold(cache.remainingCapacity());
-  }
-
-  /**
-   * Only add operation is safe for concurrent operations.
-   */
   public void add(String entry) throws SemanticException {
-    if (thresholdHit && !fileListStreamer.isAlive()) {
-      throw new SemanticException("List is not getting saved anymore to file " + backingFile.toString());
-    }
+    Retryable retryable = Retryable.builder()
+            .withHiveConf(conf)
+            .withRetryOnException(IOException.class).build();
     try {
-      cache.put(entry);
-    } catch (InterruptedException e) {
-      throw new SemanticException(e);
-    }
-    if (!thresholdHit && cache.size() >= thresholdPoint) {
-      initStoreToFile(cache.size());
+      retryable.executeCallable((Callable<Void>) ()-> {
+        synchronized (backingFile ) {
+          if (backingFileWriter == null) {
+            backingFileWriter = initWriter();
+          }
+          backingFileWriter.write(entry);
+          backingFileWriter.newLine();
+        }
+        LOG.info("Writing entry {} to file list backed by {}", entry, backingFile);
+        return null;
+      });
+    } catch (Exception e) {
+      throw new SemanticException(ErrorMsg.REPL_RETRY_EXHAUSTED.format(e.getMessage(),
+              String.valueOf(ErrorMsg.getErrorMsg(e).getErrorCode())));
     }
   }
 
+  BufferedWriter initWriter() throws IOException {
+    FileSystem fs = FileSystem.get(backingFile.toUri(), conf);
+    return new BufferedWriter(new OutputStreamWriter(fs.create(backingFile)));
+  }
+
   @Override
   public boolean hasNext() {
-    if (!thresholdHit) {
-      return (cache != null && !cache.isEmpty());
-    }
     if (nextElement != null) {
       return true;
+    } else {
+      try {
+        nextElement = readNextLine();
+        return (nextElement != null);
+      } catch (IOException e) {
+        throw new UncheckedIOException(e);
+      }
     }
-    if (noMoreElement) {
-      return false;
-    }
-    nextElement = readNextLine();
-    if (nextElement == null) {
-      noMoreElement = true;
-    }
-    return !noMoreElement;
   }
 
   @Override
   public String next() {
-    if (!hasNext()) {
+    if (nextElement == null && !hasNext()) {
       throw new NoSuchElementException("No more element in the list backed by " + backingFile);
     }
     String retVal = nextElement;
     nextElement = null;
-    return thresholdHit ? retVal : cache.poll();
-  }
-
-  private synchronized void initStoreToFile(int cacheSize) {
-    if (!thresholdHit) {
-      fileListStreamer.setName(getNextID());
-      fileListStreamer.setDaemon(true);
-      fileListStreamer.start();
-      thresholdHit = true;
-      LOG.info("Started streaming the list elements to file: {}, cache size {}", backingFile, cacheSize);
-    }
+    return retVal;
   }
 
-  private String readNextLine() {
-    String nextElement = null;
+  private String readNextLine() throws IOException {
+    Retryable retryable = Retryable.builder()
+            .withHiveConf(conf)
+            .withRetryOnException(IOException.class).build();
     try {
-      if (backingFileReader == null) {
-        FileSystem fs = FileSystem.get(backingFile.toUri(), conf);
-        if (fs.exists(backingFile)) {
+      return retryable.executeCallable(() -> {
+        String nextElement;
+        if (backingFileReader == null) {
+          FileSystem fs = FileSystem.get(backingFile.toUri(), conf);
+          if(!fs.exists(backingFile)) {

Review comment:
       No, the backing file gets created while writing the first entry (`add` lazily calls `initWriter()`, which creates it).




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: gitbox-unsubscribe@hive.apache.org
For additional commands, e-mail: gitbox-help@hive.apache.org