Posted to gitbox@hive.apache.org by GitBox <gi...@apache.org> on 2021/02/02 02:43:48 UTC

[GitHub] [hive] ArkoSharma opened a new pull request #1936: HIVE-24718: Cleanup of _external_table_info file

ArkoSharma opened a new pull request #1936:
URL: https://github.com/apache/hive/pull/1936


   




[GitHub] [hive] ArkoSharma commented on a change in pull request #1936: HIVE-24718: Moving to file based iteration for copying data.

Posted by GitBox <gi...@apache.org>.
ArkoSharma commented on a change in pull request #1936:
URL: https://github.com/apache/hive/pull/1936#discussion_r580050588



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpWork.java
##########
@@ -184,42 +190,86 @@ public void setResultValues(List<String> resultValues) {
     this.resultValues = resultValues;
   }
 
-  public List<Task<?>> externalTableCopyTasks(TaskTracker tracker, HiveConf conf) {
+  public List<Task<?>> externalTableCopyTasks(TaskTracker tracker, HiveConf conf) throws IOException {
     if (conf.getBoolVar(HiveConf.ConfVars.REPL_DUMP_SKIP_IMMUTABLE_DATA_COPY)) {
       return Collections.emptyList();
     }
     List<Task<?>> tasks = new ArrayList<>();
-    while (externalTblCopyPathIterator.hasNext() && tracker.canAddMoreTasks()) {
-      DirCopyWork dirCopyWork = new DirCopyWork(metricCollector, currentDumpPath.toString());
-      dirCopyWork.loadFromString(externalTblCopyPathIterator.next());
-      Task<DirCopyWork> task = TaskFactory.get(dirCopyWork, conf);
-      tasks.add(task);
-      tracker.addTask(task);
-      LOG.debug("added task for {}", dirCopyWork);
+    Retryable retryable = Retryable.builder()
+            .withHiveConf(conf)
+            .withRetryOnException(UncheckedIOException.class).build();
+    try {
+      retryable.executeCallable((Callable<Void>) ()-> {
+        try{
+          int numEntriesToSkip = tasks == null ? 0 : tasks.size();
+          while (externalTblCopyPathIterator.hasNext() &&  tracker.canAddMoreTasks()) {
+            if(numEntriesToSkip > 0) {
+              //skip tasks added in previous attempts of this retryable block
+              externalTblCopyPathIterator.next();
+              numEntriesToSkip--;
+              continue;
+            }
+            DirCopyWork dirCopyWork = new DirCopyWork(metricCollector, currentDumpPath.toString());
+            dirCopyWork.loadFromString(externalTblCopyPathIterator.next());
+            Task<DirCopyWork> task = TaskFactory.get(dirCopyWork, conf);
+            tasks.add(task);
+            tracker.addTask(task);

Review comment:
       They remain in memory in the same list: 'tasks'.
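
       For context, a minimal sketch of the skip-on-retry pattern being discussed (illustrative names only, not the actual Hive APIs; the string placeholder stands in for TaskFactory.get(...)):

           import java.util.ArrayList;
           import java.util.Iterator;
           import java.util.List;

           // One retry attempt over a file-backed iterator. Tasks built by
           // earlier attempts survive in the same in-memory 'tasks' list, so
           // each new attempt only skips that many already-consumed entries.
           class SkipOnRetrySketch {
             private final List<String> tasks = new ArrayList<>();

             void attempt(Iterator<String> entryIterator) {
               int numEntriesToSkip = tasks.size();   // added by prior attempts
               while (entryIterator.hasNext()) {
                 String entry = entryIterator.next();
                 if (numEntriesToSkip > 0) {
                   numEntriesToSkip--;                // task already exists
                   continue;
                 }
                 tasks.add("task-for-" + entry);      // stands in for TaskFactory.get(...)
               }
             }
           }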






[GitHub] [hive] pkumarsinha commented on a change in pull request #1936: HIVE-24718: Cleanup of _external_table_info file

Posted by GitBox <gi...@apache.org>.
pkumarsinha commented on a change in pull request #1936:
URL: https://github.com/apache/hive/pull/1936#discussion_r573496104



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/FileList.java
##########
@@ -22,154 +22,118 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.exec.util.Retryable;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.BufferedReader;
+import java.io.BufferedWriter;
 import java.io.IOException;
 import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.io.UncheckedIOException;
 import java.util.Iterator;
 import java.util.NoSuchElementException;
+import java.util.concurrent.Callable;
 import java.util.concurrent.LinkedBlockingQueue;
 
-
-/**
- * A file backed list of Strings which is in-memory till the threshold.
- */
 public class FileList implements AutoCloseable, Iterator<String> {
   private static final Logger LOG = LoggerFactory.getLogger(FileList.class);
-  private static int fileListStreamerID = 0;
-  private static final String  FILE_LIST_STREAMER_PREFIX = "file-list-streamer-";
-
-  private LinkedBlockingQueue<String> cache;
-  private volatile boolean thresholdHit = false;
-  private int thresholdPoint;
-  private float thresholdFactor = 0.9f;
   private Path backingFile;
-  private FileListStreamer fileListStreamer;
-  private String nextElement;
-  private boolean noMoreElement;
+  private String nextElement = null;
   private HiveConf conf;
   private BufferedReader backingFileReader;
+  private BufferedWriter backingFileWriter;
 
-
-  public FileList(Path backingFile, int cacheSize, HiveConf conf) {
+  public FileList(Path backingFile, HiveConf conf) {
     this.backingFile = backingFile;
     this.conf = conf;
-    if (cacheSize > 0) {
-      // Cache size must be > 0 for this list to be used for the write operation.
-      this.cache = new LinkedBlockingQueue<>(cacheSize);
-      fileListStreamer = new FileListStreamer(cache, backingFile, conf);
-      thresholdPoint = getThreshold(cacheSize);
-      LOG.debug("File list backed by {} can be used for write operation.", backingFile);
-    } else {
-      thresholdHit = true;
-    }
   }
 
-  @VisibleForTesting
-  FileList(Path backingFile, FileListStreamer fileListStreamer, LinkedBlockingQueue<String> cache, HiveConf conf) {
-    this.backingFile = backingFile;
-    this.fileListStreamer = fileListStreamer;
-    this.cache = cache;
-    this.conf = conf;
-    thresholdPoint = getThreshold(cache.remainingCapacity());
-  }
-
-  /**
-   * Only add operation is safe for concurrent operations.
-   */
   public void add(String entry) throws SemanticException {
-    if (thresholdHit && !fileListStreamer.isAlive()) {
-      throw new SemanticException("List is not getting saved anymore to file " + backingFile.toString());
-    }
+    Retryable retryable = Retryable.builder()
+            .withHiveConf(conf)
+            .withRetryOnException(IOException.class).build();
     try {
-      cache.put(entry);
-    } catch (InterruptedException e) {
-      throw new SemanticException(e);
-    }
-    if (!thresholdHit && cache.size() >= thresholdPoint) {
-      initStoreToFile(cache.size());
+      retryable.executeCallable((Callable<Void>) ()-> {
+        synchronized (backingFile ) {

Review comment:
       OK, the whole discussion was based on the point that the write was going to be a single-threaded operation. Agreed, with multiple threads writing it makes sense. However, how does retry work in the multi-threaded case, and is the ordering guarantee not impacted in the retry case?
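
       To make the question concrete, an illustrative interleaving (a hypothetical trace, not observed Hive behaviour) of two writer threads sharing one Retryable-wrapped, synchronized append:

           // T1: append("E1") throws IOException inside the synchronized block
           // T1: leaves the block; the Retryable schedules another attempt
           // T2: append("E2") succeeds                  -> file: E2
           // T1: retry re-acquires the lock and appends -> file: E2, E1
           //
           // A single writer would have produced E1, E2. With concurrent
           // writers plus retry, relative order is not preserved, and if T1's
           // failed attempt partially reached the file, E1 may appear twice.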






[GitHub] [hive] pkumarsinha commented on a change in pull request #1936: HIVE-24718: Moving to file based iteration for copying data.

Posted by GitBox <gi...@apache.org>.
pkumarsinha commented on a change in pull request #1936:
URL: https://github.com/apache/hive/pull/1936#discussion_r585302812



##########
File path: itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
##########
@@ -1836,6 +1837,64 @@ public void testHdfsNameserviceWithDataCopy() throws Throwable {
             .verifyResults(new String[]{"2", "3"});
   }
 
+  @Test
+  public void testReplWithRetryDisabledIterators() throws Throwable {
+    List<String> clause = new ArrayList<>();
+    //NS replacement parameters has no effect when data is also copied to staging
+    clause.add("'" + HiveConf.ConfVars.REPL_RUN_DATA_COPY_TASKS_ON_TARGET + "'='false'");
+    clause.add("'" + HiveConf.ConfVars.REPL_COPY_ITERATOR_RETRY + "'='false'");
+    WarehouseInstance.Tuple tuple = primary.run("use " + primaryDbName)
+            .run("create table  acid_table (key int, value int) partitioned by (load_date date) " +
+                    "clustered by(key) into 2 buckets stored as orc tblproperties ('transactional'='true')")
+            .run("create table table1 (i String)")
+            .run("insert into table1 values (1)")
+            .run("insert into table1 values (2)")
+            .dump(primaryDbName, clause);
+    assertFalseExternalFileList(new Path(new Path(tuple.dumpLocation,

Review comment:
       nit: If you just pass dumpLocation to the method and do the path creation inside the method, this would look cleaner. Anyway, the method assertFalseExternalFileList isn't doing much, so alternatively you can do the fs.exists() check right there and get rid of the method.
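
       A hedged sketch of that inline alternative (assuming a FileSystem handle named fs; the constants are the ones this patch uses elsewhere):

           // Assert directly that the external-table file list was not
           // written, instead of calling a dedicated helper:
           Path externalTblFileList = new Path(
               new Path(tuple.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR),
               EximUtil.FILE_LIST_EXTERNAL);
           Assert.assertFalse(fs.exists(externalTblFileList));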

##########
File path: common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
##########
@@ -653,6 +649,8 @@ private static void populateLlapDaemonVarsSet(Set<String> llapDaemonVarsSetLocal
       new TimeValidator(TimeUnit.HOURS),
       "Total allowed retry duration in hours inclusive of all retries. Once this is exhausted, " +
         "the policy instance will be marked as failed and will need manual intervention to restart."),
+    REPL_COPY_ITERATOR_RETRY("hive.repl.copy.iterator.retry", true,

Review comment:
       REPL_COPY_FILE_LIST_ITERATOR_RETRY ?

##########
File path: ql/src/test/org/apache/hadoop/hive/ql/exec/repl/util/TestFileList.java
##########
@@ -18,147 +18,266 @@
 
 package org.apache.hadoop.hive.ql.exec.repl.util;
 
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.exec.util.Retryable;
+import org.junit.Assert;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.mockito.ArgumentCaptor;
-import org.mockito.Mock;
 import org.mockito.Mockito;
+import org.mockito.junit.MockitoJUnitRunner;
 import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
 import org.slf4j.LoggerFactory;
 
-import java.io.BufferedWriter;
+import java.io.File;
+import java.io.IOException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-
 /**
  * Tests the File List implementation.
  */
 
-@RunWith(PowerMockRunner.class)
+@RunWith(MockitoJUnitRunner.class)
 @PrepareForTest({LoggerFactory.class})
 public class TestFileList {
 
-  @Mock
-  private BufferedWriter bufferedWriter;
-
-
-  @Test
-  public void testNoStreaming() throws Exception {
-    Object tuple[] =  setupAndGetTuple(100, false);
-    FileList fileList = (FileList) tuple[0];
-    FileListStreamer fileListStreamer = (FileListStreamer) tuple[1];
-    fileList.add("Entry1");
-    fileList.add("Entry2");
-    assertFalse(isStreamingToFile(fileListStreamer));
-  }
+  HiveConf conf = new HiveConf();
+  private FSDataOutputStream outStream;
+  private FSDataOutputStream testFileStream;
+  final String TEST_DATA_DIR = new File(System.getProperty("java.io.tmpdir") +
+          File.separator + TestFileList.class.getCanonicalName() + "-" + System.currentTimeMillis()
+  ).getPath().replaceAll("\\\\", "/");
+  private Exception testException = new IOException("test");
 
   @Test
-  public void testAlwaysStreaming() throws Exception {
-    Object tuple[] =  setupAndGetTuple(100, true);
-    FileList fileList = (FileList) tuple[0];
-    FileListStreamer fileListStreamer = (FileListStreamer) tuple[1];
-    assertFalse(fileListStreamer.isInitialized());
-    fileList.add("Entry1");
-    waitForStreamingInitialization(fileListStreamer);
-    assertTrue(isStreamingToFile(fileListStreamer));
-    fileList.close();
-    waitForStreamingClosure(fileListStreamer);
-  }
+  public void testConcurrentAdd() throws Exception {
+    FileList fileList = setupFileList();
+    int numOfEntries = 1000;
+    int numOfThreads = 10;
+    ExecutorService executorService = Executors.newFixedThreadPool(numOfThreads);
 
-  @Test
-  public void testStreaminOnCacheHit() throws Exception {
-    Object tuple[] =  setupAndGetTuple(5, false);
-    FileList fileList = (FileList) tuple[0];
-    FileListStreamer fileListStreamer = (FileListStreamer) tuple[1];
-    fileList.add("Entry1");
-    fileList.add("Entry2");
-    fileList.add("Entry3");
-    Thread.sleep(5000L);
-    assertFalse(fileListStreamer.isInitialized());
-    fileList.add("Entry4");
-    fileList.add("Entry5");
-    waitForStreamingInitialization(fileListStreamer);
+    for (int i=1; i<=numOfEntries; i++) {
+      executorService.submit(() -> {
+        try {
+          fileList.add("someEntry");
+        } catch (IOException e) {
+          throw new RuntimeException("Unable to add to file list.");
+        }
+      });
+    }
+    executorService.awaitTermination(1, TimeUnit.MINUTES);
     fileList.close();
-    waitForStreamingClosure(fileListStreamer);
+    ArgumentCaptor<String> entryArgs = ArgumentCaptor.forClass(String.class);
+    Mockito.verify(testFileStream, Mockito.times(numOfEntries)).writeBytes(entryArgs.capture());
   }
 
   @Test
-  public void testConcurrentAdd() throws Exception {
-    Object tuple[] =  setupAndGetTuple(100, false);
-    FileList fileList = (FileList) tuple[0];
-    FileListStreamer fileListStreamer = (FileListStreamer) tuple[1];
+  public void testConcurrentAddWithAbort() throws Exception {
+    FileList fileList = setupFileList(false, false, false);
     int numOfEntries = 1000;
     int numOfThreads = 10;
     ExecutorService executorService = Executors.newFixedThreadPool(numOfThreads);
+    final String retryExhaustedMsg = ErrorMsg.REPL_RETRY_EXHAUSTED
+            .format(testException.getMessage());
 
     for (int i=1; i<=numOfEntries; i++) {
       executorService.submit(() -> {
         try {
           fileList.add("someEntry");
-        } catch (SemanticException e) {
-          throw new RuntimeException("Unbale to add to file list.");
+        } catch (IOException e) {
+          Assert.assertTrue(e.getMessage().contains(retryExhaustedMsg));
         }
       });
     }
     executorService.awaitTermination(1, TimeUnit.MINUTES);
-    waitForStreamingInitialization(fileListStreamer);
     fileList.close();
-    waitForStreamingClosure(fileListStreamer);
     ArgumentCaptor<String> entryArgs = ArgumentCaptor.forClass(String.class);
-    Mockito.verify(bufferedWriter, Mockito.times(numOfEntries)).write(entryArgs.capture());
+    //retry exhausted should be encountered by the first thread, so the other threads do not write.
+    Mockito.verify(outStream, Mockito.times(1)).writeBytes(entryArgs.capture());
   }
 
-  private void waitForStreamingInitialization(FileListStreamer fileListStreamer) throws InterruptedException {
-    long sleepTime = 1000L;
-    int iter = 0;
-    while (!fileListStreamer.isInitialized()) {
-      Thread.sleep(sleepTime);
-      iter++;
-      if (iter == 5) {
-        throw new IllegalStateException("File Streamer not initialized till 5s.");
-      }
+  @Test
+  public void testWriteRetryCreateFailure() throws Exception {
+    String testEntry = "someEntry";
+    boolean retryOnCreate = true;
+    FileList fileList = setupFileList(retryOnCreate);
+    final String retryExhaustedMsg = ErrorMsg.REPL_RETRY_EXHAUSTED
+            .format(testException.getMessage());
+
+    try {
+      fileList.add(testEntry);
+    } catch (IOException e) {
+      Assert.assertTrue(e.getMessage().contains(retryExhaustedMsg));
+    }
+
+    //the create keeps failing, so create should be called at least twice,
+    //writes and appends do not happen
+    Mockito.verify(fileList, Mockito.atLeast(2)).getWriterCreateMode();
+    Mockito.verify(fileList, Mockito.times(0)).getWriterAppendMode();
+  }
+
+  @Test
+  public void testWriteNoRetry() throws Exception {
+    String testEntry = "someEntry";
+    boolean retryOnCreate = false, retryOnWrite = false;
+    FileList fileList = setupFileList(retryOnCreate, retryOnWrite);
+    final String retryExhaustedMsg = ErrorMsg.REPL_RETRY_EXHAUSTED
+            .format(testException.getMessage());
+
+    try {
+      fileList.add(testEntry);
+    } catch (IOException e) {
+      Assert.assertFalse(e.getMessage().contains(retryExhaustedMsg));
+      Assert.assertTrue(e.getMessage().contains("test"));
     }
+
+    //the first write fails and no retries are made
+    Mockito.verify(fileList, Mockito.times(1)).getWriterCreateMode();
+    Mockito.verify(outStream, Mockito.times(1)).writeBytes(Mockito.anyString());
+    Mockito.verify(fileList, Mockito.times(0)).getWriterAppendMode();
   }
 
-  private void waitForStreamingClosure(FileListStreamer fileListStreamer) throws InterruptedException {
-    long sleepTime = 1000L;
-    int iter = 0;
-    while (!isStreamingClosedProperly(fileListStreamer)) {
-      Thread.sleep(sleepTime);
-      iter++;
-      if (iter == 5) {
-        throw new IllegalStateException("File Streamer not getting closed till 5s.");
+  @Test
+  public void testReadWithDuplicateEntries() throws Exception {
+    conf = new HiveConf();
+    String testEntry = "someEntry";
+    int numUniqueEntries = 100;
+    Path testFilePath =  new Path(new Path(TEST_DATA_DIR), "testFile");
+    FileList fileList = new FileList(testFilePath, conf);
+
+    for (int i = 1; i <= numUniqueEntries; i++) {
+      String currentUniqueEntry = testEntry + Integer.valueOf(i);

Review comment:
       Integer.valueOf(i) -> just i is fine

##########
File path: common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
##########
@@ -653,6 +649,8 @@ private static void populateLlapDaemonVarsSet(Set<String> llapDaemonVarsSetLocal
       new TimeValidator(TimeUnit.HOURS),
       "Total allowed retry duration in hours inclusive of all retries. Once this is exhausted, " +
         "the policy instance will be marked as failed and will need manual intervention to restart."),
+    REPL_COPY_ITERATOR_RETRY("hive.repl.copy.iterator.retry", true,

Review comment:
       Add a description of when this should be disabled.
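
       Purely as an illustration, the entry with such a description might read like the sketch below; the key name follows the rename suggested earlier in this review, and the append rationale is inferred from the patch's retry-mode writer, so both are assumptions rather than the committed text:

           REPL_COPY_FILE_LIST_ITERATOR_RETRY("hive.repl.copy.file.list.iterator.retry", true,
               "Determines whether writes to the replication file lists are retried on failure. " +
               "Disable this only for filesystems that do not support append, since a retried " +
               "write reopens the backing file in append mode."),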

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/FileList.java
##########
@@ -18,158 +18,225 @@
 
 package org.apache.hadoop.hive.ql.exec.repl.util;
 
-import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.exec.util.Retryable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.BufferedReader;
 import java.io.IOException;
 import java.io.InputStreamReader;
+import java.io.StringWriter;
+import java.io.UncheckedIOException;
 import java.util.Iterator;
 import java.util.NoSuchElementException;
-import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.Callable;
 
-
-/**
- * A file backed list of Strings which is in-memory till the threshold.
- */
 public class FileList implements AutoCloseable, Iterator<String> {
   private static final Logger LOG = LoggerFactory.getLogger(FileList.class);
-  private static int fileListStreamerID = 0;
-  private static final String  FILE_LIST_STREAMER_PREFIX = "file-list-streamer-";
-
-  private LinkedBlockingQueue<String> cache;
-  private volatile boolean thresholdHit = false;
-  private int thresholdPoint;
-  private float thresholdFactor = 0.9f;
-  private Path backingFile;
-  private FileListStreamer fileListStreamer;
-  private String nextElement;
-  private boolean noMoreElement;
+  private final Path backingFile;
+  private String nextElement = null;
+  private String lastReadElement = null;
   private HiveConf conf;
+  private volatile boolean abortOperation = false;
+  private volatile boolean retryMode;
   private BufferedReader backingFileReader;
+  private volatile FSDataOutputStream backingFileWriter;
 
-
-  public FileList(Path backingFile, int cacheSize, HiveConf conf) {
+  public FileList(Path backingFile, HiveConf conf) {
     this.backingFile = backingFile;
     this.conf = conf;
-    if (cacheSize > 0) {
-      // Cache size must be > 0 for this list to be used for the write operation.
-      this.cache = new LinkedBlockingQueue<>(cacheSize);
-      fileListStreamer = new FileListStreamer(cache, backingFile, conf);
-      thresholdPoint = getThreshold(cacheSize);
-      LOG.debug("File list backed by {} can be used for write operation.", backingFile);
+    this.retryMode = false;
+  }
+
+  public void add(String entry) throws IOException {
+    if (conf.getBoolVar(HiveConf.ConfVars.REPL_COPY_ITERATOR_RETRY)) {
+      writeWithRetry(entry);
     } else {
-      thresholdHit = true;
+      writeEntry(entry);
     }
   }
 
-  @VisibleForTesting
-  FileList(Path backingFile, FileListStreamer fileListStreamer, LinkedBlockingQueue<String> cache, HiveConf conf) {
-    this.backingFile = backingFile;
-    this.fileListStreamer = fileListStreamer;
-    this.cache = cache;
-    this.conf = conf;
-    thresholdPoint = getThreshold(cache.remainingCapacity());
+  private synchronized void writeEntry(String entry) throws IOException {
+    //retry only during creating the file, no retry during writes
+    if (backingFileWriter == null) {
+      try {
+        Retryable retryable = buildRetryable();
+        retryable.executeCallable((Callable<Void>) () -> {
+          if(this.abortOperation) {
+            return null;
+          }
+          backingFileWriter = getWriterCreateMode();
+          return null;
+        });
+      } catch (Exception e) {
+        this.abortOperation = true;
+        throw new IOException(ErrorMsg.REPL_RETRY_EXHAUSTED.format(e.getMessage()));
+      }
+    }
+    if(this.abortOperation) {
+      return;
+    }
+    try {
+      backingFileWriter.writeBytes(getEntryWithNewline(entry));
+      LOG.info("Writing entry {} to file list backed by {}", entry, backingFile);
+    } catch (IOException e) {
+      this.abortOperation = true;
+      LOG.error("Writing entry {} to file list {} failed.", entry, backingFile, e);
+      throw e;
+    }
   }
 
-  /**
-   * Only add operation is safe for concurrent operations.
-   */
-  public void add(String entry) throws SemanticException {
-    if (thresholdHit && !fileListStreamer.isAlive()) {
-      throw new SemanticException("List is not getting saved anymore to file " + backingFile.toString());
+  private synchronized void writeWithRetry(String entry) throws IOException {
+    Retryable retryable = buildRetryable();
+    try {
+      retryable.executeCallable((Callable<Void>) () -> {
+        if (this.abortOperation) {
+          return null;

Review comment:
       Add a debug-level log statement here.
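
       i.e., something along these lines (a sketch only; entry and backingFile are already in scope in this method):

           if (this.abortOperation) {
             LOG.debug("Skipping entry {} for file list backed by {}, an earlier write failed.", entry, backingFile);
             return null;
           }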

##########
File path: itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
##########
@@ -2225,17 +2224,11 @@ private void setupUDFJarOnHDFS(Path identityUdfLocalPath, Path identityUdfHdfsPa
   /*
    * Method used from TestReplicationScenariosExclusiveReplica
    */
-  private void assertExternalFileInfo(List<String> expected, String dumplocation, boolean isIncremental,
+  private void assertExternalFileList(List<String> expected, String dumplocation,
                                       WarehouseInstance warehouseInstance)
           throws IOException {
     Path hivePath = new Path(dumplocation, ReplUtils.REPL_HIVE_BASE_DIR);
-    Path metadataPath = new Path(hivePath, EximUtil.METADATA_PATH_NAME);
-    Path externalTableInfoFile;
-    if (isIncremental) {
-      externalTableInfoFile = new Path(hivePath, FILE_NAME);
-    } else {
-      externalTableInfoFile = new Path(metadataPath, primaryDbName.toLowerCase() + File.separator + FILE_NAME);
-    }
-    ReplicationTestUtils.assertExternalFileInfo(warehouseInstance, expected, externalTableInfoFile);
+    Path externalTblFileList = new Path(hivePath, EximUtil.FILE_LIST_EXTERNAL);

Review comment:
       Wouldn't it be code de-duplication rather? Can you please point out one such place? I couldn't spot it.






[GitHub] [hive] pkumarsinha commented on a change in pull request #1936: HIVE-24718: Cleanup of _external_table_info file

Posted by GitBox <gi...@apache.org>.
pkumarsinha commented on a change in pull request #1936:
URL: https://github.com/apache/hive/pull/1936#discussion_r573480027



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/FileList.java
##########
@@ -22,154 +22,118 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.exec.util.Retryable;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.BufferedReader;
+import java.io.BufferedWriter;
 import java.io.IOException;
 import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.io.UncheckedIOException;
 import java.util.Iterator;
 import java.util.NoSuchElementException;
+import java.util.concurrent.Callable;
 import java.util.concurrent.LinkedBlockingQueue;
 
-
-/**
- * A file backed list of Strings which is in-memory till the threshold.
- */
 public class FileList implements AutoCloseable, Iterator<String> {
   private static final Logger LOG = LoggerFactory.getLogger(FileList.class);
-  private static int fileListStreamerID = 0;
-  private static final String  FILE_LIST_STREAMER_PREFIX = "file-list-streamer-";
-
-  private LinkedBlockingQueue<String> cache;
-  private volatile boolean thresholdHit = false;
-  private int thresholdPoint;
-  private float thresholdFactor = 0.9f;
   private Path backingFile;
-  private FileListStreamer fileListStreamer;
-  private String nextElement;
-  private boolean noMoreElement;
+  private String nextElement = null;
   private HiveConf conf;
   private BufferedReader backingFileReader;
+  private BufferedWriter backingFileWriter;
 
-
-  public FileList(Path backingFile, int cacheSize, HiveConf conf) {
+  public FileList(Path backingFile, HiveConf conf) {
     this.backingFile = backingFile;
     this.conf = conf;
-    if (cacheSize > 0) {
-      // Cache size must be > 0 for this list to be used for the write operation.
-      this.cache = new LinkedBlockingQueue<>(cacheSize);
-      fileListStreamer = new FileListStreamer(cache, backingFile, conf);
-      thresholdPoint = getThreshold(cacheSize);
-      LOG.debug("File list backed by {} can be used for write operation.", backingFile);
-    } else {
-      thresholdHit = true;
-    }
   }
 
-  @VisibleForTesting
-  FileList(Path backingFile, FileListStreamer fileListStreamer, LinkedBlockingQueue<String> cache, HiveConf conf) {
-    this.backingFile = backingFile;
-    this.fileListStreamer = fileListStreamer;
-    this.cache = cache;
-    this.conf = conf;
-    thresholdPoint = getThreshold(cache.remainingCapacity());
-  }
-
-  /**
-   * Only add operation is safe for concurrent operations.
-   */
   public void add(String entry) throws SemanticException {
-    if (thresholdHit && !fileListStreamer.isAlive()) {
-      throw new SemanticException("List is not getting saved anymore to file " + backingFile.toString());
-    }
+    Retryable retryable = Retryable.builder()
+            .withHiveConf(conf)
+            .withRetryOnException(IOException.class).build();
     try {
-      cache.put(entry);
-    } catch (InterruptedException e) {
-      throw new SemanticException(e);
-    }
-    if (!thresholdHit && cache.size() >= thresholdPoint) {
-      initStoreToFile(cache.size());
+      retryable.executeCallable((Callable<Void>) ()-> {
+        synchronized (backingFile ) {

Review comment:
       If it is a single thread writing the entries, ordering wouldn't be messed up, no? 






[GitHub] [hive] ArkoSharma commented on a change in pull request #1936: HIVE-24718: Moving to file based iteration for copying data.

Posted by GitBox <gi...@apache.org>.
ArkoSharma commented on a change in pull request #1936:
URL: https://github.com/apache/hive/pull/1936#discussion_r580049385



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/FileList.java
##########
@@ -18,158 +18,176 @@
 
 package org.apache.hadoop.hive.ql.exec.repl.util;
 
-import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.exec.util.Retryable;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.BufferedReader;
 import java.io.IOException;
 import java.io.InputStreamReader;
+import java.io.StringWriter;
+import java.io.UncheckedIOException;
 import java.util.Iterator;
 import java.util.NoSuchElementException;
-import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.Callable;
 
-
-/**
- * A file backed list of Strings which is in-memory till the threshold.
- */
 public class FileList implements AutoCloseable, Iterator<String> {
   private static final Logger LOG = LoggerFactory.getLogger(FileList.class);
-  private static int fileListStreamerID = 0;
-  private static final String  FILE_LIST_STREAMER_PREFIX = "file-list-streamer-";
-
-  private LinkedBlockingQueue<String> cache;
-  private volatile boolean thresholdHit = false;
-  private int thresholdPoint;
-  private float thresholdFactor = 0.9f;
-  private Path backingFile;
-  private FileListStreamer fileListStreamer;
-  private String nextElement;
-  private boolean noMoreElement;
+  private final Path backingFile;
+  private String nextElement = null;
   private HiveConf conf;
+  private volatile boolean retryMode;
   private BufferedReader backingFileReader;
+  private volatile FSDataOutputStream backingFileWriter;
 
-
-  public FileList(Path backingFile, int cacheSize, HiveConf conf) {
+  public FileList(Path backingFile, HiveConf conf) {
     this.backingFile = backingFile;
     this.conf = conf;
-    if (cacheSize > 0) {
-      // Cache size must be > 0 for this list to be used for the write operation.
-      this.cache = new LinkedBlockingQueue<>(cacheSize);
-      fileListStreamer = new FileListStreamer(cache, backingFile, conf);
-      thresholdPoint = getThreshold(cacheSize);
-      LOG.debug("File list backed by {} can be used for write operation.", backingFile);
-    } else {
-      thresholdHit = true;
+    this.retryMode = false;
+  }
+
+  public void add(String entry) throws IOException {
+    Retryable retryable = buildRetryable();
+    try {
+      retryable.executeCallable((Callable<Void>) ()-> {
+        synchronized (backingFile) {
+          try{
+            if (backingFileWriter == null) {
+              backingFileWriter = initWriter();
+            }
+            backingFileWriter.writeBytes(getEntryWithNewline(entry));
+            backingFileWriter.hflush();
+            LOG.info("Writing entry {} to file list backed by {}", entry, backingFile);
+          } catch (IOException e) {
+            LOG.error("Writing entry {} to file list {} failed, attempting retry.", entry, backingFile, e);
+            this.retryMode = true;
+            close();
+            throw e;
+          }
+        }
+        return null;
+      });
+    } catch (Exception e) {
+      throw new IOException(ErrorMsg.REPL_RETRY_EXHAUSTED.format(e.getMessage()));
     }
   }
 
-  @VisibleForTesting
-  FileList(Path backingFile, FileListStreamer fileListStreamer, LinkedBlockingQueue<String> cache, HiveConf conf) {
-    this.backingFile = backingFile;
-    this.fileListStreamer = fileListStreamer;
-    this.cache = cache;
-    this.conf = conf;
-    thresholdPoint = getThreshold(cache.remainingCapacity());
+  Retryable buildRetryable() {
+    return Retryable.builder()
+            .withHiveConf(conf)
+            .withRetryOnException(IOException.class).build();
   }
 
-  /**
-   * Only add operation is safe for concurrent operations.
-   */
-  public void add(String entry) throws SemanticException {
-    if (thresholdHit && !fileListStreamer.isAlive()) {
-      throw new SemanticException("List is not getting saved anymore to file " + backingFile.toString());
+  // Return the entry ensuring it ends with newline.
+  private String getEntryWithNewline(String entry) {
+    return new StringWriter()
+            .append(entry)
+            .append(System.lineSeparator())
+            .toString();
+  }
+
+  FSDataOutputStream initWriter() throws IOException {
+    if(shouldAppend()) {
+      return getWriterAppendMode(); // append in retry-mode if file has been created already
+    }
+    else {
+      return getWriterCreateMode();
     }
+  }
+
+  boolean shouldAppend() throws IOException {

Review comment:
       Needed to make it package-private for the purpose of testing with Mockito.
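
       For reference, a minimal sketch of how the package-private visibility is exercised from a test in the same package (the stubbed value is illustrative):

           // A Mockito spy can stub the package-private method directly:
           FileList fileList = Mockito.spy(new FileList(testFilePath, conf));
           Mockito.doReturn(false).when(fileList).shouldAppend();  // force create mode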






[GitHub] [hive] ArkoSharma commented on a change in pull request #1936: HIVE-24718: Moving to file based iteration for copying data.

Posted by GitBox <gi...@apache.org>.
ArkoSharma commented on a change in pull request #1936:
URL: https://github.com/apache/hive/pull/1936#discussion_r582430148



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/FileList.java
##########
@@ -18,158 +18,176 @@
 
 package org.apache.hadoop.hive.ql.exec.repl.util;
 
-import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.exec.util.Retryable;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.BufferedReader;
 import java.io.IOException;
 import java.io.InputStreamReader;
+import java.io.StringWriter;
+import java.io.UncheckedIOException;
 import java.util.Iterator;
 import java.util.NoSuchElementException;
-import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.Callable;
 
-
-/**
- * A file backed list of Strings which is in-memory till the threshold.
- */
 public class FileList implements AutoCloseable, Iterator<String> {
   private static final Logger LOG = LoggerFactory.getLogger(FileList.class);
-  private static int fileListStreamerID = 0;
-  private static final String  FILE_LIST_STREAMER_PREFIX = "file-list-streamer-";
-
-  private LinkedBlockingQueue<String> cache;
-  private volatile boolean thresholdHit = false;
-  private int thresholdPoint;
-  private float thresholdFactor = 0.9f;
-  private Path backingFile;
-  private FileListStreamer fileListStreamer;
-  private String nextElement;
-  private boolean noMoreElement;
+  private final Path backingFile;
+  private String nextElement = null;
   private HiveConf conf;
+  private volatile boolean retryMode;
   private BufferedReader backingFileReader;
+  private volatile FSDataOutputStream backingFileWriter;
 
-
-  public FileList(Path backingFile, int cacheSize, HiveConf conf) {
+  public FileList(Path backingFile, HiveConf conf) {
     this.backingFile = backingFile;
     this.conf = conf;
-    if (cacheSize > 0) {
-      // Cache size must be > 0 for this list to be used for the write operation.
-      this.cache = new LinkedBlockingQueue<>(cacheSize);
-      fileListStreamer = new FileListStreamer(cache, backingFile, conf);
-      thresholdPoint = getThreshold(cacheSize);
-      LOG.debug("File list backed by {} can be used for write operation.", backingFile);
-    } else {
-      thresholdHit = true;
+    this.retryMode = false;
+  }
+
+  public void add(String entry) throws IOException {
+    Retryable retryable = buildRetryable();
+    try {
+      retryable.executeCallable((Callable<Void>) ()-> {
+        synchronized (backingFile) {
+          try{
+            if (backingFileWriter == null) {
+              backingFileWriter = initWriter();
+            }
+            backingFileWriter.writeBytes(getEntryWithNewline(entry));
+            backingFileWriter.hflush();

Review comment:
       Currently we are checking for duplicate entries.
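
       A hedged sketch of what that read-side duplicate check can look like, based on the lastReadElement field in a later revision of this patch (the method body is illustrative, not the committed code):

           // A retried write may append the same line twice, so the reader
           // ignores an element equal to the one read immediately before it.
           private boolean readNextValidElement() throws IOException {
             String line;
             while ((line = backingFileReader.readLine()) != null) {
               if (!line.equals(lastReadElement)) {
                 nextElement = line;
                 lastReadElement = line;
                 return true;
               }
               // consecutive duplicate produced by a retried write; skip it
             }
             return false;
           }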






[GitHub] [hive] ArkoSharma commented on a change in pull request #1936: HIVE-24718: Moving to file based iteration for copying data.

Posted by GitBox <gi...@apache.org>.
ArkoSharma commented on a change in pull request #1936:
URL: https://github.com/apache/hive/pull/1936#discussion_r591024045



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/FileList.java
##########
@@ -18,158 +18,225 @@
 
 package org.apache.hadoop.hive.ql.exec.repl.util;
 
-import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.exec.util.Retryable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.BufferedReader;
 import java.io.IOException;
 import java.io.InputStreamReader;
+import java.io.StringWriter;
+import java.io.UncheckedIOException;
 import java.util.Iterator;
 import java.util.NoSuchElementException;
-import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.Callable;
 
-
-/**
- * A file backed list of Strings which is in-memory till the threshold.
- */
 public class FileList implements AutoCloseable, Iterator<String> {
   private static final Logger LOG = LoggerFactory.getLogger(FileList.class);
-  private static int fileListStreamerID = 0;
-  private static final String  FILE_LIST_STREAMER_PREFIX = "file-list-streamer-";
-
-  private LinkedBlockingQueue<String> cache;
-  private volatile boolean thresholdHit = false;
-  private int thresholdPoint;
-  private float thresholdFactor = 0.9f;
-  private Path backingFile;
-  private FileListStreamer fileListStreamer;
-  private String nextElement;
-  private boolean noMoreElement;
+  private final Path backingFile;
+  private String nextElement = null;
+  private String lastReadElement = null;
   private HiveConf conf;
+  private volatile boolean abortOperation = false;
+  private volatile boolean retryMode;
   private BufferedReader backingFileReader;
+  private volatile FSDataOutputStream backingFileWriter;
 
-
-  public FileList(Path backingFile, int cacheSize, HiveConf conf) {
+  public FileList(Path backingFile, HiveConf conf) {
     this.backingFile = backingFile;
     this.conf = conf;
-    if (cacheSize > 0) {
-      // Cache size must be > 0 for this list to be used for the write operation.
-      this.cache = new LinkedBlockingQueue<>(cacheSize);
-      fileListStreamer = new FileListStreamer(cache, backingFile, conf);
-      thresholdPoint = getThreshold(cacheSize);
-      LOG.debug("File list backed by {} can be used for write operation.", backingFile);
+    this.retryMode = false;
+  }
+
+  public void add(String entry) throws IOException {
+    if (conf.getBoolVar(HiveConf.ConfVars.REPL_COPY_ITERATOR_RETRY)) {
+      writeWithRetry(entry);
     } else {
-      thresholdHit = true;
+      writeEntry(entry);
     }
   }
 
-  @VisibleForTesting
-  FileList(Path backingFile, FileListStreamer fileListStreamer, LinkedBlockingQueue<String> cache, HiveConf conf) {
-    this.backingFile = backingFile;
-    this.fileListStreamer = fileListStreamer;
-    this.cache = cache;
-    this.conf = conf;
-    thresholdPoint = getThreshold(cache.remainingCapacity());
+  private synchronized void writeEntry(String entry) throws IOException {
+    //retry only during creating the file, no retry during writes
+    if (backingFileWriter == null) {
+      try {
+        Retryable retryable = buildRetryable();
+        retryable.executeCallable((Callable<Void>) () -> {
+          if(this.abortOperation) {
+            return null;
+          }
+          backingFileWriter = getWriterCreateMode();
+          return null;
+        });
+      } catch (Exception e) {
+        this.abortOperation = true;
+        throw new IOException(ErrorMsg.REPL_RETRY_EXHAUSTED.format(e.getMessage()));
+      }
+    }
+    if(this.abortOperation) {
+      return;
+    }
+    try {
+      backingFileWriter.writeBytes(getEntryWithNewline(entry));
+      LOG.info("Writing entry {} to file list backed by {}", entry, backingFile);
+    } catch (IOException e) {
+      this.abortOperation = true;
+      LOG.error("Writing entry {} to file list {} failed.", entry, backingFile, e);
+      throw e;
+    }
   }
 
-  /**
-   * Only add operation is safe for concurrent operations.
-   */
-  public void add(String entry) throws SemanticException {
-    if (thresholdHit && !fileListStreamer.isAlive()) {
-      throw new SemanticException("List is not getting saved anymore to file " + backingFile.toString());
+  private synchronized void writeWithRetry(String entry) throws IOException {
+    Retryable retryable = buildRetryable();
+    try {
+      retryable.executeCallable((Callable<Void>) () -> {
+        if (this.abortOperation) {
+          return null;

Review comment:
       Done.

##########
File path: ql/src/test/org/apache/hadoop/hive/ql/exec/repl/util/TestFileList.java
##########
@@ -18,147 +18,266 @@
 
 package org.apache.hadoop.hive.ql.exec.repl.util;
 
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.exec.util.Retryable;
+import org.junit.Assert;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.mockito.ArgumentCaptor;
-import org.mockito.Mock;
 import org.mockito.Mockito;
+import org.mockito.junit.MockitoJUnitRunner;
 import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
 import org.slf4j.LoggerFactory;
 
-import java.io.BufferedWriter;
+import java.io.File;
+import java.io.IOException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-
 /**
  * Tests the File List implementation.
  */
 
-@RunWith(PowerMockRunner.class)
+@RunWith(MockitoJUnitRunner.class)
 @PrepareForTest({LoggerFactory.class})
 public class TestFileList {
 
-  @Mock
-  private BufferedWriter bufferedWriter;
-
-
-  @Test
-  public void testNoStreaming() throws Exception {
-    Object tuple[] =  setupAndGetTuple(100, false);
-    FileList fileList = (FileList) tuple[0];
-    FileListStreamer fileListStreamer = (FileListStreamer) tuple[1];
-    fileList.add("Entry1");
-    fileList.add("Entry2");
-    assertFalse(isStreamingToFile(fileListStreamer));
-  }
+  HiveConf conf = new HiveConf();
+  private FSDataOutputStream outStream;
+  private FSDataOutputStream testFileStream;
+  final String TEST_DATA_DIR = new File(System.getProperty("java.io.tmpdir") +
+          File.separator + TestFileList.class.getCanonicalName() + "-" + System.currentTimeMillis()
+  ).getPath().replaceAll("\\\\", "/");
+  private Exception testException = new IOException("test");
 
   @Test
-  public void testAlwaysStreaming() throws Exception {
-    Object tuple[] =  setupAndGetTuple(100, true);
-    FileList fileList = (FileList) tuple[0];
-    FileListStreamer fileListStreamer = (FileListStreamer) tuple[1];
-    assertFalse(fileListStreamer.isInitialized());
-    fileList.add("Entry1");
-    waitForStreamingInitialization(fileListStreamer);
-    assertTrue(isStreamingToFile(fileListStreamer));
-    fileList.close();
-    waitForStreamingClosure(fileListStreamer);
-  }
+  public void testConcurrentAdd() throws Exception {
+    FileList fileList = setupFileList();
+    int numOfEntries = 1000;
+    int numOfThreads = 10;
+    ExecutorService executorService = Executors.newFixedThreadPool(numOfThreads);
 
-  @Test
-  public void testStreaminOnCacheHit() throws Exception {
-    Object tuple[] =  setupAndGetTuple(5, false);
-    FileList fileList = (FileList) tuple[0];
-    FileListStreamer fileListStreamer = (FileListStreamer) tuple[1];
-    fileList.add("Entry1");
-    fileList.add("Entry2");
-    fileList.add("Entry3");
-    Thread.sleep(5000L);
-    assertFalse(fileListStreamer.isInitialized());
-    fileList.add("Entry4");
-    fileList.add("Entry5");
-    waitForStreamingInitialization(fileListStreamer);
+    for (int i=1; i<=numOfEntries; i++) {
+      executorService.submit(() -> {
+        try {
+          fileList.add("someEntry");
+        } catch (IOException e) {
+          throw new RuntimeException("Unable to add to file list.");
+        }
+      });
+    }
+    executorService.awaitTermination(1, TimeUnit.MINUTES);
     fileList.close();
-    waitForStreamingClosure(fileListStreamer);
+    ArgumentCaptor<String> entryArgs = ArgumentCaptor.forClass(String.class);
+    Mockito.verify(testFileStream, Mockito.times(numOfEntries)).writeBytes(entryArgs.capture());
   }
 
   @Test
-  public void testConcurrentAdd() throws Exception {
-    Object tuple[] =  setupAndGetTuple(100, false);
-    FileList fileList = (FileList) tuple[0];
-    FileListStreamer fileListStreamer = (FileListStreamer) tuple[1];
+  public void testConcurrentAddWithAbort() throws Exception {
+    FileList fileList = setupFileList(false, false, false);
     int numOfEntries = 1000;
     int numOfThreads = 10;
     ExecutorService executorService = Executors.newFixedThreadPool(numOfThreads);
+    final String retryExhaustedMsg = ErrorMsg.REPL_RETRY_EXHAUSTED
+            .format(testException.getMessage());
 
     for (int i=1; i<=numOfEntries; i++) {
       executorService.submit(() -> {
         try {
           fileList.add("someEntry");
-        } catch (SemanticException e) {
-          throw new RuntimeException("Unbale to add to file list.");
+        } catch (IOException e) {
+          Assert.assertTrue(e.getMessage().contains(retryExhaustedMsg));
         }
       });
     }
     executorService.awaitTermination(1, TimeUnit.MINUTES);
-    waitForStreamingInitialization(fileListStreamer);
     fileList.close();
-    waitForStreamingClosure(fileListStreamer);
     ArgumentCaptor<String> entryArgs = ArgumentCaptor.forClass(String.class);
-    Mockito.verify(bufferedWriter, Mockito.times(numOfEntries)).write(entryArgs.capture());
+    //retry exhausted should be encountered by the first thread, so the other threads do not write.
+    Mockito.verify(outStream, Mockito.times(1)).writeBytes(entryArgs.capture());
   }
 
-  private void waitForStreamingInitialization(FileListStreamer fileListStreamer) throws InterruptedException {
-    long sleepTime = 1000L;
-    int iter = 0;
-    while (!fileListStreamer.isInitialized()) {
-      Thread.sleep(sleepTime);
-      iter++;
-      if (iter == 5) {
-        throw new IllegalStateException("File Streamer not initialized till 5s.");
-      }
+  @Test
+  public void testWriteRetryCreateFailure() throws Exception {
+    String testEntry = "someEntry";
+    boolean retryOnCreate = true;
+    FileList fileList = setupFileList(retryOnCreate);
+    final String retryExhaustedMsg = ErrorMsg.REPL_RETRY_EXHAUSTED
+            .format(testException.getMessage());
+
+    try {
+      fileList.add(testEntry);
+    } catch (IOException e) {
+      Assert.assertTrue(e.getMessage().contains(retryExhaustedMsg));
+    }
+
+    //the create keeps failing, so create should be called at least twice,
+    //writes and appends do not happen
+    Mockito.verify(fileList, Mockito.atLeast(2)).getWriterCreateMode();
+    Mockito.verify(fileList, Mockito.times(0)).getWriterAppendMode();
+  }
+
+  @Test
+  public void testWriteNoRetry() throws Exception {
+    String testEntry = "someEntry";
+    boolean retryOnCreate = false, retryOnWrite = false;
+    FileList fileList = setupFileList(retryOnCreate, retryOnWrite);
+    final String retryExhaustedMsg = ErrorMsg.REPL_RETRY_EXHAUSTED
+            .format(testException.getMessage());
+
+    try {
+      fileList.add(testEntry);
+    } catch (IOException e) {
+      Assert.assertFalse(e.getMessage().contains(retryExhaustedMsg));
+      Assert.assertTrue(e.getMessage().contains("test"));
     }
+
+    //the first write fails and no retries are made
+    Mockito.verify(fileList, Mockito.times(1)).getWriterCreateMode();
+    Mockito.verify(outStream, Mockito.times(1)).writeBytes(Mockito.anyString());
+    Mockito.verify(fileList, Mockito.times(0)).getWriterAppendMode();
   }
 
-  private void waitForStreamingClosure(FileListStreamer fileListStreamer) throws InterruptedException {
-    long sleepTime = 1000L;
-    int iter = 0;
-    while (!isStreamingClosedProperly(fileListStreamer)) {
-      Thread.sleep(sleepTime);
-      iter++;
-      if (iter == 5) {
-        throw new IllegalStateException("File Streamer not getting closed till 5s.");
+  @Test
+  public void testReadWithDuplicateEntries() throws Exception {
+    conf = new HiveConf();
+    String testEntry = "someEntry";
+    int numUniqueEntries = 100;
+    Path testFilePath =  new Path(new Path(TEST_DATA_DIR), "testFile");
+    FileList fileList = new FileList(testFilePath, conf);
+
+    for (int i = 1; i <= numUniqueEntries; i++) {
+      String currentUniqueEntry = testEntry + Integer.valueOf(i);

Review comment:
       Made this change.

##########
File path: itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
##########
@@ -1836,6 +1837,64 @@ public void testHdfsNameserviceWithDataCopy() throws Throwable {
             .verifyResults(new String[]{"2", "3"});
   }
 
+  @Test
+  public void testReplWithRetryDisabledIterators() throws Throwable {
+    List<String> clause = new ArrayList<>();
+    //NS replacement parameters has no effect when data is also copied to staging
+    clause.add("'" + HiveConf.ConfVars.REPL_RUN_DATA_COPY_TASKS_ON_TARGET + "'='false'");
+    clause.add("'" + HiveConf.ConfVars.REPL_COPY_ITERATOR_RETRY + "'='false'");
+    WarehouseInstance.Tuple tuple = primary.run("use " + primaryDbName)
+            .run("create table  acid_table (key int, value int) partitioned by (load_date date) " +
+                    "clustered by(key) into 2 buckets stored as orc tblproperties ('transactional'='true')")
+            .run("create table table1 (i String)")
+            .run("insert into table1 values (1)")
+            .run("insert into table1 values (2)")
+            .dump(primaryDbName, clause);
+    assertFalseExternalFileList(new Path(new Path(tuple.dumpLocation,

Review comment:
       Done.

##########
File path: common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
##########
@@ -653,6 +649,8 @@ private static void populateLlapDaemonVarsSet(Set<String> llapDaemonVarsSetLocal
       new TimeValidator(TimeUnit.HOURS),
       "Total allowed retry duration in hours inclusive of all retries. Once this is exhausted, " +
         "the policy instance will be marked as failed and will need manual intervention to restart."),
+    REPL_COPY_ITERATOR_RETRY("hive.repl.copy.iterator.retry", true,

Review comment:
       Added description and renamed.






[GitHub] [hive] ArkoSharma commented on a change in pull request #1936: HIVE-24718: Cleanup of _external_table_info file

Posted by GitBox <gi...@apache.org>.
ArkoSharma commented on a change in pull request #1936:
URL: https://github.com/apache/hive/pull/1936#discussion_r579861655



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/FileList.java
##########
@@ -22,154 +22,118 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.exec.util.Retryable;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.BufferedReader;
+import java.io.BufferedWriter;
 import java.io.IOException;
 import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.io.UncheckedIOException;
 import java.util.Iterator;
 import java.util.NoSuchElementException;
+import java.util.concurrent.Callable;
 import java.util.concurrent.LinkedBlockingQueue;
 
-
-/**
- * A file backed list of Strings which is in-memory till the threshold.
- */
 public class FileList implements AutoCloseable, Iterator<String> {
   private static final Logger LOG = LoggerFactory.getLogger(FileList.class);
-  private static int fileListStreamerID = 0;
-  private static final String  FILE_LIST_STREAMER_PREFIX = "file-list-streamer-";
-
-  private LinkedBlockingQueue<String> cache;
-  private volatile boolean thresholdHit = false;
-  private int thresholdPoint;
-  private float thresholdFactor = 0.9f;
   private Path backingFile;
-  private FileListStreamer fileListStreamer;
-  private String nextElement;
-  private boolean noMoreElement;
+  private String nextElement = null;
   private HiveConf conf;
   private BufferedReader backingFileReader;
+  private BufferedWriter backingFileWriter;
 
-
-  public FileList(Path backingFile, int cacheSize, HiveConf conf) {
+  public FileList(Path backingFile, HiveConf conf) {
     this.backingFile = backingFile;
     this.conf = conf;
-    if (cacheSize > 0) {
-      // Cache size must be > 0 for this list to be used for the write operation.
-      this.cache = new LinkedBlockingQueue<>(cacheSize);
-      fileListStreamer = new FileListStreamer(cache, backingFile, conf);
-      thresholdPoint = getThreshold(cacheSize);
-      LOG.debug("File list backed by {} can be used for write operation.", backingFile);
-    } else {
-      thresholdHit = true;
-    }
   }
 
-  @VisibleForTesting
-  FileList(Path backingFile, FileListStreamer fileListStreamer, LinkedBlockingQueue<String> cache, HiveConf conf) {
-    this.backingFile = backingFile;
-    this.fileListStreamer = fileListStreamer;
-    this.cache = cache;
-    this.conf = conf;
-    thresholdPoint = getThreshold(cache.remainingCapacity());
-  }
-
-  /**
-   * Only add operation is safe for concurrent operations.
-   */
   public void add(String entry) throws SemanticException {
-    if (thresholdHit && !fileListStreamer.isAlive()) {
-      throw new SemanticException("List is not getting saved anymore to file " + backingFile.toString());
-    }
+    Retryable retryable = Retryable.builder()

Review comment:
       Currently it is used only in this add() function, so it is created locally.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: gitbox-unsubscribe@hive.apache.org
For additional commands, e-mail: gitbox-help@hive.apache.org


[GitHub] [hive] ArkoSharma commented on a change in pull request #1936: HIVE-24718: Cleanup of _external_table_info file

Posted by GitBox <gi...@apache.org>.
ArkoSharma commented on a change in pull request #1936:
URL: https://github.com/apache/hive/pull/1936#discussion_r573490313



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/FileList.java
##########
@@ -22,154 +22,118 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.exec.util.Retryable;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.BufferedReader;
+import java.io.BufferedWriter;
 import java.io.IOException;
 import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.io.UncheckedIOException;
 import java.util.Iterator;
 import java.util.NoSuchElementException;
+import java.util.concurrent.Callable;
 import java.util.concurrent.LinkedBlockingQueue;
 
-
-/**
- * A file backed list of Strings which is in-memory till the threshold.
- */
 public class FileList implements AutoCloseable, Iterator<String> {
   private static final Logger LOG = LoggerFactory.getLogger(FileList.class);
-  private static int fileListStreamerID = 0;
-  private static final String  FILE_LIST_STREAMER_PREFIX = "file-list-streamer-";
-
-  private LinkedBlockingQueue<String> cache;
-  private volatile boolean thresholdHit = false;
-  private int thresholdPoint;
-  private float thresholdFactor = 0.9f;
   private Path backingFile;
-  private FileListStreamer fileListStreamer;
-  private String nextElement;
-  private boolean noMoreElement;
+  private String nextElement = null;
   private HiveConf conf;
   private BufferedReader backingFileReader;
+  private BufferedWriter backingFileWriter;
 
-
-  public FileList(Path backingFile, int cacheSize, HiveConf conf) {
+  public FileList(Path backingFile, HiveConf conf) {
     this.backingFile = backingFile;
     this.conf = conf;
-    if (cacheSize > 0) {
-      // Cache size must be > 0 for this list to be used for the write operation.
-      this.cache = new LinkedBlockingQueue<>(cacheSize);
-      fileListStreamer = new FileListStreamer(cache, backingFile, conf);
-      thresholdPoint = getThreshold(cacheSize);
-      LOG.debug("File list backed by {} can be used for write operation.", backingFile);
-    } else {
-      thresholdHit = true;
-    }
   }
 
-  @VisibleForTesting
-  FileList(Path backingFile, FileListStreamer fileListStreamer, LinkedBlockingQueue<String> cache, HiveConf conf) {
-    this.backingFile = backingFile;
-    this.fileListStreamer = fileListStreamer;
-    this.cache = cache;
-    this.conf = conf;
-    thresholdPoint = getThreshold(cache.remainingCapacity());
-  }
-
-  /**
-   * Only add operation is safe for concurrent operations.
-   */
   public void add(String entry) throws SemanticException {
-    if (thresholdHit && !fileListStreamer.isAlive()) {
-      throw new SemanticException("List is not getting saved anymore to file " + backingFile.toString());
-    }
+    Retryable retryable = Retryable.builder()
+            .withHiveConf(conf)
+            .withRetryOnException(IOException.class).build();
     try {
-      cache.put(entry);
-    } catch (InterruptedException e) {
-      throw new SemanticException(e);
-    }
-    if (!thresholdHit && cache.size() >= thresholdPoint) {
-      initStoreToFile(cache.size());
+      retryable.executeCallable((Callable<Void>) ()-> {
+        synchronized (backingFile ) {

Review comment:
       During bootstrap dump of managed-table partitions, multiple threads may try to access this writer concurrently; the number of threads is governed by the conf REPL_PARTITIONS_DUMP_PARALLELISM. Without synchronization, the write order could get interleaved.
   
   Earlier I was referring to the implementation of BufferedWriter, which makes each individual write operation thread-safe, meaning only one thread writes a given value at a time.
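   
   For illustration, here is a minimal, self-contained sketch of that pattern (hypothetical class and entry names, not the actual Hive code): several worker threads share one writer, and a synchronized block keeps each entry and its newline together.
   
       import java.io.BufferedWriter;
       import java.io.StringWriter;
       import java.util.concurrent.ExecutorService;
       import java.util.concurrent.Executors;
       import java.util.concurrent.TimeUnit;
   
       public class FileListAddSketch {
         private static final StringWriter BACKING = new StringWriter();
         private static final BufferedWriter WRITER = new BufferedWriter(BACKING);
   
         // Mirrors the shape of FileList.add(): the entry and its newline are
         // written atomically with respect to other add() calls.
         static void add(String entry) throws Exception {
           synchronized (WRITER) {
             WRITER.write(entry);
             WRITER.newLine();
           }
         }
   
         public static void main(String[] args) throws Exception {
           // Stands in for REPL_PARTITIONS_DUMP_PARALLELISM worker threads.
           ExecutorService pool = Executors.newFixedThreadPool(4);
           for (int i = 0; i < 8; i++) {
             final int n = i;
             pool.submit(() -> { add("entry-" + n); return null; });
           }
           pool.shutdown();
           pool.awaitTermination(1, TimeUnit.MINUTES);
           WRITER.flush();
           System.out.print(BACKING);  // always one entry per line
         }
       }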




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: gitbox-unsubscribe@hive.apache.org
For additional commands, e-mail: gitbox-help@hive.apache.org


[GitHub] [hive] pkumarsinha commented on a change in pull request #1936: HIVE-24718: Cleanup of _external_table_info file

Posted by GitBox <gi...@apache.org>.
pkumarsinha commented on a change in pull request #1936:
URL: https://github.com/apache/hive/pull/1936#discussion_r573292875



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/FileList.java
##########
@@ -22,154 +22,118 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.exec.util.Retryable;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.BufferedReader;
+import java.io.BufferedWriter;
 import java.io.IOException;
 import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.io.UncheckedIOException;
 import java.util.Iterator;
 import java.util.NoSuchElementException;
+import java.util.concurrent.Callable;
 import java.util.concurrent.LinkedBlockingQueue;
 
-
-/**
- * A file backed list of Strings which is in-memory till the threshold.
- */
 public class FileList implements AutoCloseable, Iterator<String> {
   private static final Logger LOG = LoggerFactory.getLogger(FileList.class);
-  private static int fileListStreamerID = 0;
-  private static final String  FILE_LIST_STREAMER_PREFIX = "file-list-streamer-";
-
-  private LinkedBlockingQueue<String> cache;
-  private volatile boolean thresholdHit = false;
-  private int thresholdPoint;
-  private float thresholdFactor = 0.9f;
   private Path backingFile;
-  private FileListStreamer fileListStreamer;
-  private String nextElement;
-  private boolean noMoreElement;
+  private String nextElement = null;
   private HiveConf conf;
   private BufferedReader backingFileReader;
+  private BufferedWriter backingFileWriter;
 
-
-  public FileList(Path backingFile, int cacheSize, HiveConf conf) {
+  public FileList(Path backingFile, HiveConf conf) {
     this.backingFile = backingFile;
     this.conf = conf;
-    if (cacheSize > 0) {
-      // Cache size must be > 0 for this list to be used for the write operation.
-      this.cache = new LinkedBlockingQueue<>(cacheSize);
-      fileListStreamer = new FileListStreamer(cache, backingFile, conf);
-      thresholdPoint = getThreshold(cacheSize);
-      LOG.debug("File list backed by {} can be used for write operation.", backingFile);
-    } else {
-      thresholdHit = true;
-    }
   }
 
-  @VisibleForTesting
-  FileList(Path backingFile, FileListStreamer fileListStreamer, LinkedBlockingQueue<String> cache, HiveConf conf) {
-    this.backingFile = backingFile;
-    this.fileListStreamer = fileListStreamer;
-    this.cache = cache;
-    this.conf = conf;
-    thresholdPoint = getThreshold(cache.remainingCapacity());
-  }
-
-  /**
-   * Only add operation is safe for concurrent operations.
-   */
   public void add(String entry) throws SemanticException {
-    if (thresholdHit && !fileListStreamer.isAlive()) {
-      throw new SemanticException("List is not getting saved anymore to file " + backingFile.toString());
-    }
+    Retryable retryable = Retryable.builder()

Review comment:
       Can we create this just one per FileList?

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/FileList.java
##########
@@ -22,154 +22,118 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.exec.util.Retryable;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.BufferedReader;
+import java.io.BufferedWriter;
 import java.io.IOException;
 import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.io.UncheckedIOException;
 import java.util.Iterator;
 import java.util.NoSuchElementException;
+import java.util.concurrent.Callable;
 import java.util.concurrent.LinkedBlockingQueue;
 
-
-/**
- * A file backed list of Strings which is in-memory till the threshold.
- */
 public class FileList implements AutoCloseable, Iterator<String> {
   private static final Logger LOG = LoggerFactory.getLogger(FileList.class);
-  private static int fileListStreamerID = 0;
-  private static final String  FILE_LIST_STREAMER_PREFIX = "file-list-streamer-";
-
-  private LinkedBlockingQueue<String> cache;
-  private volatile boolean thresholdHit = false;
-  private int thresholdPoint;
-  private float thresholdFactor = 0.9f;
   private Path backingFile;
-  private FileListStreamer fileListStreamer;
-  private String nextElement;
-  private boolean noMoreElement;
+  private String nextElement = null;
   private HiveConf conf;
   private BufferedReader backingFileReader;
+  private BufferedWriter backingFileWriter;
 
-
-  public FileList(Path backingFile, int cacheSize, HiveConf conf) {
+  public FileList(Path backingFile, HiveConf conf) {
     this.backingFile = backingFile;
     this.conf = conf;
-    if (cacheSize > 0) {
-      // Cache size must be > 0 for this list to be used for the write operation.
-      this.cache = new LinkedBlockingQueue<>(cacheSize);
-      fileListStreamer = new FileListStreamer(cache, backingFile, conf);
-      thresholdPoint = getThreshold(cacheSize);
-      LOG.debug("File list backed by {} can be used for write operation.", backingFile);
-    } else {
-      thresholdHit = true;
-    }
   }
 
-  @VisibleForTesting
-  FileList(Path backingFile, FileListStreamer fileListStreamer, LinkedBlockingQueue<String> cache, HiveConf conf) {
-    this.backingFile = backingFile;
-    this.fileListStreamer = fileListStreamer;
-    this.cache = cache;
-    this.conf = conf;
-    thresholdPoint = getThreshold(cache.remainingCapacity());
-  }
-
-  /**
-   * Only add operation is safe for concurrent operations.
-   */
   public void add(String entry) throws SemanticException {
-    if (thresholdHit && !fileListStreamer.isAlive()) {
-      throw new SemanticException("List is not getting saved anymore to file " + backingFile.toString());
-    }
+    Retryable retryable = Retryable.builder()
+            .withHiveConf(conf)
+            .withRetryOnException(IOException.class).build();
     try {
-      cache.put(entry);
-    } catch (InterruptedException e) {
-      throw new SemanticException(e);
-    }
-    if (!thresholdHit && cache.size() >= thresholdPoint) {
-      initStoreToFile(cache.size());
+      retryable.executeCallable((Callable<Void>) ()-> {
+        synchronized (backingFile ) {

Review comment:
       Do you need to synchronize this? How many threads can possibly write at any given moment?
   Also, this synchronizes on a non-final object; you can mark the field as final.
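   
   A minimal sketch of the suggested change (simplified; not the actual class): marking the monitor field final guarantees every thread locks the same object for the lifetime of the instance.
   
       import org.apache.hadoop.fs.Path;
   
       public class LockFieldSketch {
         // final: the object used in synchronized(backingFile) can never be
         // reassigned, so all threads contend on the same monitor.
         private final Path backingFile;
   
         LockFieldSketch(Path backingFile) {
           this.backingFile = backingFile;
         }
   
         void add(String entry) {
           synchronized (backingFile) {
             // write entry + newline atomically w.r.t. other add() calls
           }
         }
       }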

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/FileList.java
##########
@@ -22,154 +22,118 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.exec.util.Retryable;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.BufferedReader;
+import java.io.BufferedWriter;
 import java.io.IOException;
 import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.io.UncheckedIOException;
 import java.util.Iterator;
 import java.util.NoSuchElementException;
+import java.util.concurrent.Callable;
 import java.util.concurrent.LinkedBlockingQueue;
 
-
-/**
- * A file backed list of Strings which is in-memory till the threshold.
- */
 public class FileList implements AutoCloseable, Iterator<String> {
   private static final Logger LOG = LoggerFactory.getLogger(FileList.class);
-  private static int fileListStreamerID = 0;
-  private static final String  FILE_LIST_STREAMER_PREFIX = "file-list-streamer-";
-
-  private LinkedBlockingQueue<String> cache;
-  private volatile boolean thresholdHit = false;
-  private int thresholdPoint;
-  private float thresholdFactor = 0.9f;
   private Path backingFile;
-  private FileListStreamer fileListStreamer;
-  private String nextElement;
-  private boolean noMoreElement;
+  private String nextElement = null;
   private HiveConf conf;
   private BufferedReader backingFileReader;
+  private BufferedWriter backingFileWriter;
 
-
-  public FileList(Path backingFile, int cacheSize, HiveConf conf) {
+  public FileList(Path backingFile, HiveConf conf) {
     this.backingFile = backingFile;
     this.conf = conf;
-    if (cacheSize > 0) {
-      // Cache size must be > 0 for this list to be used for the write operation.
-      this.cache = new LinkedBlockingQueue<>(cacheSize);
-      fileListStreamer = new FileListStreamer(cache, backingFile, conf);
-      thresholdPoint = getThreshold(cacheSize);
-      LOG.debug("File list backed by {} can be used for write operation.", backingFile);
-    } else {
-      thresholdHit = true;
-    }
   }
 
-  @VisibleForTesting
-  FileList(Path backingFile, FileListStreamer fileListStreamer, LinkedBlockingQueue<String> cache, HiveConf conf) {
-    this.backingFile = backingFile;
-    this.fileListStreamer = fileListStreamer;
-    this.cache = cache;
-    this.conf = conf;
-    thresholdPoint = getThreshold(cache.remainingCapacity());
-  }
-
-  /**
-   * Only add operation is safe for concurrent operations.
-   */
   public void add(String entry) throws SemanticException {
-    if (thresholdHit && !fileListStreamer.isAlive()) {
-      throw new SemanticException("List is not getting saved anymore to file " + backingFile.toString());
-    }
+    Retryable retryable = Retryable.builder()
+            .withHiveConf(conf)
+            .withRetryOnException(IOException.class).build();
     try {
-      cache.put(entry);
-    } catch (InterruptedException e) {
-      throw new SemanticException(e);
-    }
-    if (!thresholdHit && cache.size() >= thresholdPoint) {
-      initStoreToFile(cache.size());
+      retryable.executeCallable((Callable<Void>) ()-> {
+        synchronized (backingFile ) {
+          if (backingFileWriter == null) {
+            backingFileWriter = initWriter();
+          }
+          backingFileWriter.write(entry);
+          backingFileWriter.newLine();
+        }
+        LOG.info("Writing entry {} to file list backed by {}", entry, backingFile);
+        return null;
+      });
+    } catch (Exception e) {
+      throw new SemanticException(ErrorMsg.REPL_RETRY_EXHAUSTED.format(e.getMessage(),
+              String.valueOf(ErrorMsg.getErrorMsg(e).getErrorCode())));
     }
   }
 
+  BufferedWriter initWriter() throws IOException {
+    FileSystem fs = FileSystem.get(backingFile.toUri(), conf);
+    return new BufferedWriter(new OutputStreamWriter(fs.create(backingFile)));
+  }
+
   @Override
   public boolean hasNext() {
-    if (!thresholdHit) {
-      return (cache != null && !cache.isEmpty());
-    }
     if (nextElement != null) {
       return true;
+    } else {
+      try {
+        nextElement = readNextLine();
+        return (nextElement != null);
+      } catch (IOException e) {
+        throw new UncheckedIOException(e);
+      }
     }
-    if (noMoreElement) {
-      return false;
-    }
-    nextElement = readNextLine();
-    if (nextElement == null) {
-      noMoreElement = true;
-    }
-    return !noMoreElement;
   }
 
   @Override
   public String next() {
-    if (!hasNext()) {
+    if (nextElement == null && !hasNext()) {
       throw new NoSuchElementException("No more element in the list backed by " + backingFile);
     }
     String retVal = nextElement;
     nextElement = null;
-    return thresholdHit ? retVal : cache.poll();
-  }
-
-  private synchronized void initStoreToFile(int cacheSize) {
-    if (!thresholdHit) {
-      fileListStreamer.setName(getNextID());
-      fileListStreamer.setDaemon(true);
-      fileListStreamer.start();
-      thresholdHit = true;
-      LOG.info("Started streaming the list elements to file: {}, cache size {}", backingFile, cacheSize);
-    }
+    return retVal;
   }
 
-  private String readNextLine() {
-    String nextElement = null;
+  private String readNextLine() throws IOException {
+    Retryable retryable = Retryable.builder()
+            .withHiveConf(conf)
+            .withRetryOnException(IOException.class).build();
     try {
-      if (backingFileReader == null) {
-        FileSystem fs = FileSystem.get(backingFile.toUri(), conf);
-        if (fs.exists(backingFile)) {
+      return retryable.executeCallable(() -> {
+        String nextElement;
+        if (backingFileReader == null) {
+          FileSystem fs = FileSystem.get(backingFile.toUri(), conf);
+          if(!fs.exists(backingFile)) {

Review comment:
       Won't the file always be present, even if it is empty?

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/FileList.java
##########
@@ -22,154 +22,118 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.exec.util.Retryable;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.BufferedReader;
+import java.io.BufferedWriter;
 import java.io.IOException;
 import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.io.UncheckedIOException;
 import java.util.Iterator;
 import java.util.NoSuchElementException;
+import java.util.concurrent.Callable;
 import java.util.concurrent.LinkedBlockingQueue;
 
-
-/**
- * A file backed list of Strings which is in-memory till the threshold.
- */
 public class FileList implements AutoCloseable, Iterator<String> {
   private static final Logger LOG = LoggerFactory.getLogger(FileList.class);
-  private static int fileListStreamerID = 0;
-  private static final String  FILE_LIST_STREAMER_PREFIX = "file-list-streamer-";
-
-  private LinkedBlockingQueue<String> cache;
-  private volatile boolean thresholdHit = false;
-  private int thresholdPoint;
-  private float thresholdFactor = 0.9f;
   private Path backingFile;
-  private FileListStreamer fileListStreamer;
-  private String nextElement;
-  private boolean noMoreElement;
+  private String nextElement = null;
   private HiveConf conf;
   private BufferedReader backingFileReader;
+  private BufferedWriter backingFileWriter;
 
-
-  public FileList(Path backingFile, int cacheSize, HiveConf conf) {
+  public FileList(Path backingFile, HiveConf conf) {
     this.backingFile = backingFile;
     this.conf = conf;
-    if (cacheSize > 0) {
-      // Cache size must be > 0 for this list to be used for the write operation.
-      this.cache = new LinkedBlockingQueue<>(cacheSize);
-      fileListStreamer = new FileListStreamer(cache, backingFile, conf);
-      thresholdPoint = getThreshold(cacheSize);
-      LOG.debug("File list backed by {} can be used for write operation.", backingFile);
-    } else {
-      thresholdHit = true;
-    }
   }
 
-  @VisibleForTesting
-  FileList(Path backingFile, FileListStreamer fileListStreamer, LinkedBlockingQueue<String> cache, HiveConf conf) {
-    this.backingFile = backingFile;
-    this.fileListStreamer = fileListStreamer;
-    this.cache = cache;
-    this.conf = conf;
-    thresholdPoint = getThreshold(cache.remainingCapacity());
-  }
-
-  /**
-   * Only add operation is safe for concurrent operations.
-   */
   public void add(String entry) throws SemanticException {
-    if (thresholdHit && !fileListStreamer.isAlive()) {
-      throw new SemanticException("List is not getting saved anymore to file " + backingFile.toString());
-    }
+    Retryable retryable = Retryable.builder()
+            .withHiveConf(conf)
+            .withRetryOnException(IOException.class).build();
     try {
-      cache.put(entry);
-    } catch (InterruptedException e) {
-      throw new SemanticException(e);
-    }
-    if (!thresholdHit && cache.size() >= thresholdPoint) {
-      initStoreToFile(cache.size());
+      retryable.executeCallable((Callable<Void>) ()-> {
+        synchronized (backingFile ) {
+          if (backingFileWriter == null) {
+            backingFileWriter = initWriter();
+          }
+          backingFileWriter.write(entry);
+          backingFileWriter.newLine();
+        }
+        LOG.info("Writing entry {} to file list backed by {}", entry, backingFile);
+        return null;
+      });
+    } catch (Exception e) {
+      throw new SemanticException(ErrorMsg.REPL_RETRY_EXHAUSTED.format(e.getMessage(),
+              String.valueOf(ErrorMsg.getErrorMsg(e).getErrorCode())));
     }
   }
 
+  BufferedWriter initWriter() throws IOException {
+    FileSystem fs = FileSystem.get(backingFile.toUri(), conf);
+    return new BufferedWriter(new OutputStreamWriter(fs.create(backingFile)));
+  }
+
   @Override
   public boolean hasNext() {
-    if (!thresholdHit) {
-      return (cache != null && !cache.isEmpty());
-    }
     if (nextElement != null) {
       return true;
+    } else {
+      try {
+        nextElement = readNextLine();
+        return (nextElement != null);
+      } catch (IOException e) {
+        throw new UncheckedIOException(e);
+      }
     }
-    if (noMoreElement) {
-      return false;
-    }
-    nextElement = readNextLine();
-    if (nextElement == null) {
-      noMoreElement = true;
-    }
-    return !noMoreElement;
   }
 
   @Override
   public String next() {
-    if (!hasNext()) {
+    if (nextElement == null && !hasNext()) {
       throw new NoSuchElementException("No more element in the list backed by " + backingFile);
     }
     String retVal = nextElement;
     nextElement = null;
-    return thresholdHit ? retVal : cache.poll();
-  }
-
-  private synchronized void initStoreToFile(int cacheSize) {
-    if (!thresholdHit) {
-      fileListStreamer.setName(getNextID());
-      fileListStreamer.setDaemon(true);
-      fileListStreamer.start();
-      thresholdHit = true;
-      LOG.info("Started streaming the list elements to file: {}, cache size {}", backingFile, cacheSize);
-    }
+    return retVal;
   }
 
-  private String readNextLine() {
-    String nextElement = null;
+  private String readNextLine() throws IOException {
+    Retryable retryable = Retryable.builder()
+            .withHiveConf(conf)
+            .withRetryOnException(IOException.class).build();
     try {
-      if (backingFileReader == null) {
-        FileSystem fs = FileSystem.get(backingFile.toUri(), conf);
-        if (fs.exists(backingFile)) {
+      return retryable.executeCallable(() -> {
+        String nextElement;
+        if (backingFileReader == null) {
+          FileSystem fs = FileSystem.get(backingFile.toUri(), conf);
+          if(!fs.exists(backingFile)) {
+            return null;
+          }
           backingFileReader = new BufferedReader(new InputStreamReader(fs.open(backingFile)));
         }
-      }
-      nextElement = (backingFileReader == null) ? null : backingFileReader.readLine();
-    } catch (IOException e) {
-      LOG.error("Unable to read list from backing file " + backingFile, e);
+        nextElement = backingFileReader.readLine();

Review comment:
       Add a test for the retry.

##########
File path: itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
##########
@@ -2229,13 +2228,7 @@ private void assertExternalFileInfo(List<String> expected, String dumplocation,
                                       WarehouseInstance warehouseInstance)
           throws IOException {
     Path hivePath = new Path(dumplocation, ReplUtils.REPL_HIVE_BASE_DIR);
-    Path metadataPath = new Path(hivePath, EximUtil.METADATA_PATH_NAME);
-    Path externalTableInfoFile;
-    if (isIncremental) {
-      externalTableInfoFile = new Path(hivePath, FILE_NAME);
-    } else {
-      externalTableInfoFile = new Path(metadataPath, primaryDbName.toLowerCase() + File.separator + FILE_NAME);
-    }
+    Path externalTableInfoFile = new Path(hivePath, EximUtil.FILE_LIST_EXTERNAL);

Review comment:
       There is no info file anymore; we can name this as per the new one.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: gitbox-unsubscribe@hive.apache.org
For additional commands, e-mail: gitbox-help@hive.apache.org


[GitHub] [hive] ArkoSharma commented on a change in pull request #1936: HIVE-24718: Moving to file based iteration for copying data.

Posted by GitBox <gi...@apache.org>.
ArkoSharma commented on a change in pull request #1936:
URL: https://github.com/apache/hive/pull/1936#discussion_r594105742



##########
File path: itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTablesMetaDataOnly.java
##########
@@ -639,9 +629,11 @@ public void testIncrementalDumpEmptyDumpDirectory() throws Throwable {
             .verifyResult(inc2Tuple.lastReplicationId);
   }
 
-  private void assertFalseExternalFileList(Path externalTableFileList)
-      throws IOException {
+  private void assertFalseExternalFileList(String dumpLocation)
+          throws IOException {

Review comment:
       getFileSystem() call throws IOException.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: gitbox-unsubscribe@hive.apache.org
For additional commands, e-mail: gitbox-help@hive.apache.org


[GitHub] [hive] ArkoSharma commented on a change in pull request #1936: HIVE-24718: Moving to file based iteration for copying data.

Posted by GitBox <gi...@apache.org>.
ArkoSharma commented on a change in pull request #1936:
URL: https://github.com/apache/hive/pull/1936#discussion_r580050377



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/FileList.java
##########
@@ -18,158 +18,176 @@
 
 package org.apache.hadoop.hive.ql.exec.repl.util;
 
-import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.exec.util.Retryable;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.BufferedReader;
 import java.io.IOException;
 import java.io.InputStreamReader;
+import java.io.StringWriter;
+import java.io.UncheckedIOException;
 import java.util.Iterator;
 import java.util.NoSuchElementException;
-import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.Callable;
 
-
-/**
- * A file backed list of Strings which is in-memory till the threshold.
- */
 public class FileList implements AutoCloseable, Iterator<String> {
   private static final Logger LOG = LoggerFactory.getLogger(FileList.class);
-  private static int fileListStreamerID = 0;
-  private static final String  FILE_LIST_STREAMER_PREFIX = "file-list-streamer-";
-
-  private LinkedBlockingQueue<String> cache;
-  private volatile boolean thresholdHit = false;
-  private int thresholdPoint;
-  private float thresholdFactor = 0.9f;
-  private Path backingFile;
-  private FileListStreamer fileListStreamer;
-  private String nextElement;
-  private boolean noMoreElement;
+  private final Path backingFile;
+  private String nextElement = null;
   private HiveConf conf;
+  private volatile boolean retryMode;
   private BufferedReader backingFileReader;
+  private volatile FSDataOutputStream backingFileWriter;
 
-
-  public FileList(Path backingFile, int cacheSize, HiveConf conf) {
+  public FileList(Path backingFile, HiveConf conf) {
     this.backingFile = backingFile;
     this.conf = conf;
-    if (cacheSize > 0) {
-      // Cache size must be > 0 for this list to be used for the write operation.
-      this.cache = new LinkedBlockingQueue<>(cacheSize);
-      fileListStreamer = new FileListStreamer(cache, backingFile, conf);
-      thresholdPoint = getThreshold(cacheSize);
-      LOG.debug("File list backed by {} can be used for write operation.", backingFile);
-    } else {
-      thresholdHit = true;
+    this.retryMode = false;
+  }
+
+  public void add(String entry) throws IOException {
+    Retryable retryable = buildRetryable();
+    try {
+      retryable.executeCallable((Callable<Void>) ()-> {
+        synchronized (backingFile) {
+          try{
+            if (backingFileWriter == null) {
+              backingFileWriter = initWriter();
+            }
+            backingFileWriter.writeBytes(getEntryWithNewline(entry));
+            backingFileWriter.hflush();
+            LOG.info("Writing entry {} to file list backed by {}", entry, backingFile);
+          } catch (IOException e) {
+            LOG.error("Writing entry {} to file list {} failed, attempting retry.", entry, backingFile, e);
+            this.retryMode = true;
+            close();
+            throw e;
+          }
+        }
+        return null;
+      });
+    } catch (Exception e) {
+      throw new IOException(ErrorMsg.REPL_RETRY_EXHAUSTED.format(e.getMessage()));
     }
   }
 
-  @VisibleForTesting
-  FileList(Path backingFile, FileListStreamer fileListStreamer, LinkedBlockingQueue<String> cache, HiveConf conf) {
-    this.backingFile = backingFile;
-    this.fileListStreamer = fileListStreamer;
-    this.cache = cache;
-    this.conf = conf;
-    thresholdPoint = getThreshold(cache.remainingCapacity());
+  Retryable buildRetryable() {
+    return Retryable.builder()
+            .withHiveConf(conf)
+            .withRetryOnException(IOException.class).build();
   }
 
-  /**
-   * Only add operation is safe for concurrent operations.
-   */
-  public void add(String entry) throws SemanticException {
-    if (thresholdHit && !fileListStreamer.isAlive()) {
-      throw new SemanticException("List is not getting saved anymore to file " + backingFile.toString());
+  // Return the entry ensuring it ends with newline.
+  private String getEntryWithNewline(String entry) {
+    return new StringWriter()
+            .append(entry)
+            .append(System.lineSeparator())
+            .toString();
+  }
+
+  FSDataOutputStream initWriter() throws IOException {
+    if(shouldAppend()) {
+      return getWriterAppendMode(); // append in retry-mode if file has been created already
+    }
+    else {
+      return getWriterCreateMode();
     }
+  }
+
+  boolean shouldAppend() throws IOException {
+    return backingFile.getFileSystem(conf).exists(backingFile) && this.retryMode;

Review comment:
       In the case of bootstrap checkpointing, there could be an earlier file lying around from a previous attempt; we rewrite the file in that case.
   This flag is introduced to distinguish that case from an ongoing retry within the current dump.
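   
   For reference, a minimal sketch of the resulting create-vs-append decision (condensed from the patch's initWriter()/shouldAppend(); the method name here is illustrative): retryMode is set only after a write failure within the current dump, so a stale file from a checkpointed earlier attempt is overwritten, while a file from the current dump is appended to.
   
       import java.io.IOException;
       import org.apache.hadoop.conf.Configuration;
       import org.apache.hadoop.fs.FSDataOutputStream;
       import org.apache.hadoop.fs.FileSystem;
       import org.apache.hadoop.fs.Path;
   
       final class WriterModeSketch {
         static FSDataOutputStream open(Path backingFile, Configuration conf,
                                        boolean retryMode) throws IOException {
           FileSystem fs = backingFile.getFileSystem(conf);
           if (fs.exists(backingFile) && retryMode) {
             return fs.append(backingFile);  // resume: keep entries already flushed
           }
           return fs.create(backingFile);    // fresh dump: overwrite any stale file
         }
       }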




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: gitbox-unsubscribe@hive.apache.org
For additional commands, e-mail: gitbox-help@hive.apache.org


[GitHub] [hive] ArkoSharma commented on a change in pull request #1936: HIVE-24718: Moving to file based iteration for copying data.

Posted by GitBox <gi...@apache.org>.
ArkoSharma commented on a change in pull request #1936:
URL: https://github.com/apache/hive/pull/1936#discussion_r594105835



##########
File path: itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTablesMetaDataOnly.java
##########
@@ -639,9 +629,11 @@ public void testIncrementalDumpEmptyDumpDirectory() throws Throwable {
             .verifyResult(inc2Tuple.lastReplicationId);
   }
 
-  private void assertFalseExternalFileList(Path externalTableFileList)
-      throws IOException {
+  private void assertFalseExternalFileList(String dumpLocation)

Review comment:
       Done




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: gitbox-unsubscribe@hive.apache.org
For additional commands, e-mail: gitbox-help@hive.apache.org


[GitHub] [hive] ArkoSharma commented on a change in pull request #1936: HIVE-24718: Moving to file based iteration for copying data.

Posted by GitBox <gi...@apache.org>.
ArkoSharma commented on a change in pull request #1936:
URL: https://github.com/apache/hive/pull/1936#discussion_r582431197



##########
File path: itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTablesMetaDataOnly.java
##########
@@ -103,9 +102,9 @@ public void replicationWithoutExternalTables() throws Throwable {
         .run("insert into table t2 partition(country='france') values ('paris')")
         .dump(primaryDbName, dumpWithClause);
 
-    // the _external_tables_file info only should be created if external tables are to be replicated not otherwise
-    assertFalse(primary.miniDFSCluster.getFileSystem()
-        .exists(new Path(new Path(tuple.dumpLocation, primaryDbName.toLowerCase()), FILE_NAME)));
+    // the _file_list_external should be created only if external tables are to be replicated, not otherwise
+    assertFalseExternalFileList(new Path(new Path(tuple.dumpLocation,

Review comment:
       Removing would cause code duplication in most places, except in TestReplicationScenariosExternalTablesMetaDataOnly.java. Refactored the code there.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: gitbox-unsubscribe@hive.apache.org
For additional commands, e-mail: gitbox-help@hive.apache.org


[GitHub] [hive] ArkoSharma commented on a change in pull request #1936: HIVE-24718: Cleanup of _external_table_info file

Posted by GitBox <gi...@apache.org>.
ArkoSharma commented on a change in pull request #1936:
URL: https://github.com/apache/hive/pull/1936#discussion_r573463126



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/FileList.java
##########
@@ -22,154 +22,118 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.exec.util.Retryable;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.BufferedReader;
+import java.io.BufferedWriter;
 import java.io.IOException;
 import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.io.UncheckedIOException;
 import java.util.Iterator;
 import java.util.NoSuchElementException;
+import java.util.concurrent.Callable;
 import java.util.concurrent.LinkedBlockingQueue;
 
-
-/**
- * A file backed list of Strings which is in-memory till the threshold.
- */
 public class FileList implements AutoCloseable, Iterator<String> {
   private static final Logger LOG = LoggerFactory.getLogger(FileList.class);
-  private static int fileListStreamerID = 0;
-  private static final String  FILE_LIST_STREAMER_PREFIX = "file-list-streamer-";
-
-  private LinkedBlockingQueue<String> cache;
-  private volatile boolean thresholdHit = false;
-  private int thresholdPoint;
-  private float thresholdFactor = 0.9f;
   private Path backingFile;
-  private FileListStreamer fileListStreamer;
-  private String nextElement;
-  private boolean noMoreElement;
+  private String nextElement = null;
   private HiveConf conf;
   private BufferedReader backingFileReader;
+  private BufferedWriter backingFileWriter;
 
-
-  public FileList(Path backingFile, int cacheSize, HiveConf conf) {
+  public FileList(Path backingFile, HiveConf conf) {
     this.backingFile = backingFile;
     this.conf = conf;
-    if (cacheSize > 0) {
-      // Cache size must be > 0 for this list to be used for the write operation.
-      this.cache = new LinkedBlockingQueue<>(cacheSize);
-      fileListStreamer = new FileListStreamer(cache, backingFile, conf);
-      thresholdPoint = getThreshold(cacheSize);
-      LOG.debug("File list backed by {} can be used for write operation.", backingFile);
-    } else {
-      thresholdHit = true;
-    }
   }
 
-  @VisibleForTesting
-  FileList(Path backingFile, FileListStreamer fileListStreamer, LinkedBlockingQueue<String> cache, HiveConf conf) {
-    this.backingFile = backingFile;
-    this.fileListStreamer = fileListStreamer;
-    this.cache = cache;
-    this.conf = conf;
-    thresholdPoint = getThreshold(cache.remainingCapacity());
-  }
-
-  /**
-   * Only add operation is safe for concurrent operations.
-   */
   public void add(String entry) throws SemanticException {
-    if (thresholdHit && !fileListStreamer.isAlive()) {
-      throw new SemanticException("List is not getting saved anymore to file " + backingFile.toString());
-    }
+    Retryable retryable = Retryable.builder()
+            .withHiveConf(conf)
+            .withRetryOnException(IOException.class).build();
     try {
-      cache.put(entry);
-    } catch (InterruptedException e) {
-      throw new SemanticException(e);
-    }
-    if (!thresholdHit && cache.size() >= thresholdPoint) {
-      initStoreToFile(cache.size());
+      retryable.executeCallable((Callable<Void>) ()-> {
+        synchronized (backingFile ) {

Review comment:
       Writing the entry and writing the newline character are done in separate statements. For two entries e1 and e2, the write order must be e1, newline, e2, newline (or e2, newline, e1, newline). This order is guaranteed only if a 'synchronized' block encloses both statements; otherwise we could get interleavings like e1, e2, newline, newline, which would corrupt the list.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: gitbox-unsubscribe@hive.apache.org
For additional commands, e-mail: gitbox-help@hive.apache.org


[GitHub] [hive] ArkoSharma commented on a change in pull request #1936: HIVE-24718: Moving to file based iteration for copying data.

Posted by GitBox <gi...@apache.org>.
ArkoSharma commented on a change in pull request #1936:
URL: https://github.com/apache/hive/pull/1936#discussion_r580465458



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/FileList.java
##########
@@ -18,158 +18,176 @@
 
 package org.apache.hadoop.hive.ql.exec.repl.util;
 
-import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.exec.util.Retryable;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.BufferedReader;
 import java.io.IOException;
 import java.io.InputStreamReader;
+import java.io.StringWriter;
+import java.io.UncheckedIOException;
 import java.util.Iterator;
 import java.util.NoSuchElementException;
-import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.Callable;
 
-
-/**
- * A file backed list of Strings which is in-memory till the threshold.
- */
 public class FileList implements AutoCloseable, Iterator<String> {
   private static final Logger LOG = LoggerFactory.getLogger(FileList.class);
-  private static int fileListStreamerID = 0;
-  private static final String  FILE_LIST_STREAMER_PREFIX = "file-list-streamer-";
-
-  private LinkedBlockingQueue<String> cache;
-  private volatile boolean thresholdHit = false;
-  private int thresholdPoint;
-  private float thresholdFactor = 0.9f;
-  private Path backingFile;
-  private FileListStreamer fileListStreamer;
-  private String nextElement;
-  private boolean noMoreElement;
+  private final Path backingFile;
+  private String nextElement = null;
   private HiveConf conf;
+  private volatile boolean retryMode;
   private BufferedReader backingFileReader;
+  private volatile FSDataOutputStream backingFileWriter;
 
-
-  public FileList(Path backingFile, int cacheSize, HiveConf conf) {
+  public FileList(Path backingFile, HiveConf conf) {
     this.backingFile = backingFile;
     this.conf = conf;
-    if (cacheSize > 0) {
-      // Cache size must be > 0 for this list to be used for the write operation.
-      this.cache = new LinkedBlockingQueue<>(cacheSize);
-      fileListStreamer = new FileListStreamer(cache, backingFile, conf);
-      thresholdPoint = getThreshold(cacheSize);
-      LOG.debug("File list backed by {} can be used for write operation.", backingFile);
-    } else {
-      thresholdHit = true;
+    this.retryMode = false;
+  }
+
+  public void add(String entry) throws IOException {
+    Retryable retryable = buildRetryable();
+    try {
+      retryable.executeCallable((Callable<Void>) ()-> {
+        synchronized (backingFile) {
+          try{
+            if (backingFileWriter == null) {
+              backingFileWriter = initWriter();
+            }
+            backingFileWriter.writeBytes(getEntryWithNewline(entry));
+            backingFileWriter.hflush();

Review comment:
       This patch assumes that each flush operation either writes the data completely (including the ack) or does not write it at all. If this assumption changes, we may need further investigation to determine the boundaries of each entry written to the file. So checks for duplicate and invalid entries are currently not done.
   
   Hflush guarantees that after a successful return, every new reader will see the flushed data. This guarantee seems sufficient, as we do not read during writes.
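   
   As a rough sketch of that per-entry flush pattern (simplified; the patch itself uses writeBytes with a line separator appended):
   
       import java.io.IOException;
       import java.nio.charset.StandardCharsets;
       import org.apache.hadoop.fs.FSDataOutputStream;
   
       final class FlushSketch {
         static void writeEntry(FSDataOutputStream out, String entry) throws IOException {
           out.write((entry + System.lineSeparator()).getBytes(StandardCharsets.UTF_8));
           // hflush(): once it returns, every reader that opens the file
           // afterwards sees these bytes; it does not guarantee on-disk
           // durability (that would be hsync()).
           out.hflush();
         }
       }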




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: gitbox-unsubscribe@hive.apache.org
For additional commands, e-mail: gitbox-help@hive.apache.org


[GitHub] [hive] pkumarsinha commented on a change in pull request #1936: HIVE-24718: Moving to file based iteration for copying data.

Posted by GitBox <gi...@apache.org>.
pkumarsinha commented on a change in pull request #1936:
URL: https://github.com/apache/hive/pull/1936#discussion_r580192079



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/FileList.java
##########
@@ -18,158 +18,176 @@
 
 package org.apache.hadoop.hive.ql.exec.repl.util;
 
-import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.exec.util.Retryable;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.BufferedReader;
 import java.io.IOException;
 import java.io.InputStreamReader;
+import java.io.StringWriter;
+import java.io.UncheckedIOException;
 import java.util.Iterator;
 import java.util.NoSuchElementException;
-import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.Callable;
 
-
-/**
- * A file backed list of Strings which is in-memory till the threshold.
- */
 public class FileList implements AutoCloseable, Iterator<String> {
   private static final Logger LOG = LoggerFactory.getLogger(FileList.class);
-  private static int fileListStreamerID = 0;
-  private static final String  FILE_LIST_STREAMER_PREFIX = "file-list-streamer-";
-
-  private LinkedBlockingQueue<String> cache;
-  private volatile boolean thresholdHit = false;
-  private int thresholdPoint;
-  private float thresholdFactor = 0.9f;
-  private Path backingFile;
-  private FileListStreamer fileListStreamer;
-  private String nextElement;
-  private boolean noMoreElement;
+  private final Path backingFile;
+  private String nextElement = null;
   private HiveConf conf;
+  private volatile boolean retryMode;
   private BufferedReader backingFileReader;
+  private volatile FSDataOutputStream backingFileWriter;
 
-
-  public FileList(Path backingFile, int cacheSize, HiveConf conf) {
+  public FileList(Path backingFile, HiveConf conf) {
     this.backingFile = backingFile;
     this.conf = conf;
-    if (cacheSize > 0) {
-      // Cache size must be > 0 for this list to be used for the write operation.
-      this.cache = new LinkedBlockingQueue<>(cacheSize);
-      fileListStreamer = new FileListStreamer(cache, backingFile, conf);
-      thresholdPoint = getThreshold(cacheSize);
-      LOG.debug("File list backed by {} can be used for write operation.", backingFile);
-    } else {
-      thresholdHit = true;
+    this.retryMode = false;
+  }
+
+  public void add(String entry) throws IOException {
+    Retryable retryable = buildRetryable();
+    try {
+      retryable.executeCallable((Callable<Void>) ()-> {
+        synchronized (backingFile) {
+          try{
+            if (backingFileWriter == null) {
+              backingFileWriter = initWriter();
+            }
+            backingFileWriter.writeBytes(getEntryWithNewline(entry));
+            backingFileWriter.hflush();
+            LOG.info("Writing entry {} to file list backed by {}", entry, backingFile);
+          } catch (IOException e) {
+            LOG.error("Writing entry {} to file list {} failed, attempting retry.", entry, backingFile, e);
+            this.retryMode = true;
+            close();
+            throw e;
+          }
+        }
+        return null;
+      });
+    } catch (Exception e) {
+      throw new IOException(ErrorMsg.REPL_RETRY_EXHAUSTED.format(e.getMessage()));
     }
   }
 
-  @VisibleForTesting
-  FileList(Path backingFile, FileListStreamer fileListStreamer, LinkedBlockingQueue<String> cache, HiveConf conf) {
-    this.backingFile = backingFile;
-    this.fileListStreamer = fileListStreamer;
-    this.cache = cache;
-    this.conf = conf;
-    thresholdPoint = getThreshold(cache.remainingCapacity());
+  Retryable buildRetryable() {
+    return Retryable.builder()
+            .withHiveConf(conf)
+            .withRetryOnException(IOException.class).build();
   }
 
-  /**
-   * Only add operation is safe for concurrent operations.
-   */
-  public void add(String entry) throws SemanticException {
-    if (thresholdHit && !fileListStreamer.isAlive()) {
-      throw new SemanticException("List is not getting saved anymore to file " + backingFile.toString());
+  // Return the entry ensuring it ends with newline.
+  private String getEntryWithNewline(String entry) {
+    return new StringWriter()
+            .append(entry)
+            .append(System.lineSeparator())
+            .toString();
+  }
+
+  FSDataOutputStream initWriter() throws IOException {
+    if(shouldAppend()) {
+      return getWriterAppendMode(); // append in retry-mode if file has been created already
+    }
+    else {
+      return getWriterCreateMode();
     }
+  }
+
+  boolean shouldAppend() throws IOException {
+    return backingFile.getFileSystem(conf).exists(backingFile) && this.retryMode;

Review comment:
       Makes sense.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: gitbox-unsubscribe@hive.apache.org
For additional commands, e-mail: gitbox-help@hive.apache.org


[GitHub] [hive] pkumarsinha commented on a change in pull request #1936: HIVE-24718: Moving to file based iteration for copying data.

Posted by GitBox <gi...@apache.org>.
pkumarsinha commented on a change in pull request #1936:
URL: https://github.com/apache/hive/pull/1936#discussion_r579994523



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/DirCopyWork.java
##########
@@ -75,7 +77,9 @@ public String convertToString() {
     StringBuilder objInStr = new StringBuilder();
     objInStr.append(fullyQualifiedSourcePath)
             .append(URI_SEPARATOR)
-            .append(fullyQualifiedTargetPath);
+            .append(fullyQualifiedTargetPath)
+            .append(URI_SEPARATOR)
+            .append(tableName);

Review comment:
       Any specific reason for moving the tableName to the end?

##########
File path: itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
##########
@@ -2225,17 +2224,11 @@ private void setupUDFJarOnHDFS(Path identityUdfLocalPath, Path identityUdfHdfsPa
   /*
    * Method used from TestReplicationScenariosExclusiveReplica
    */
-  private void assertExternalFileInfo(List<String> expected, String dumplocation, boolean isIncremental,
+  private void assertExternalFileList(List<String> expected, String dumplocation,
                                       WarehouseInstance warehouseInstance)
           throws IOException {
     Path hivePath = new Path(dumplocation, ReplUtils.REPL_HIVE_BASE_DIR);
-    Path metadataPath = new Path(hivePath, EximUtil.METADATA_PATH_NAME);
-    Path externalTableInfoFile;
-    if (isIncremental) {
-      externalTableInfoFile = new Path(hivePath, FILE_NAME);
-    } else {
-      externalTableInfoFile = new Path(metadataPath, primaryDbName.toLowerCase() + File.separator + FILE_NAME);
-    }
-    ReplicationTestUtils.assertExternalFileInfo(warehouseInstance, expected, externalTableInfoFile);
+    Path externalTblFileList = new Path(hivePath, EximUtil.FILE_LIST_EXTERNAL);

Review comment:
       Given that there is now a fixed location for FILE_LIST_EXTERNAL in both incremental and bootstrap, do you really need this method anymore?
   TestReplicationScenariosAcrossInstances.assertExternalFileList?

##########
File path: itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTablesMetaDataOnly.java
##########
@@ -103,9 +102,9 @@ public void replicationWithoutExternalTables() throws Throwable {
         .run("insert into table t2 partition(country='france') values ('paris')")
         .dump(primaryDbName, dumpWithClause);
 
-    // the _external_tables_file info only should be created if external tables are to be replicated not otherwise
-    assertFalse(primary.miniDFSCluster.getFileSystem()
-        .exists(new Path(new Path(tuple.dumpLocation, primaryDbName.toLowerCase()), FILE_NAME)));
+    // the _file_list_external should be created only if external tables are to be replicated, not otherwise
+    assertFalseExternalFileList(new Path(new Path(tuple.dumpLocation,

Review comment:
       Given that the location of FILE_LIST_EXTERNAL is now fixed for both bootstrap and incremental, many methods are no longer required, e.g. assertExternalFileList. Can you please check from this perspective and remove them with a bit of refactoring?

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/FileList.java
##########
@@ -18,158 +18,176 @@
 
 package org.apache.hadoop.hive.ql.exec.repl.util;
 
-import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.exec.util.Retryable;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.BufferedReader;
 import java.io.IOException;
 import java.io.InputStreamReader;
+import java.io.StringWriter;
+import java.io.UncheckedIOException;
 import java.util.Iterator;
 import java.util.NoSuchElementException;
-import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.Callable;
 
-
-/**
- * A file backed list of Strings which is in-memory till the threshold.
- */
 public class FileList implements AutoCloseable, Iterator<String> {
   private static final Logger LOG = LoggerFactory.getLogger(FileList.class);
-  private static int fileListStreamerID = 0;
-  private static final String  FILE_LIST_STREAMER_PREFIX = "file-list-streamer-";
-
-  private LinkedBlockingQueue<String> cache;
-  private volatile boolean thresholdHit = false;
-  private int thresholdPoint;
-  private float thresholdFactor = 0.9f;
-  private Path backingFile;
-  private FileListStreamer fileListStreamer;
-  private String nextElement;
-  private boolean noMoreElement;
+  private final Path backingFile;
+  private String nextElement = null;
   private HiveConf conf;
+  private volatile boolean retryMode;
   private BufferedReader backingFileReader;
+  private volatile FSDataOutputStream backingFileWriter;
 
-
-  public FileList(Path backingFile, int cacheSize, HiveConf conf) {
+  public FileList(Path backingFile, HiveConf conf) {
     this.backingFile = backingFile;
     this.conf = conf;
-    if (cacheSize > 0) {
-      // Cache size must be > 0 for this list to be used for the write operation.
-      this.cache = new LinkedBlockingQueue<>(cacheSize);
-      fileListStreamer = new FileListStreamer(cache, backingFile, conf);
-      thresholdPoint = getThreshold(cacheSize);
-      LOG.debug("File list backed by {} can be used for write operation.", backingFile);
-    } else {
-      thresholdHit = true;
+    this.retryMode = false;
+  }
+
+  public void add(String entry) throws IOException {
+    Retryable retryable = buildRetryable();
+    try {
+      retryable.executeCallable((Callable<Void>) ()-> {

Review comment:
       While one thread is about to do a retry, are you allowing a new thread to write? And if so, does a failure in one result in a failure in the other FileList.add call as well?

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/FileList.java
##########
@@ -18,158 +18,176 @@
 
 package org.apache.hadoop.hive.ql.exec.repl.util;
 
-import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.exec.util.Retryable;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.BufferedReader;
 import java.io.IOException;
 import java.io.InputStreamReader;
+import java.io.StringWriter;
+import java.io.UncheckedIOException;
 import java.util.Iterator;
 import java.util.NoSuchElementException;
-import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.Callable;
 
-
-/**
- * A file backed list of Strings which is in-memory till the threshold.
- */
 public class FileList implements AutoCloseable, Iterator<String> {
   private static final Logger LOG = LoggerFactory.getLogger(FileList.class);
-  private static int fileListStreamerID = 0;
-  private static final String  FILE_LIST_STREAMER_PREFIX = "file-list-streamer-";
-
-  private LinkedBlockingQueue<String> cache;
-  private volatile boolean thresholdHit = false;
-  private int thresholdPoint;
-  private float thresholdFactor = 0.9f;
-  private Path backingFile;
-  private FileListStreamer fileListStreamer;
-  private String nextElement;
-  private boolean noMoreElement;
+  private final Path backingFile;
+  private String nextElement = null;
   private HiveConf conf;
+  private volatile boolean retryMode;
   private BufferedReader backingFileReader;
+  private volatile FSDataOutputStream backingFileWriter;
 
-
-  public FileList(Path backingFile, int cacheSize, HiveConf conf) {
+  public FileList(Path backingFile, HiveConf conf) {
     this.backingFile = backingFile;
     this.conf = conf;
-    if (cacheSize > 0) {
-      // Cache size must be > 0 for this list to be used for the write operation.
-      this.cache = new LinkedBlockingQueue<>(cacheSize);
-      fileListStreamer = new FileListStreamer(cache, backingFile, conf);
-      thresholdPoint = getThreshold(cacheSize);
-      LOG.debug("File list backed by {} can be used for write operation.", backingFile);
-    } else {
-      thresholdHit = true;
+    this.retryMode = false;
+  }
+
+  public void add(String entry) throws IOException {
+    Retryable retryable = buildRetryable();
+    try {
+      retryable.executeCallable((Callable<Void>) ()-> {
+        synchronized (backingFile) {
+          try{
+            if (backingFileWriter == null) {
+              backingFileWriter = initWriter();
+            }
+            backingFileWriter.writeBytes(getEntryWithNewline(entry));
+            backingFileWriter.hflush();
+            LOG.info("Writing entry {} to file list backed by {}", entry, backingFile);
+          } catch (IOException e) {
+            LOG.error("Writing entry {} to file list {} failed, attempting retry.", entry, backingFile, e);
+            this.retryMode = true;
+            close();
+            throw e;
+          }
+        }
+        return null;
+      });
+    } catch (Exception e) {
+      throw new IOException(ErrorMsg.REPL_RETRY_EXHAUSTED.format(e.getMessage()));
     }
   }
 
-  @VisibleForTesting
-  FileList(Path backingFile, FileListStreamer fileListStreamer, LinkedBlockingQueue<String> cache, HiveConf conf) {
-    this.backingFile = backingFile;
-    this.fileListStreamer = fileListStreamer;
-    this.cache = cache;
-    this.conf = conf;
-    thresholdPoint = getThreshold(cache.remainingCapacity());
+  Retryable buildRetryable() {
+    return Retryable.builder()
+            .withHiveConf(conf)
+            .withRetryOnException(IOException.class).build();
   }
 
-  /**
-   * Only add operation is safe for concurrent operations.
-   */
-  public void add(String entry) throws SemanticException {
-    if (thresholdHit && !fileListStreamer.isAlive()) {
-      throw new SemanticException("List is not getting saved anymore to file " + backingFile.toString());
+  // Return the entry ensuring it ends with newline.
+  private String getEntryWithNewline(String entry) {
+    return new StringWriter()
+            .append(entry)
+            .append(System.lineSeparator())
+            .toString();
+  }
+
+  FSDataOutputStream initWriter() throws IOException {
+    if(shouldAppend()) {
+      return getWriterAppendMode(); // append in retry-mode if file has been created already
+    }
+    else {
+      return getWriterCreateMode();
     }
+  }
+
+  boolean shouldAppend() throws IOException {

Review comment:
       private?

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/FileList.java
##########
@@ -18,158 +18,176 @@
 
 package org.apache.hadoop.hive.ql.exec.repl.util;
 
-import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.exec.util.Retryable;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.BufferedReader;
 import java.io.IOException;
 import java.io.InputStreamReader;
+import java.io.StringWriter;
+import java.io.UncheckedIOException;
 import java.util.Iterator;
 import java.util.NoSuchElementException;
-import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.Callable;
 
-
-/**
- * A file backed list of Strings which is in-memory till the threshold.
- */
 public class FileList implements AutoCloseable, Iterator<String> {
   private static final Logger LOG = LoggerFactory.getLogger(FileList.class);
-  private static int fileListStreamerID = 0;
-  private static final String  FILE_LIST_STREAMER_PREFIX = "file-list-streamer-";
-
-  private LinkedBlockingQueue<String> cache;
-  private volatile boolean thresholdHit = false;
-  private int thresholdPoint;
-  private float thresholdFactor = 0.9f;
-  private Path backingFile;
-  private FileListStreamer fileListStreamer;
-  private String nextElement;
-  private boolean noMoreElement;
+  private final Path backingFile;
+  private String nextElement = null;
   private HiveConf conf;
+  private volatile boolean retryMode;
   private BufferedReader backingFileReader;
+  private volatile FSDataOutputStream backingFileWriter;
 
-
-  public FileList(Path backingFile, int cacheSize, HiveConf conf) {
+  public FileList(Path backingFile, HiveConf conf) {
     this.backingFile = backingFile;
     this.conf = conf;
-    if (cacheSize > 0) {
-      // Cache size must be > 0 for this list to be used for the write operation.
-      this.cache = new LinkedBlockingQueue<>(cacheSize);
-      fileListStreamer = new FileListStreamer(cache, backingFile, conf);
-      thresholdPoint = getThreshold(cacheSize);
-      LOG.debug("File list backed by {} can be used for write operation.", backingFile);
-    } else {
-      thresholdHit = true;
+    this.retryMode = false;
+  }
+
+  public void add(String entry) throws IOException {
+    Retryable retryable = buildRetryable();
+    try {
+      retryable.executeCallable((Callable<Void>) ()-> {
+        synchronized (backingFile) {
+          try{
+            if (backingFileWriter == null) {
+              backingFileWriter = initWriter();
+            }
+            backingFileWriter.writeBytes(getEntryWithNewline(entry));
+            backingFileWriter.hflush();
+            LOG.info("Writing entry {} to file list backed by {}", entry, backingFile);
+          } catch (IOException e) {
+            LOG.error("Writing entry {} to file list {} failed, attempting retry.", entry, backingFile, e);
+            this.retryMode = true;
+            close();
+            throw e;
+          }
+        }
+        return null;
+      });
+    } catch (Exception e) {
+      throw new IOException(ErrorMsg.REPL_RETRY_EXHAUSTED.format(e.getMessage()));
     }
   }
 
-  @VisibleForTesting
-  FileList(Path backingFile, FileListStreamer fileListStreamer, LinkedBlockingQueue<String> cache, HiveConf conf) {
-    this.backingFile = backingFile;
-    this.fileListStreamer = fileListStreamer;
-    this.cache = cache;
-    this.conf = conf;
-    thresholdPoint = getThreshold(cache.remainingCapacity());
+  Retryable buildRetryable() {
+    return Retryable.builder()
+            .withHiveConf(conf)
+            .withRetryOnException(IOException.class).build();
   }
 
-  /**
-   * Only add operation is safe for concurrent operations.
-   */
-  public void add(String entry) throws SemanticException {
-    if (thresholdHit && !fileListStreamer.isAlive()) {
-      throw new SemanticException("List is not getting saved anymore to file " + backingFile.toString());
+  // Return the entry ensuring it ends with newline.
+  private String getEntryWithNewline(String entry) {
+    return new StringWriter()
+            .append(entry)
+            .append(System.lineSeparator())
+            .toString();
+  }
+
+  FSDataOutputStream initWriter() throws IOException {
+    if(shouldAppend()) {
+      return getWriterAppendMode(); // append in retry-mode if file has been created already
+    }
+    else {
+      return getWriterCreateMode();
     }
+  }
+
+  boolean shouldAppend() throws IOException {
+    return backingFile.getFileSystem(conf).exists(backingFile) && this.retryMode;

Review comment:
       When do you expect backingFile.getFileSystem(conf).exists(backingFile) to be true while this.retryMode is false, and what does that case mean?

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpWork.java
##########
@@ -184,42 +190,86 @@ public void setResultValues(List<String> resultValues) {
     this.resultValues = resultValues;
   }
 
-  public List<Task<?>> externalTableCopyTasks(TaskTracker tracker, HiveConf conf) {
+  public List<Task<?>> externalTableCopyTasks(TaskTracker tracker, HiveConf conf) throws IOException {
     if (conf.getBoolVar(HiveConf.ConfVars.REPL_DUMP_SKIP_IMMUTABLE_DATA_COPY)) {
       return Collections.emptyList();
     }
     List<Task<?>> tasks = new ArrayList<>();
-    while (externalTblCopyPathIterator.hasNext() && tracker.canAddMoreTasks()) {
-      DirCopyWork dirCopyWork = new DirCopyWork(metricCollector, currentDumpPath.toString());
-      dirCopyWork.loadFromString(externalTblCopyPathIterator.next());
-      Task<DirCopyWork> task = TaskFactory.get(dirCopyWork, conf);
-      tasks.add(task);
-      tracker.addTask(task);
-      LOG.debug("added task for {}", dirCopyWork);
+    Retryable retryable = Retryable.builder()
+            .withHiveConf(conf)
+            .withRetryOnException(UncheckedIOException.class).build();
+    try {
+      retryable.executeCallable((Callable<Void>) ()-> {
+        try{
+          int numEntriesToSkip = tasks == null ? 0 : tasks.size();
+          while (externalTblCopyPathIterator.hasNext() &&  tracker.canAddMoreTasks()) {
+            if(numEntriesToSkip > 0) {
+              //skip tasks added in previous attempts of this retryable block
+              externalTblCopyPathIterator.next();
+              numEntriesToSkip--;
+              continue;
+            }
+            DirCopyWork dirCopyWork = new DirCopyWork(metricCollector, currentDumpPath.toString());
+            dirCopyWork.loadFromString(externalTblCopyPathIterator.next());
+            Task<DirCopyWork> task = TaskFactory.get(dirCopyWork, conf);
+            tasks.add(task);
+            tracker.addTask(task);

Review comment:
       What happens to the tasks added prior to the retry?






[GitHub] [hive] ArkoSharma commented on a change in pull request #1936: HIVE-24718: Cleanup of _external_table_info file

Posted by GitBox <gi...@apache.org>.
ArkoSharma commented on a change in pull request #1936:
URL: https://github.com/apache/hive/pull/1936#discussion_r573450950



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/FileList.java
##########
@@ -22,154 +22,118 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.exec.util.Retryable;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.BufferedReader;
+import java.io.BufferedWriter;
 import java.io.IOException;
 import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.io.UncheckedIOException;
 import java.util.Iterator;
 import java.util.NoSuchElementException;
+import java.util.concurrent.Callable;
 import java.util.concurrent.LinkedBlockingQueue;
 
-
-/**
- * A file backed list of Strings which is in-memory till the threshold.
- */
 public class FileList implements AutoCloseable, Iterator<String> {
   private static final Logger LOG = LoggerFactory.getLogger(FileList.class);
-  private static int fileListStreamerID = 0;
-  private static final String  FILE_LIST_STREAMER_PREFIX = "file-list-streamer-";
-
-  private LinkedBlockingQueue<String> cache;
-  private volatile boolean thresholdHit = false;
-  private int thresholdPoint;
-  private float thresholdFactor = 0.9f;
   private Path backingFile;
-  private FileListStreamer fileListStreamer;
-  private String nextElement;
-  private boolean noMoreElement;
+  private String nextElement = null;
   private HiveConf conf;
   private BufferedReader backingFileReader;
+  private BufferedWriter backingFileWriter;
 
-
-  public FileList(Path backingFile, int cacheSize, HiveConf conf) {
+  public FileList(Path backingFile, HiveConf conf) {
     this.backingFile = backingFile;
     this.conf = conf;
-    if (cacheSize > 0) {
-      // Cache size must be > 0 for this list to be used for the write operation.
-      this.cache = new LinkedBlockingQueue<>(cacheSize);
-      fileListStreamer = new FileListStreamer(cache, backingFile, conf);
-      thresholdPoint = getThreshold(cacheSize);
-      LOG.debug("File list backed by {} can be used for write operation.", backingFile);
-    } else {
-      thresholdHit = true;
-    }
   }
 
-  @VisibleForTesting
-  FileList(Path backingFile, FileListStreamer fileListStreamer, LinkedBlockingQueue<String> cache, HiveConf conf) {
-    this.backingFile = backingFile;
-    this.fileListStreamer = fileListStreamer;
-    this.cache = cache;
-    this.conf = conf;
-    thresholdPoint = getThreshold(cache.remainingCapacity());
-  }
-
-  /**
-   * Only add operation is safe for concurrent operations.
-   */
   public void add(String entry) throws SemanticException {
-    if (thresholdHit && !fileListStreamer.isAlive()) {
-      throw new SemanticException("List is not getting saved anymore to file " + backingFile.toString());
-    }
+    Retryable retryable = Retryable.builder()
+            .withHiveConf(conf)
+            .withRetryOnException(IOException.class).build();
     try {
-      cache.put(entry);
-    } catch (InterruptedException e) {
-      throw new SemanticException(e);
-    }
-    if (!thresholdHit && cache.size() >= thresholdPoint) {
-      initStoreToFile(cache.size());
+      retryable.executeCallable((Callable<Void>) ()-> {
+        synchronized (backingFile ) {
+          if (backingFileWriter == null) {
+            backingFileWriter = initWriter();
+          }
+          backingFileWriter.write(entry);
+          backingFileWriter.newLine();
+        }
+        LOG.info("Writing entry {} to file list backed by {}", entry, backingFile);
+        return null;
+      });
+    } catch (Exception e) {
+      throw new SemanticException(ErrorMsg.REPL_RETRY_EXHAUSTED.format(e.getMessage(),
+              String.valueOf(ErrorMsg.getErrorMsg(e).getErrorCode())));
     }
   }
 
+  BufferedWriter initWriter() throws IOException {
+    FileSystem fs = FileSystem.get(backingFile.toUri(), conf);
+    return new BufferedWriter(new OutputStreamWriter(fs.create(backingFile)));
+  }
+
   @Override
   public boolean hasNext() {
-    if (!thresholdHit) {
-      return (cache != null && !cache.isEmpty());
-    }
     if (nextElement != null) {
       return true;
+    } else {
+      try {
+        nextElement = readNextLine();
+        return (nextElement != null);
+      } catch (IOException e) {
+        throw new UncheckedIOException(e);
+      }
     }
-    if (noMoreElement) {
-      return false;
-    }
-    nextElement = readNextLine();
-    if (nextElement == null) {
-      noMoreElement = true;
-    }
-    return !noMoreElement;
   }
 
   @Override
   public String next() {
-    if (!hasNext()) {
+    if (nextElement == null && !hasNext()) {
       throw new NoSuchElementException("No more element in the list backed by " + backingFile);
     }
     String retVal = nextElement;
     nextElement = null;
-    return thresholdHit ? retVal : cache.poll();
-  }
-
-  private synchronized void initStoreToFile(int cacheSize) {
-    if (!thresholdHit) {
-      fileListStreamer.setName(getNextID());
-      fileListStreamer.setDaemon(true);
-      fileListStreamer.start();
-      thresholdHit = true;
-      LOG.info("Started streaming the list elements to file: {}, cache size {}", backingFile, cacheSize);
-    }
+    return retVal;
   }
 
-  private String readNextLine() {
-    String nextElement = null;
+  private String readNextLine() throws IOException {
+    Retryable retryable = Retryable.builder()
+            .withHiveConf(conf)
+            .withRetryOnException(IOException.class).build();
     try {
-      if (backingFileReader == null) {
-        FileSystem fs = FileSystem.get(backingFile.toUri(), conf);
-        if (fs.exists(backingFile)) {
+      return retryable.executeCallable(() -> {
+        String nextElement;
+        if (backingFileReader == null) {
+          FileSystem fs = FileSystem.get(backingFile.toUri(), conf);
+          if(!fs.exists(backingFile)) {

Review comment:
       No, the file gets created while writing the first entry.
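
       For illustration, a minimal sketch of the lazy-create behaviour described here (simplified; the patch additionally wraps this in retry handling, so treat it as a sketch of the idea rather than the actual add()):

           public void add(String entry) throws IOException {
             synchronized (backingFile) {
               if (backingFileWriter == null) {
                 // the backing file only comes into existence on the first add()
                 backingFileWriter = backingFile.getFileSystem(conf).create(backingFile);
               }
               backingFileWriter.writeBytes(entry + System.lineSeparator());
               backingFileWriter.hflush();
             }
           }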






[GitHub] [hive] ArkoSharma commented on a change in pull request #1936: HIVE-24718: Moving to file based iteration for copying data.

Posted by GitBox <gi...@apache.org>.
ArkoSharma commented on a change in pull request #1936:
URL: https://github.com/apache/hive/pull/1936#discussion_r580045609



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/DirCopyWork.java
##########
@@ -75,7 +77,9 @@ public String convertToString() {
     StringBuilder objInStr = new StringBuilder();
     objInStr.append(fullyQualifiedSourcePath)
             .append(URI_SEPARATOR)
-            .append(fullyQualifiedTargetPath);
+            .append(fullyQualifiedTargetPath)
+            .append(URI_SEPARATOR)
+            .append(tableName);

Review comment:
       In the earlier version, source and target were the first and second entries, so appending tableName at the end keeps this change consistent with any dumps produced by that version.
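
       To make the compatibility point concrete, a hypothetical reader that tolerates both entry formats (the field handling below is an illustrative assumption, not the actual loadFromString):

           public void loadFromString(String objInStr) {
             String[] fields = objInStr.split(URI_SEPARATOR);
             // source and target keep their original positions, so entries from
             // dumps written by the earlier version still parse unchanged
             this.fullyQualifiedSourcePath = fields[0];
             this.fullyQualifiedTargetPath = fields[1];
             // tableName is the new trailing field; it is simply absent in old dumps
             this.tableName = fields.length > 2 ? fields[2] : null;
           }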






[GitHub] [hive] pkumarsinha commented on a change in pull request #1936: HIVE-24718: Moving to file based iteration for copying data.

Posted by GitBox <gi...@apache.org>.
pkumarsinha commented on a change in pull request #1936:
URL: https://github.com/apache/hive/pull/1936#discussion_r583233006



##########
File path: itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
##########
@@ -1836,6 +1835,60 @@ public void testHdfsNameserviceWithDataCopy() throws Throwable {
             .verifyResults(new String[]{"2", "3"});
   }
 
+  @Test
+  public void testReplWithRetryDisabledIterators() throws Throwable {
+    List<String> clause = new ArrayList<>();
+    //NS replacement parameters has no effect when data is also copied to staging
+    clause.add("'" + HiveConf.ConfVars.REPL_RUN_DATA_COPY_TASKS_ON_TARGET + "'='false'");
+    clause.add("'" + HiveConf.ConfVars.REPL_COPY_ITERATOR_RETRY + "'='false'");
+    primary.run("use " + primaryDbName)
+            .run("create table  acid_table (key int, value int) partitioned by (load_date date) " +
+                    "clustered by(key) into 2 buckets stored as orc tblproperties ('transactional'='true')")
+            .run("create table table1 (i String)")
+            .run("insert into table1 values (1)")
+            .run("insert into table1 values (2)")
+            .dump(primaryDbName, clause);

Review comment:
       At this point the entries are written to _file_list and _file_list_external.
   Please add an assertion on the file content here.
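
       Something along these lines, with the helper name and expected entries assumed purely for illustration:

           // hypothetical assertion; the exact expected entries depend on the dump layout
           Path hivePath = new Path(tuple.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR);
           assertFileContent(expectedEntries, new Path(hivePath, EximUtil.FILE_LIST));
           assertFileContent(expectedExternalEntries, new Path(hivePath, EximUtil.FILE_LIST_EXTERNAL));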

##########
File path: itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
##########
@@ -1836,6 +1835,60 @@ public void testHdfsNameserviceWithDataCopy() throws Throwable {
             .verifyResults(new String[]{"2", "3"});
   }
 
+  @Test
+  public void testReplWithRetryDisabledIterators() throws Throwable {
+    List<String> clause = new ArrayList<>();
+    //NS replacement parameters has no effect when data is also copied to staging
+    clause.add("'" + HiveConf.ConfVars.REPL_RUN_DATA_COPY_TASKS_ON_TARGET + "'='false'");
+    clause.add("'" + HiveConf.ConfVars.REPL_COPY_ITERATOR_RETRY + "'='false'");
+    primary.run("use " + primaryDbName)
+            .run("create table  acid_table (key int, value int) partitioned by (load_date date) " +
+                    "clustered by(key) into 2 buckets stored as orc tblproperties ('transactional'='true')")
+            .run("create table table1 (i String)")
+            .run("insert into table1 values (1)")
+            .run("insert into table1 values (2)")
+            .dump(primaryDbName, clause);
+    replica.load(replicatedDbName, primaryDbName, clause)
+            .run("use " + replicatedDbName)
+            .run("show tables")
+            .verifyResults(new String[] {"acid_table", "table1"})
+            .run("select * from table1")
+            .verifyResults(new String[] {"1", "2"});
+
+    primary.run("use " + primaryDbName)
+            .run("insert into table1 values (3)")
+            .dump(primaryDbName, clause);
+    replica.load(replicatedDbName, primaryDbName, clause)
+            .run("use " + replicatedDbName)
+            .run("show tables")
+            .verifyResults(new String[]{"acid_table", "table1"})
+            .run("select * from table1")
+            .verifyResults(new String[]{"1", "2", "3"});
+
+    clause.add("'" + HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY_FOR_EXTERNAL_TABLE.varname + "'='false'");
+    primary.run("use " + primaryDbName)
+            .run("create external table ext_table1 (id int)")
+            .run("insert into ext_table1 values (3)")
+            .run("insert into ext_table1 values (4)")
+            .run("create external table  ext_table2 (key int, value int) partitioned by (load_time timestamp)")
+            .run("insert into ext_table2 partition(load_time = '2012-02-21 07:08:09.123') values(1,2)")
+            .run("insert into ext_table2 partition(load_time = '2012-02-21 07:08:09.124') values(1,3)")
+            .run("show partitions ext_table2")
+            .verifyResults(new String[]{
+                    "load_time=2012-02-21 07%3A08%3A09.123",
+                    "load_time=2012-02-21 07%3A08%3A09.124"})
+            .dump(primaryDbName, clause);

Review comment:
       At this point the entries are written to _file_list and _file_list_external.
   Please add an assertion on the file content here.

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/FileList.java
##########
@@ -18,158 +18,225 @@
 
 package org.apache.hadoop.hive.ql.exec.repl.util;
 
-import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.exec.util.Retryable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.BufferedReader;
 import java.io.IOException;
 import java.io.InputStreamReader;
+import java.io.StringWriter;
+import java.io.UncheckedIOException;
 import java.util.Iterator;
 import java.util.NoSuchElementException;
-import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.Callable;
 
-
-/**
- * A file backed list of Strings which is in-memory till the threshold.
- */
 public class FileList implements AutoCloseable, Iterator<String> {
   private static final Logger LOG = LoggerFactory.getLogger(FileList.class);
-  private static int fileListStreamerID = 0;
-  private static final String  FILE_LIST_STREAMER_PREFIX = "file-list-streamer-";
-
-  private LinkedBlockingQueue<String> cache;
-  private volatile boolean thresholdHit = false;
-  private int thresholdPoint;
-  private float thresholdFactor = 0.9f;
-  private Path backingFile;
-  private FileListStreamer fileListStreamer;
-  private String nextElement;
-  private boolean noMoreElement;
+  private final Path backingFile;
+  private String nextElement = null;
+  private String lastReadElement = null;
   private HiveConf conf;
+  private volatile boolean abortOperation = false;
+  private volatile boolean retryMode;
   private BufferedReader backingFileReader;
+  private volatile FSDataOutputStream backingFileWriter;
 
-
-  public FileList(Path backingFile, int cacheSize, HiveConf conf) {
+  public FileList(Path backingFile, HiveConf conf) {
     this.backingFile = backingFile;
     this.conf = conf;
-    if (cacheSize > 0) {
-      // Cache size must be > 0 for this list to be used for the write operation.
-      this.cache = new LinkedBlockingQueue<>(cacheSize);
-      fileListStreamer = new FileListStreamer(cache, backingFile, conf);
-      thresholdPoint = getThreshold(cacheSize);
-      LOG.debug("File list backed by {} can be used for write operation.", backingFile);
+    this.retryMode = false;
+  }
+
+  public void add(String entry) throws IOException {
+    if (conf.getBoolVar(HiveConf.ConfVars.REPL_COPY_ITERATOR_RETRY)) {
+      writeWithRetry(entry);
     } else {
-      thresholdHit = true;
+      writeEntry(entry);
     }
   }
 
-  @VisibleForTesting
-  FileList(Path backingFile, FileListStreamer fileListStreamer, LinkedBlockingQueue<String> cache, HiveConf conf) {
-    this.backingFile = backingFile;
-    this.fileListStreamer = fileListStreamer;
-    this.cache = cache;
-    this.conf = conf;
-    thresholdPoint = getThreshold(cache.remainingCapacity());
+  private synchronized void writeEntry(String entry) throws IOException {
+    //retry only during creating the file, no retry during writes
+    if (backingFileWriter == null) {
+      try {
+        Retryable retryable = buildRetryable();
+        retryable.executeCallable((Callable<Void>) () -> {
+          if(this.abortOperation) {
+            return null;
+          }
+          backingFileWriter = getWriterCreateMode();
+          return null;
+        });
+      } catch (Exception e) {
+        this.abortOperation = true;
+        throw new IOException(ErrorMsg.REPL_RETRY_EXHAUSTED.format(e.getMessage()));
+      }
+    }
+    if(this.abortOperation) {
+      return;
+    }
+    try {
+      backingFileWriter.writeBytes(getEntryWithNewline(entry));
+      LOG.info("Writing entry {} to file list backed by {}", entry, backingFile);
+    } catch (IOException e) {
+      this.abortOperation = true;
+      LOG.error("Writing entry {} to file list {} failed.", entry, backingFile, e);
+      throw e;
+    }
   }
 
-  /**
-   * Only add operation is safe for concurrent operations.
-   */
-  public void add(String entry) throws SemanticException {
-    if (thresholdHit && !fileListStreamer.isAlive()) {
-      throw new SemanticException("List is not getting saved anymore to file " + backingFile.toString());
+  private synchronized void writeWithRetry(String entry) throws IOException {
+    Retryable retryable = buildRetryable();

Review comment:
       Add a test for the retry case.

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/FileList.java
##########
@@ -18,158 +18,225 @@
 
 package org.apache.hadoop.hive.ql.exec.repl.util;
 
-import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.exec.util.Retryable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.BufferedReader;
 import java.io.IOException;
 import java.io.InputStreamReader;
+import java.io.StringWriter;
+import java.io.UncheckedIOException;
 import java.util.Iterator;
 import java.util.NoSuchElementException;
-import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.Callable;
 
-
-/**
- * A file backed list of Strings which is in-memory till the threshold.
- */
 public class FileList implements AutoCloseable, Iterator<String> {
   private static final Logger LOG = LoggerFactory.getLogger(FileList.class);
-  private static int fileListStreamerID = 0;
-  private static final String  FILE_LIST_STREAMER_PREFIX = "file-list-streamer-";
-
-  private LinkedBlockingQueue<String> cache;
-  private volatile boolean thresholdHit = false;
-  private int thresholdPoint;
-  private float thresholdFactor = 0.9f;
-  private Path backingFile;
-  private FileListStreamer fileListStreamer;
-  private String nextElement;
-  private boolean noMoreElement;
+  private final Path backingFile;
+  private String nextElement = null;
+  private String lastReadElement = null;
   private HiveConf conf;
+  private volatile boolean abortOperation = false;
+  private volatile boolean retryMode;
   private BufferedReader backingFileReader;
+  private volatile FSDataOutputStream backingFileWriter;
 
-
-  public FileList(Path backingFile, int cacheSize, HiveConf conf) {
+  public FileList(Path backingFile, HiveConf conf) {
     this.backingFile = backingFile;
     this.conf = conf;
-    if (cacheSize > 0) {
-      // Cache size must be > 0 for this list to be used for the write operation.
-      this.cache = new LinkedBlockingQueue<>(cacheSize);
-      fileListStreamer = new FileListStreamer(cache, backingFile, conf);
-      thresholdPoint = getThreshold(cacheSize);
-      LOG.debug("File list backed by {} can be used for write operation.", backingFile);
+    this.retryMode = false;
+  }
+
+  public void add(String entry) throws IOException {
+    if (conf.getBoolVar(HiveConf.ConfVars.REPL_COPY_ITERATOR_RETRY)) {
+      writeWithRetry(entry);
     } else {
-      thresholdHit = true;
+      writeEntry(entry);
     }
   }
 
-  @VisibleForTesting
-  FileList(Path backingFile, FileListStreamer fileListStreamer, LinkedBlockingQueue<String> cache, HiveConf conf) {
-    this.backingFile = backingFile;
-    this.fileListStreamer = fileListStreamer;
-    this.cache = cache;
-    this.conf = conf;
-    thresholdPoint = getThreshold(cache.remainingCapacity());
+  private synchronized void writeEntry(String entry) throws IOException {
+    //retry only during creating the file, no retry during writes
+    if (backingFileWriter == null) {
+      try {
+        Retryable retryable = buildRetryable();
+        retryable.executeCallable((Callable<Void>) () -> {
+          if(this.abortOperation) {
+            return null;
+          }
+          backingFileWriter = getWriterCreateMode();
+          return null;
+        });
+      } catch (Exception e) {
+        this.abortOperation = true;
+        throw new IOException(ErrorMsg.REPL_RETRY_EXHAUSTED.format(e.getMessage()));
+      }
+    }
+    if(this.abortOperation) {
+      return;
+    }
+    try {
+      backingFileWriter.writeBytes(getEntryWithNewline(entry));
+      LOG.info("Writing entry {} to file list backed by {}", entry, backingFile);
+    } catch (IOException e) {
+      this.abortOperation = true;
+      LOG.error("Writing entry {} to file list {} failed.", entry, backingFile, e);
+      throw e;
+    }
   }
 
-  /**
-   * Only add operation is safe for concurrent operations.
-   */
-  public void add(String entry) throws SemanticException {
-    if (thresholdHit && !fileListStreamer.isAlive()) {
-      throw new SemanticException("List is not getting saved anymore to file " + backingFile.toString());
+  private synchronized void writeWithRetry(String entry) throws IOException {
+    Retryable retryable = buildRetryable();
+    try {
+      retryable.executeCallable((Callable<Void>) () -> {
+        if (this.abortOperation) {
+          return null;
+        }
+        try {
+          if (backingFileWriter == null) {
+            backingFileWriter = initWriter();
+          }
+          backingFileWriter.writeBytes(getEntryWithNewline(entry));
+          backingFileWriter.hflush();
+          LOG.info("Writing entry {} to file list backed by {}", entry, backingFile);
+        } catch (IOException e) {
+          LOG.error("Writing entry {} to file list {} failed, attempting retry.", entry, backingFile, e);
+          this.retryMode = true;
+          close();
+          throw e;
+        }
+        return null;
+      });
+    } catch (Exception e) {
+      this.abortOperation = true;
+      throw new IOException(ErrorMsg.REPL_RETRY_EXHAUSTED.format(e.getMessage()));
     }
+  }
+
+  Retryable buildRetryable() {
+    return Retryable.builder()
+            .withHiveConf(conf)
+            .withRetryOnException(IOException.class).build();
+  }
+
+  // Return the entry ensuring it ends with newline.
+  private String getEntryWithNewline(String entry) {
+    return new StringWriter()
+            .append(entry)
+            .append(System.lineSeparator())
+            .toString();
+  }
+
+  FSDataOutputStream initWriter() throws IOException {
+    if(shouldAppend()) {
+      return getWriterAppendMode(); // append in retry-mode if file has been created already
+    }
+    else {
+      return getWriterCreateMode();
+    }
+  }
+
+  private boolean shouldAppend() throws IOException {
+    return backingFile.getFileSystem(conf).exists(backingFile) && this.retryMode;
+  }
+
+  FSDataOutputStream getWriterCreateMode() throws IOException {
     try {
-      cache.put(entry);
-    } catch (InterruptedException e) {
-      throw new SemanticException(e);
+      return backingFile.getFileSystem(conf).create(backingFile);
+    } catch (IOException e) {
+      LOG.error("Error opening {} in append mode", backingFile, e);
+      throw e;
     }
-    if (!thresholdHit && cache.size() >= thresholdPoint) {
-      initStoreToFile(cache.size());
+  }
+
+  FSDataOutputStream getWriterAppendMode() throws IOException {
+    try {
+      return backingFile.getFileSystem(conf).append(backingFile);
+    } catch (IOException e) {
+      LOG.error("Error creating file {}", backingFile, e);
+      throw e;
     }
   }
 
   @Override
   public boolean hasNext() {
-    if (!thresholdHit) {
-      return (cache != null && !cache.isEmpty());
-    }
-    if (nextElement != null) {
+    /*
+    We assume that every add operation either adds an entry completely or doesn't add at all.
+    If this assumption changes then in the following check we should check for incomplete entries.
+    We remove duplicate entries assuming they are only written consecutively.
+    */
+    if (nextElement != null && !nextElement.equals(lastReadElement)) {
       return true;
+    } else {
+      try {
+        do {
+          nextElement = readNextLine();
+          if(nextElement != null && !nextElement.equals(lastReadElement)) {
+            return true;
+          }
+        } while (nextElement != null);
+        return false;
+      } catch (IOException e) {
+        nextElement = null;
+        lastReadElement = null;
+        backingFileReader = null;
+        throw new UncheckedIOException(e);
+      }
     }
-    if (noMoreElement) {
-      return false;
-    }
-    nextElement = readNextLine();
-    if (nextElement == null) {
-      noMoreElement = true;
-    }
-    return !noMoreElement;
   }
 
   @Override
   public String next() {
-    if (!hasNext()) {
+    if ((nextElement == null || nextElement.equals(lastReadElement)) && !hasNext()) {
       throw new NoSuchElementException("No more element in the list backed by " + backingFile);
     }
-    String retVal = nextElement;
+    lastReadElement = nextElement;
     nextElement = null;
-    return thresholdHit ? retVal : cache.poll();
-  }
-
-  private synchronized void initStoreToFile(int cacheSize) {
-    if (!thresholdHit) {
-      fileListStreamer.setName(getNextID());
-      fileListStreamer.setDaemon(true);
-      fileListStreamer.start();
-      thresholdHit = true;
-      LOG.info("Started streaming the list elements to file: {}, cache size {}", backingFile, cacheSize);
-    }
+    return lastReadElement;
   }
 
-  private String readNextLine() {
-    String nextElement = null;
-    try {
+  private String readNextLine() throws IOException {
+    try{
+      String nextElement;
       if (backingFileReader == null) {
-        FileSystem fs = FileSystem.get(backingFile.toUri(), conf);
-        if (fs.exists(backingFile)) {
-          backingFileReader = new BufferedReader(new InputStreamReader(fs.open(backingFile)));
+        FileSystem fs = backingFile.getFileSystem(conf);
+        if (!fs.exists(backingFile)) {
+          return null;
         }
+        backingFileReader = new BufferedReader(new InputStreamReader(fs.open(backingFile)));
       }
-      nextElement = (backingFileReader == null) ? null : backingFileReader.readLine();
+      nextElement = backingFileReader.readLine();
+      return nextElement;
     } catch (IOException e) {
-      LOG.error("Unable to read list from backing file " + backingFile, e);
+      LOG.error("Exception while reading file {}.", backingFile, e);
+      close();
+      throw e;
     }
-    return nextElement;
   }
 
   @Override
   public void close() throws IOException {
-    if (thresholdHit && fileListStreamer != null) {
-      fileListStreamer.close();
-    }
-    if (backingFileReader != null) {
-      backingFileReader.close();
-    }
-    LOG.info("Completed close for File List backed by:{}, thresholdHit:{} ", backingFile, thresholdHit);
-  }
-
-  private static String getNextID() {
-    if (Integer.MAX_VALUE == fileListStreamerID) {
-      //reset the counter
-      fileListStreamerID = 0;
+    try {
+      if (backingFileReader != null) {
+        backingFileReader.close();
+      }
+      if (backingFileWriter != null) {
+        backingFileWriter.close();
+      }
+      LOG.info("Completed close for File List backed by:{}", backingFile);
+    } finally {
+      if(backingFileReader != null) {

Review comment:
       nit: You can consider removing the if blocks.

##########
File path: itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
##########
@@ -1836,6 +1835,60 @@ public void testHdfsNameserviceWithDataCopy() throws Throwable {
             .verifyResults(new String[]{"2", "3"});
   }
 
+  @Test
+  public void testReplWithRetryDisabledIterators() throws Throwable {

Review comment:
       Actually, it doesn't verify that the retry isn't happening, does it?

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
##########
@@ -658,35 +657,34 @@ private Long incrementalDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot, Hive
         Path dbRootMetadata = new Path(metadataPath, dbName);
         Path dbRootData = new Path(bootstrapRoot, EximUtil.DATA_PATH_NAME + File.separator + dbName);
         boolean dataCopyAtLoad = conf.getBoolVar(HiveConf.ConfVars.REPL_RUN_DATA_COPY_TASKS_ON_TARGET);
-        try (Writer writer = new Writer(dumpRoot, conf)) {
-          for (String tableName : Utils.matchesTbl(hiveDb, dbName, work.replScope)) {
-            try {
-              Table table = hiveDb.getTable(dbName, tableName);
+        ReplExternalTables externalTablesWriter = new ReplExternalTables(conf);
+        for (String tableName : Utils.matchesTbl(hiveDb, dbName, work.replScope)) {
+          try {
+            Table table = hiveDb.getTable(dbName, tableName);
 
-              // Dump external table locations if required.
-              if (TableType.EXTERNAL_TABLE.equals(table.getTableType())
-                      && shouldDumpExternalTableLocation()) {
-                writer.dataLocationDump(table, extTableFileList, conf);
-              }
+            // Dump external table locations if required.
+            if (TableType.EXTERNAL_TABLE.equals(table.getTableType())
+                    && shouldDumpExternalTableLocation()) {

Review comment:
       nit: can be on the same line






[GitHub] [hive] pkumarsinha commented on a change in pull request #1936: HIVE-24718: Moving to file based iteration for copying data.

Posted by GitBox <gi...@apache.org>.
pkumarsinha commented on a change in pull request #1936:
URL: https://github.com/apache/hive/pull/1936#discussion_r580321113



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/FileList.java
##########
@@ -18,158 +18,176 @@
 
 package org.apache.hadoop.hive.ql.exec.repl.util;
 
-import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.exec.util.Retryable;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.BufferedReader;
 import java.io.IOException;
 import java.io.InputStreamReader;
+import java.io.StringWriter;
+import java.io.UncheckedIOException;
 import java.util.Iterator;
 import java.util.NoSuchElementException;
-import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.Callable;
 
-
-/**
- * A file backed list of Strings which is in-memory till the threshold.
- */
 public class FileList implements AutoCloseable, Iterator<String> {
   private static final Logger LOG = LoggerFactory.getLogger(FileList.class);
-  private static int fileListStreamerID = 0;
-  private static final String  FILE_LIST_STREAMER_PREFIX = "file-list-streamer-";
-
-  private LinkedBlockingQueue<String> cache;
-  private volatile boolean thresholdHit = false;
-  private int thresholdPoint;
-  private float thresholdFactor = 0.9f;
-  private Path backingFile;
-  private FileListStreamer fileListStreamer;
-  private String nextElement;
-  private boolean noMoreElement;
+  private final Path backingFile;
+  private String nextElement = null;
   private HiveConf conf;
+  private volatile boolean retryMode;
   private BufferedReader backingFileReader;
+  private volatile FSDataOutputStream backingFileWriter;
 
-
-  public FileList(Path backingFile, int cacheSize, HiveConf conf) {
+  public FileList(Path backingFile, HiveConf conf) {
     this.backingFile = backingFile;
     this.conf = conf;
-    if (cacheSize > 0) {
-      // Cache size must be > 0 for this list to be used for the write operation.
-      this.cache = new LinkedBlockingQueue<>(cacheSize);
-      fileListStreamer = new FileListStreamer(cache, backingFile, conf);
-      thresholdPoint = getThreshold(cacheSize);
-      LOG.debug("File list backed by {} can be used for write operation.", backingFile);
-    } else {
-      thresholdHit = true;
+    this.retryMode = false;
+  }
+
+  public void add(String entry) throws IOException {
+    Retryable retryable = buildRetryable();
+    try {
+      retryable.executeCallable((Callable<Void>) ()-> {
+        synchronized (backingFile) {
+          try{
+            if (backingFileWriter == null) {
+              backingFileWriter = initWriter();
+            }
+            backingFileWriter.writeBytes(getEntryWithNewline(entry));
+            backingFileWriter.hflush();

Review comment:
       On hflush():
   a) AFAIK, this doesn't guarantee that the data has been written to disk. What happens if all the data nodes go down simultaneously? How does the retry handle that case?
   b) What happens when the data is written but the connection breaks while receiving the ACK from the data node(s)? Wouldn't we end up with duplicate entries in that case?
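
       (For context on (b): a failed-then-retried add() can only re-write the entry that may already have reached the file, so duplicates would only ever appear back-to-back, and the reader side can skip consecutive repeats. A condensed sketch of that idea, not the exact patch code:)

           private String lastReadElement;

           @Override
           public boolean hasNext() {
             if (nextElement != null) {
               return true; // already buffered, not yet consumed
             }
             try {
               String candidate;
               while ((candidate = readNextLine()) != null) {
                 // consecutive duplicates are the only kind a retried
                 // write can produce, so they are safe to drop
                 if (!candidate.equals(lastReadElement)) {
                   nextElement = candidate;
                   return true;
                 }
               }
               return false;
             } catch (IOException e) {
               throw new UncheckedIOException(e);
             }
           }

           @Override
           public String next() {
             if (nextElement == null && !hasNext()) {
               throw new NoSuchElementException("No more elements");
             }
             lastReadElement = nextElement; // remember for duplicate detection
             nextElement = null;
             return lastReadElement;
           }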






[GitHub] [hive] pkumarsinha commented on a change in pull request #1936: HIVE-24718: Cleanup of _external_table_info file

Posted by GitBox <gi...@apache.org>.
pkumarsinha commented on a change in pull request #1936:
URL: https://github.com/apache/hive/pull/1936#discussion_r573456193



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/FileList.java
##########
@@ -22,154 +22,118 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.exec.util.Retryable;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.BufferedReader;
+import java.io.BufferedWriter;
 import java.io.IOException;
 import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.io.UncheckedIOException;
 import java.util.Iterator;
 import java.util.NoSuchElementException;
+import java.util.concurrent.Callable;
 import java.util.concurrent.LinkedBlockingQueue;
 
-
-/**
- * A file backed list of Strings which is in-memory till the threshold.
- */
 public class FileList implements AutoCloseable, Iterator<String> {
   private static final Logger LOG = LoggerFactory.getLogger(FileList.class);
-  private static int fileListStreamerID = 0;
-  private static final String  FILE_LIST_STREAMER_PREFIX = "file-list-streamer-";
-
-  private LinkedBlockingQueue<String> cache;
-  private volatile boolean thresholdHit = false;
-  private int thresholdPoint;
-  private float thresholdFactor = 0.9f;
   private Path backingFile;
-  private FileListStreamer fileListStreamer;
-  private String nextElement;
-  private boolean noMoreElement;
+  private String nextElement = null;
   private HiveConf conf;
   private BufferedReader backingFileReader;
+  private BufferedWriter backingFileWriter;
 
-
-  public FileList(Path backingFile, int cacheSize, HiveConf conf) {
+  public FileList(Path backingFile, HiveConf conf) {
     this.backingFile = backingFile;
     this.conf = conf;
-    if (cacheSize > 0) {
-      // Cache size must be > 0 for this list to be used for the write operation.
-      this.cache = new LinkedBlockingQueue<>(cacheSize);
-      fileListStreamer = new FileListStreamer(cache, backingFile, conf);
-      thresholdPoint = getThreshold(cacheSize);
-      LOG.debug("File list backed by {} can be used for write operation.", backingFile);
-    } else {
-      thresholdHit = true;
-    }
   }
 
-  @VisibleForTesting
-  FileList(Path backingFile, FileListStreamer fileListStreamer, LinkedBlockingQueue<String> cache, HiveConf conf) {
-    this.backingFile = backingFile;
-    this.fileListStreamer = fileListStreamer;
-    this.cache = cache;
-    this.conf = conf;
-    thresholdPoint = getThreshold(cache.remainingCapacity());
-  }
-
-  /**
-   * Only add operation is safe for concurrent operations.
-   */
   public void add(String entry) throws SemanticException {
-    if (thresholdHit && !fileListStreamer.isAlive()) {
-      throw new SemanticException("List is not getting saved anymore to file " + backingFile.toString());
-    }
+    Retryable retryable = Retryable.builder()
+            .withHiveConf(conf)
+            .withRetryOnException(IOException.class).build();
     try {
-      cache.put(entry);
-    } catch (InterruptedException e) {
-      throw new SemanticException(e);
-    }
-    if (!thresholdHit && cache.size() >= thresholdPoint) {
-      initStoreToFile(cache.size());
+      retryable.executeCallable((Callable<Void>) ()-> {
+        synchronized (backingFile ) {

Review comment:
       Why do we need synchronization if only one thread is writing? And how does synchronizing guarantee the atomicity (all or nothing) that you are expecting?






[GitHub] [hive] ArkoSharma commented on a change in pull request #1936: HIVE-24718: Moving to file based iteration for copying data.

Posted by GitBox <gi...@apache.org>.
ArkoSharma commented on a change in pull request #1936:
URL: https://github.com/apache/hive/pull/1936#discussion_r582431522



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/FileList.java
##########
@@ -18,158 +18,176 @@
 
 package org.apache.hadoop.hive.ql.exec.repl.util;
 
-import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.exec.util.Retryable;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.BufferedReader;
 import java.io.IOException;
 import java.io.InputStreamReader;
+import java.io.StringWriter;
+import java.io.UncheckedIOException;
 import java.util.Iterator;
 import java.util.NoSuchElementException;
-import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.Callable;
 
-
-/**
- * A file backed list of Strings which is in-memory till the threshold.
- */
 public class FileList implements AutoCloseable, Iterator<String> {
   private static final Logger LOG = LoggerFactory.getLogger(FileList.class);
-  private static int fileListStreamerID = 0;
-  private static final String  FILE_LIST_STREAMER_PREFIX = "file-list-streamer-";
-
-  private LinkedBlockingQueue<String> cache;
-  private volatile boolean thresholdHit = false;
-  private int thresholdPoint;
-  private float thresholdFactor = 0.9f;
-  private Path backingFile;
-  private FileListStreamer fileListStreamer;
-  private String nextElement;
-  private boolean noMoreElement;
+  private final Path backingFile;
+  private String nextElement = null;
   private HiveConf conf;
+  private volatile boolean retryMode;
   private BufferedReader backingFileReader;
+  private volatile FSDataOutputStream backingFileWriter;
 
-
-  public FileList(Path backingFile, int cacheSize, HiveConf conf) {
+  public FileList(Path backingFile, HiveConf conf) {
     this.backingFile = backingFile;
     this.conf = conf;
-    if (cacheSize > 0) {
-      // Cache size must be > 0 for this list to be used for the write operation.
-      this.cache = new LinkedBlockingQueue<>(cacheSize);
-      fileListStreamer = new FileListStreamer(cache, backingFile, conf);
-      thresholdPoint = getThreshold(cacheSize);
-      LOG.debug("File list backed by {} can be used for write operation.", backingFile);
-    } else {
-      thresholdHit = true;
+    this.retryMode = false;
+  }
+
+  public void add(String entry) throws IOException {
+    Retryable retryable = buildRetryable();
+    try {
+      retryable.executeCallable((Callable<Void>) ()-> {

Review comment:
      The dump operation fails if any thread encounters an exception.






[GitHub] [hive] ArkoSharma commented on a change in pull request #1936: HIVE-24718: Moving to file based iteration for copying data.

Posted by GitBox <gi...@apache.org>.
ArkoSharma commented on a change in pull request #1936:
URL: https://github.com/apache/hive/pull/1936#discussion_r580049385



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/FileList.java
##########
@@ -18,158 +18,176 @@
 
 package org.apache.hadoop.hive.ql.exec.repl.util;
 
-import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.exec.util.Retryable;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.BufferedReader;
 import java.io.IOException;
 import java.io.InputStreamReader;
+import java.io.StringWriter;
+import java.io.UncheckedIOException;
 import java.util.Iterator;
 import java.util.NoSuchElementException;
-import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.Callable;
 
-
-/**
- * A file backed list of Strings which is in-memory till the threshold.
- */
 public class FileList implements AutoCloseable, Iterator<String> {
   private static final Logger LOG = LoggerFactory.getLogger(FileList.class);
-  private static int fileListStreamerID = 0;
-  private static final String  FILE_LIST_STREAMER_PREFIX = "file-list-streamer-";
-
-  private LinkedBlockingQueue<String> cache;
-  private volatile boolean thresholdHit = false;
-  private int thresholdPoint;
-  private float thresholdFactor = 0.9f;
-  private Path backingFile;
-  private FileListStreamer fileListStreamer;
-  private String nextElement;
-  private boolean noMoreElement;
+  private final Path backingFile;
+  private String nextElement = null;
   private HiveConf conf;
+  private volatile boolean retryMode;
   private BufferedReader backingFileReader;
+  private volatile FSDataOutputStream backingFileWriter;
 
-
-  public FileList(Path backingFile, int cacheSize, HiveConf conf) {
+  public FileList(Path backingFile, HiveConf conf) {
     this.backingFile = backingFile;
     this.conf = conf;
-    if (cacheSize > 0) {
-      // Cache size must be > 0 for this list to be used for the write operation.
-      this.cache = new LinkedBlockingQueue<>(cacheSize);
-      fileListStreamer = new FileListStreamer(cache, backingFile, conf);
-      thresholdPoint = getThreshold(cacheSize);
-      LOG.debug("File list backed by {} can be used for write operation.", backingFile);
-    } else {
-      thresholdHit = true;
+    this.retryMode = false;
+  }
+
+  public void add(String entry) throws IOException {
+    Retryable retryable = buildRetryable();
+    try {
+      retryable.executeCallable((Callable<Void>) ()-> {
+        synchronized (backingFile) {
+          try{
+            if (backingFileWriter == null) {
+              backingFileWriter = initWriter();
+            }
+            backingFileWriter.writeBytes(getEntryWithNewline(entry));
+            backingFileWriter.hflush();
+            LOG.info("Writing entry {} to file list backed by {}", entry, backingFile);
+          } catch (IOException e) {
+            LOG.error("Writing entry {} to file list {} failed, attempting retry.", entry, backingFile, e);
+            this.retryMode = true;
+            close();
+            throw e;
+          }
+        }
+        return null;
+      });
+    } catch (Exception e) {
+      throw new IOException(ErrorMsg.REPL_RETRY_EXHAUSTED.format(e.getMessage()));
     }
   }
 
-  @VisibleForTesting
-  FileList(Path backingFile, FileListStreamer fileListStreamer, LinkedBlockingQueue<String> cache, HiveConf conf) {
-    this.backingFile = backingFile;
-    this.fileListStreamer = fileListStreamer;
-    this.cache = cache;
-    this.conf = conf;
-    thresholdPoint = getThreshold(cache.remainingCapacity());
+  Retryable buildRetryable() {
+    return Retryable.builder()
+            .withHiveConf(conf)
+            .withRetryOnException(IOException.class).build();
   }
 
-  /**
-   * Only add operation is safe for concurrent operations.
-   */
-  public void add(String entry) throws SemanticException {
-    if (thresholdHit && !fileListStreamer.isAlive()) {
-      throw new SemanticException("List is not getting saved anymore to file " + backingFile.toString());
+  // Return the entry ensuring it ends with newline.
+  private String getEntryWithNewline(String entry) {
+    return new StringWriter()
+            .append(entry)
+            .append(System.lineSeparator())
+            .toString();
+  }
+
+  FSDataOutputStream initWriter() throws IOException {
+    if(shouldAppend()) {
+      return getWriterAppendMode(); // append in retry-mode if file has been created already
+    }
+    else {
+      return getWriterCreateMode();
     }
+  }
+
+  boolean shouldAppend() throws IOException {

Review comment:
      Needed to make it package-private for the purpose of testing with Mockito.
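
   For context, a minimal sketch of how the package-private hooks enable such a test with Mockito. This is a hypothetical test class (assuming the getWriter* helpers are also package-private; names follow the diff above), not the actual TestFileList code:

```java
package org.apache.hadoop.hive.ql.exec.repl.util;

import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.junit.Test;

// Hypothetical sketch: sitting in the same package lets the test stub the
// package-private hooks, so no real FileSystem is ever touched.
public class TestFileListVisibilitySketch {

  @Test
  public void retryModePicksAppendMode() throws Exception {
    FileList fileList = spy(new FileList(new Path("/tmp/_file_list"), new HiveConf()));
    doReturn(true).when(fileList).shouldAppend();         // pretend a prior attempt already created the file
    doReturn(null).when(fileList).getWriterAppendMode();  // stub out the HDFS stream
    fileList.initWriter();
    verify(fileList).getWriterAppendMode();               // the append path was taken
  }
}
```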






[GitHub] [hive] ArkoSharma commented on a change in pull request #1936: HIVE-24718: Moving to file based iteration for copying data.

Posted by GitBox <gi...@apache.org>.
ArkoSharma commented on a change in pull request #1936:
URL: https://github.com/apache/hive/pull/1936#discussion_r584584623



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/FileList.java
##########
@@ -18,158 +18,225 @@
 
 package org.apache.hadoop.hive.ql.exec.repl.util;
 
-import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.exec.util.Retryable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.BufferedReader;
 import java.io.IOException;
 import java.io.InputStreamReader;
+import java.io.StringWriter;
+import java.io.UncheckedIOException;
 import java.util.Iterator;
 import java.util.NoSuchElementException;
-import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.Callable;
 
-
-/**
- * A file backed list of Strings which is in-memory till the threshold.
- */
 public class FileList implements AutoCloseable, Iterator<String> {
   private static final Logger LOG = LoggerFactory.getLogger(FileList.class);
-  private static int fileListStreamerID = 0;
-  private static final String  FILE_LIST_STREAMER_PREFIX = "file-list-streamer-";
-
-  private LinkedBlockingQueue<String> cache;
-  private volatile boolean thresholdHit = false;
-  private int thresholdPoint;
-  private float thresholdFactor = 0.9f;
-  private Path backingFile;
-  private FileListStreamer fileListStreamer;
-  private String nextElement;
-  private boolean noMoreElement;
+  private final Path backingFile;
+  private String nextElement = null;
+  private String lastReadElement = null;
   private HiveConf conf;
+  private volatile boolean abortOperation = false;
+  private volatile boolean retryMode;
   private BufferedReader backingFileReader;
+  private volatile FSDataOutputStream backingFileWriter;
 
-
-  public FileList(Path backingFile, int cacheSize, HiveConf conf) {
+  public FileList(Path backingFile, HiveConf conf) {
     this.backingFile = backingFile;
     this.conf = conf;
-    if (cacheSize > 0) {
-      // Cache size must be > 0 for this list to be used for the write operation.
-      this.cache = new LinkedBlockingQueue<>(cacheSize);
-      fileListStreamer = new FileListStreamer(cache, backingFile, conf);
-      thresholdPoint = getThreshold(cacheSize);
-      LOG.debug("File list backed by {} can be used for write operation.", backingFile);
+    this.retryMode = false;
+  }
+
+  public void add(String entry) throws IOException {
+    if (conf.getBoolVar(HiveConf.ConfVars.REPL_COPY_ITERATOR_RETRY)) {
+      writeWithRetry(entry);
     } else {
-      thresholdHit = true;
+      writeEntry(entry);
     }
   }
 
-  @VisibleForTesting
-  FileList(Path backingFile, FileListStreamer fileListStreamer, LinkedBlockingQueue<String> cache, HiveConf conf) {
-    this.backingFile = backingFile;
-    this.fileListStreamer = fileListStreamer;
-    this.cache = cache;
-    this.conf = conf;
-    thresholdPoint = getThreshold(cache.remainingCapacity());
+  private synchronized void writeEntry(String entry) throws IOException {
+    //retry only during creating the file, no retry during writes
+    if (backingFileWriter == null) {
+      try {
+        Retryable retryable = buildRetryable();
+        retryable.executeCallable((Callable<Void>) () -> {
+          if(this.abortOperation) {
+            return null;
+          }
+          backingFileWriter = getWriterCreateMode();
+          return null;
+        });
+      } catch (Exception e) {
+        this.abortOperation = true;
+        throw new IOException(ErrorMsg.REPL_RETRY_EXHAUSTED.format(e.getMessage()));
+      }
+    }
+    if(this.abortOperation) {
+      return;
+    }
+    try {
+      backingFileWriter.writeBytes(getEntryWithNewline(entry));
+      LOG.info("Writing entry {} to file list backed by {}", entry, backingFile);
+    } catch (IOException e) {
+      this.abortOperation = true;
+      LOG.error("Writing entry {} to file list {} failed.", entry, backingFile, e);
+      throw e;
+    }
   }
 
-  /**
-   * Only add operation is safe for concurrent operations.
-   */
-  public void add(String entry) throws SemanticException {
-    if (thresholdHit && !fileListStreamer.isAlive()) {
-      throw new SemanticException("List is not getting saved anymore to file " + backingFile.toString());
+  private synchronized void writeWithRetry(String entry) throws IOException {
+    Retryable retryable = buildRetryable();

Review comment:
       Added test for retry in an earlier commit [Add retry logic](https://github.com/apache/hive/pull/1936/commits/8b5bed7ae12cf112f6aeb689fec67774d0c84957)






[GitHub] [hive] ArkoSharma commented on a change in pull request #1936: HIVE-24718: Moving to file based iteration for copying data.

Posted by GitBox <gi...@apache.org>.
ArkoSharma commented on a change in pull request #1936:
URL: https://github.com/apache/hive/pull/1936#discussion_r582431043



##########
File path: itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
##########
@@ -2225,17 +2224,11 @@ private void setupUDFJarOnHDFS(Path identityUdfLocalPath, Path identityUdfHdfsPa
   /*
    * Method used from TestReplicationScenariosExclusiveReplica
    */
-  private void assertExternalFileInfo(List<String> expected, String dumplocation, boolean isIncremental,
+  private void assertExternalFileList(List<String> expected, String dumplocation,
                                       WarehouseInstance warehouseInstance)
           throws IOException {
     Path hivePath = new Path(dumplocation, ReplUtils.REPL_HIVE_BASE_DIR);
-    Path metadataPath = new Path(hivePath, EximUtil.METADATA_PATH_NAME);
-    Path externalTableInfoFile;
-    if (isIncremental) {
-      externalTableInfoFile = new Path(hivePath, FILE_NAME);
-    } else {
-      externalTableInfoFile = new Path(metadataPath, primaryDbName.toLowerCase() + File.separator + FILE_NAME);
-    }
-    ReplicationTestUtils.assertExternalFileInfo(warehouseInstance, expected, externalTableInfoFile);
+    Path externalTblFileList = new Path(hivePath, EximUtil.FILE_LIST_EXTERNAL);

Review comment:
      Removing it would cause code duplication in most places, except in TestReplicationScenariosExternalTablesMetaDataOnly.java. Refactored the code there.






[GitHub] [hive] ArkoSharma commented on a change in pull request #1936: HIVE-24718: Moving to file based iteration for copying data.

Posted by GitBox <gi...@apache.org>.
ArkoSharma commented on a change in pull request #1936:
URL: https://github.com/apache/hive/pull/1936#discussion_r580048118



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/FileList.java
##########
@@ -18,158 +18,176 @@
 
 package org.apache.hadoop.hive.ql.exec.repl.util;
 
-import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.exec.util.Retryable;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.BufferedReader;
 import java.io.IOException;
 import java.io.InputStreamReader;
+import java.io.StringWriter;
+import java.io.UncheckedIOException;
 import java.util.Iterator;
 import java.util.NoSuchElementException;
-import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.Callable;
 
-
-/**
- * A file backed list of Strings which is in-memory till the threshold.
- */
 public class FileList implements AutoCloseable, Iterator<String> {
   private static final Logger LOG = LoggerFactory.getLogger(FileList.class);
-  private static int fileListStreamerID = 0;
-  private static final String  FILE_LIST_STREAMER_PREFIX = "file-list-streamer-";
-
-  private LinkedBlockingQueue<String> cache;
-  private volatile boolean thresholdHit = false;
-  private int thresholdPoint;
-  private float thresholdFactor = 0.9f;
-  private Path backingFile;
-  private FileListStreamer fileListStreamer;
-  private String nextElement;
-  private boolean noMoreElement;
+  private final Path backingFile;
+  private String nextElement = null;
   private HiveConf conf;
+  private volatile boolean retryMode;
   private BufferedReader backingFileReader;
+  private volatile FSDataOutputStream backingFileWriter;
 
-
-  public FileList(Path backingFile, int cacheSize, HiveConf conf) {
+  public FileList(Path backingFile, HiveConf conf) {
     this.backingFile = backingFile;
     this.conf = conf;
-    if (cacheSize > 0) {
-      // Cache size must be > 0 for this list to be used for the write operation.
-      this.cache = new LinkedBlockingQueue<>(cacheSize);
-      fileListStreamer = new FileListStreamer(cache, backingFile, conf);
-      thresholdPoint = getThreshold(cacheSize);
-      LOG.debug("File list backed by {} can be used for write operation.", backingFile);
-    } else {
-      thresholdHit = true;
+    this.retryMode = false;
+  }
+
+  public void add(String entry) throws IOException {
+    Retryable retryable = buildRetryable();
+    try {
+      retryable.executeCallable((Callable<Void>) ()-> {

Review comment:
      Yes, other threads are allowed to write.
   A failure in one thread doesn't by itself imply failure in the other threads: after a failure, we close the output stream and reopen it for whichever thread attempts the next write.
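
   As an illustration of that reopen-on-failure pattern, a generic, self-contained sketch (not the Hive code; the Supplier stands in for re-opening the HDFS stream):

```java
import java.io.IOException;
import java.io.OutputStream;
import java.util.function.Supplier;

// Sketch of the pattern described above: the shared stream is closed and
// nulled out after a failed write, and whichever thread writes next reopens it.
class ReopeningWriter {
  private final Supplier<OutputStream> opener;  // stands in for fs.create/append on the backing file
  private OutputStream out;

  ReopeningWriter(Supplier<OutputStream> opener) { this.opener = opener; }

  synchronized void write(byte[] line) throws IOException {
    if (out == null) {
      out = opener.get();                       // reopened by whichever thread writes next
    }
    try {
      out.write(line);
    } catch (IOException e) {
      try { out.close(); } catch (IOException ignore) { }
      out = null;                               // force a reopen on the next attempt
      throw e;                                  // the caller's retry logic takes over
    }
  }
}
```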
   






[GitHub] [hive] ArkoSharma commented on a change in pull request #1936: HIVE-24718: Moving to file based iteration for copying data.

Posted by GitBox <gi...@apache.org>.
ArkoSharma commented on a change in pull request #1936:
URL: https://github.com/apache/hive/pull/1936#discussion_r584585551



##########
File path: itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
##########
@@ -1836,6 +1835,60 @@ public void testHdfsNameserviceWithDataCopy() throws Throwable {
             .verifyResults(new String[]{"2", "3"});
   }
 
+  @Test
+  public void testReplWithRetryDisabledIterators() throws Throwable {

Review comment:
      No, this is just to have a case where replication works with this conf set to false.
   To confirm that no retry happens, the test "testWriteNoRetry" is added in the TestFileList class.






[GitHub] [hive] pkumarsinha merged pull request #1936: HIVE-24718: Moving to file based iteration for copying data.

Posted by GitBox <gi...@apache.org>.
pkumarsinha merged pull request #1936:
URL: https://github.com/apache/hive/pull/1936


   




[GitHub] [hive] pkumarsinha commented on a change in pull request #1936: HIVE-24718: Moving to file based iteration for copying data.

Posted by GitBox <gi...@apache.org>.
pkumarsinha commented on a change in pull request #1936:
URL: https://github.com/apache/hive/pull/1936#discussion_r580191067



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/FileList.java
##########
@@ -18,158 +18,176 @@
 
 package org.apache.hadoop.hive.ql.exec.repl.util;
 
-import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.exec.util.Retryable;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.BufferedReader;
 import java.io.IOException;
 import java.io.InputStreamReader;
+import java.io.StringWriter;
+import java.io.UncheckedIOException;
 import java.util.Iterator;
 import java.util.NoSuchElementException;
-import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.Callable;
 
-
-/**
- * A file backed list of Strings which is in-memory till the threshold.
- */
 public class FileList implements AutoCloseable, Iterator<String> {
   private static final Logger LOG = LoggerFactory.getLogger(FileList.class);
-  private static int fileListStreamerID = 0;
-  private static final String  FILE_LIST_STREAMER_PREFIX = "file-list-streamer-";
-
-  private LinkedBlockingQueue<String> cache;
-  private volatile boolean thresholdHit = false;
-  private int thresholdPoint;
-  private float thresholdFactor = 0.9f;
-  private Path backingFile;
-  private FileListStreamer fileListStreamer;
-  private String nextElement;
-  private boolean noMoreElement;
+  private final Path backingFile;
+  private String nextElement = null;
   private HiveConf conf;
+  private volatile boolean retryMode;
   private BufferedReader backingFileReader;
+  private volatile FSDataOutputStream backingFileWriter;
 
-
-  public FileList(Path backingFile, int cacheSize, HiveConf conf) {
+  public FileList(Path backingFile, HiveConf conf) {
     this.backingFile = backingFile;
     this.conf = conf;
-    if (cacheSize > 0) {
-      // Cache size must be > 0 for this list to be used for the write operation.
-      this.cache = new LinkedBlockingQueue<>(cacheSize);
-      fileListStreamer = new FileListStreamer(cache, backingFile, conf);
-      thresholdPoint = getThreshold(cacheSize);
-      LOG.debug("File list backed by {} can be used for write operation.", backingFile);
-    } else {
-      thresholdHit = true;
+    this.retryMode = false;
+  }
+
+  public void add(String entry) throws IOException {
+    Retryable retryable = buildRetryable();
+    try {
+      retryable.executeCallable((Callable<Void>) ()-> {

Review comment:
      What I meant is, a failure (retry-exhausted case) in one thread would mean that the entire dump operation has to fail. So the success of each thread matters here, no? Are the other running tasks which are doing add operations interrupted if one of them fails?






[GitHub] [hive] ArkoSharma commented on a change in pull request #1936: HIVE-24718: Moving to file based iteration for copying data.

Posted by GitBox <gi...@apache.org>.
ArkoSharma commented on a change in pull request #1936:
URL: https://github.com/apache/hive/pull/1936#discussion_r580051218



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpWork.java
##########
@@ -184,42 +190,86 @@ public void setResultValues(List<String> resultValues) {
     this.resultValues = resultValues;
   }
 
-  public List<Task<?>> externalTableCopyTasks(TaskTracker tracker, HiveConf conf) {
+  public List<Task<?>> externalTableCopyTasks(TaskTracker tracker, HiveConf conf) throws IOException {
     if (conf.getBoolVar(HiveConf.ConfVars.REPL_DUMP_SKIP_IMMUTABLE_DATA_COPY)) {
       return Collections.emptyList();
     }
     List<Task<?>> tasks = new ArrayList<>();
-    while (externalTblCopyPathIterator.hasNext() && tracker.canAddMoreTasks()) {
-      DirCopyWork dirCopyWork = new DirCopyWork(metricCollector, currentDumpPath.toString());
-      dirCopyWork.loadFromString(externalTblCopyPathIterator.next());
-      Task<DirCopyWork> task = TaskFactory.get(dirCopyWork, conf);
-      tasks.add(task);
-      tracker.addTask(task);
-      LOG.debug("added task for {}", dirCopyWork);
+    Retryable retryable = Retryable.builder()
+            .withHiveConf(conf)
+            .withRetryOnException(UncheckedIOException.class).build();
+    try {
+      retryable.executeCallable((Callable<Void>) ()-> {
+        try{
+          int numEntriesToSkip = tasks == null ? 0 : tasks.size();
+          while (externalTblCopyPathIterator.hasNext() &&  tracker.canAddMoreTasks()) {
+            if(numEntriesToSkip > 0) {
+              //skip tasks added in previous attempts of this retryable block
+              externalTblCopyPathIterator.next();
+              numEntriesToSkip--;
+              continue;
+            }
+            DirCopyWork dirCopyWork = new DirCopyWork(metricCollector, currentDumpPath.toString());
+            dirCopyWork.loadFromString(externalTblCopyPathIterator.next());
+            Task<DirCopyWork> task = TaskFactory.get(dirCopyWork, conf);
+            tasks.add(task);
+            tracker.addTask(task);

Review comment:
      Also, they are not added again during retries; whether to skip an entry is determined by the size of the 'tasks' list.
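
   A generic sketch of that skip-on-retry pattern (illustrative names only, not the Hive code):

```java
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;

// Each retry attempt first re-consumes the iterator entries that earlier
// attempts already turned into tasks, using the list's size as the cursor.
final class SkipOnRetry {
  static <T> void drainOnce(Iterator<String> source, List<T> tasks, Function<String, T> toTask) {
    int numEntriesToSkip = tasks.size();  // work finished by previous attempts
    while (source.hasNext()) {
      String entry = source.next();
      if (numEntriesToSkip > 0) {
        numEntriesToSkip--;               // already handled in an earlier attempt
        continue;
      }
      tasks.add(toTask.apply(entry));     // may throw; a retry re-enters drainOnce
    }
  }
}
```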






[GitHub] [hive] pkumarsinha commented on a change in pull request #1936: HIVE-24718: Moving to file based iteration for copying data.

Posted by GitBox <gi...@apache.org>.
pkumarsinha commented on a change in pull request #1936:
URL: https://github.com/apache/hive/pull/1936#discussion_r592395172



##########
File path: itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTablesMetaDataOnly.java
##########
@@ -639,9 +629,11 @@ public void testIncrementalDumpEmptyDumpDirectory() throws Throwable {
             .verifyResult(inc2Tuple.lastReplicationId);
   }
 
-  private void assertFalseExternalFileList(Path externalTableFileList)
-      throws IOException {
+  private void assertFalseExternalFileList(String dumpLocation)

Review comment:
      Can you please move this method to ReplicationTestUtils itself? We have duplicate code for this method in two classes.

##########
File path: itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTablesMetaDataOnly.java
##########
@@ -639,9 +629,11 @@ public void testIncrementalDumpEmptyDumpDirectory() throws Throwable {
             .verifyResult(inc2Tuple.lastReplicationId);
   }
 
-  private void assertFalseExternalFileList(Path externalTableFileList)
-      throws IOException {
+  private void assertFalseExternalFileList(String dumpLocation)
+          throws IOException {

Review comment:
      I think it doesn't throw IOException.

##########
File path: itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
##########
@@ -2225,17 +2224,11 @@ private void setupUDFJarOnHDFS(Path identityUdfLocalPath, Path identityUdfHdfsPa
   /*
    * Method used from TestReplicationScenariosExclusiveReplica
    */
-  private void assertExternalFileInfo(List<String> expected, String dumplocation, boolean isIncremental,
+  private void assertExternalFileList(List<String> expected, String dumplocation,
                                       WarehouseInstance warehouseInstance)
           throws IOException {
     Path hivePath = new Path(dumplocation, ReplUtils.REPL_HIVE_BASE_DIR);
-    Path metadataPath = new Path(hivePath, EximUtil.METADATA_PATH_NAME);
-    Path externalTableInfoFile;
-    if (isIncremental) {
-      externalTableInfoFile = new Path(hivePath, FILE_NAME);
-    } else {
-      externalTableInfoFile = new Path(metadataPath, primaryDbName.toLowerCase() + File.separator + FILE_NAME);
-    }
-    ReplicationTestUtils.assertExternalFileInfo(warehouseInstance, expected, externalTableInfoFile);
+    Path externalTblFileList = new Path(hivePath, EximUtil.FILE_LIST_EXTERNAL);

Review comment:
      This is still not addressed. The same method (and code), assertExternalFileList, is defined in three classes, and essentially it does no more than form the path. As discussed, can we not use ReplicationTestUtils.assertExternalFileList directly by modifying the signature a bit?
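
   For illustration, the shared helper could take roughly this shape (a hypothetical fragment inside ReplicationTestUtils; the PR's actual refactor may differ):

```java
// Hypothetical method inside ReplicationTestUtils: callers pass only the dump
// location, and the path formation lives in one place.
public static void assertExternalFileList(WarehouseInstance warehouseInstance, List<String> expected,
    String dumpLocation) throws IOException {
  Path hivePath = new Path(dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR);
  Path externalTblFileList = new Path(hivePath, EximUtil.FILE_LIST_EXTERNAL);
  assertExternalFileInfo(warehouseInstance, expected, externalTblFileList);
}
```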






[GitHub] [hive] ArkoSharma commented on a change in pull request #1936: HIVE-24718: Moving to file based iteration for copying data.

Posted by GitBox <gi...@apache.org>.
ArkoSharma commented on a change in pull request #1936:
URL: https://github.com/apache/hive/pull/1936#discussion_r580104313



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/FileList.java
##########
@@ -18,158 +18,176 @@
 
 package org.apache.hadoop.hive.ql.exec.repl.util;
 
-import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.exec.util.Retryable;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.BufferedReader;
 import java.io.IOException;
 import java.io.InputStreamReader;
+import java.io.StringWriter;
+import java.io.UncheckedIOException;
 import java.util.Iterator;
 import java.util.NoSuchElementException;
-import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.Callable;
 
-
-/**
- * A file backed list of Strings which is in-memory till the threshold.
- */
 public class FileList implements AutoCloseable, Iterator<String> {
   private static final Logger LOG = LoggerFactory.getLogger(FileList.class);
-  private static int fileListStreamerID = 0;
-  private static final String  FILE_LIST_STREAMER_PREFIX = "file-list-streamer-";
-
-  private LinkedBlockingQueue<String> cache;
-  private volatile boolean thresholdHit = false;
-  private int thresholdPoint;
-  private float thresholdFactor = 0.9f;
-  private Path backingFile;
-  private FileListStreamer fileListStreamer;
-  private String nextElement;
-  private boolean noMoreElement;
+  private final Path backingFile;
+  private String nextElement = null;
   private HiveConf conf;
+  private volatile boolean retryMode;
   private BufferedReader backingFileReader;
+  private volatile FSDataOutputStream backingFileWriter;
 
-
-  public FileList(Path backingFile, int cacheSize, HiveConf conf) {
+  public FileList(Path backingFile, HiveConf conf) {
     this.backingFile = backingFile;
     this.conf = conf;
-    if (cacheSize > 0) {
-      // Cache size must be > 0 for this list to be used for the write operation.
-      this.cache = new LinkedBlockingQueue<>(cacheSize);
-      fileListStreamer = new FileListStreamer(cache, backingFile, conf);
-      thresholdPoint = getThreshold(cacheSize);
-      LOG.debug("File list backed by {} can be used for write operation.", backingFile);
-    } else {
-      thresholdHit = true;
+    this.retryMode = false;
+  }
+
+  public void add(String entry) throws IOException {
+    Retryable retryable = buildRetryable();
+    try {
+      retryable.executeCallable((Callable<Void>) ()-> {
+        synchronized (backingFile) {
+          try{
+            if (backingFileWriter == null) {
+              backingFileWriter = initWriter();
+            }
+            backingFileWriter.writeBytes(getEntryWithNewline(entry));
+            backingFileWriter.hflush();
+            LOG.info("Writing entry {} to file list backed by {}", entry, backingFile);
+          } catch (IOException e) {
+            LOG.error("Writing entry {} to file list {} failed, attempting retry.", entry, backingFile, e);
+            this.retryMode = true;
+            close();
+            throw e;
+          }
+        }
+        return null;
+      });
+    } catch (Exception e) {
+      throw new IOException(ErrorMsg.REPL_RETRY_EXHAUSTED.format(e.getMessage()));
     }
   }
 
-  @VisibleForTesting
-  FileList(Path backingFile, FileListStreamer fileListStreamer, LinkedBlockingQueue<String> cache, HiveConf conf) {
-    this.backingFile = backingFile;
-    this.fileListStreamer = fileListStreamer;
-    this.cache = cache;
-    this.conf = conf;
-    thresholdPoint = getThreshold(cache.remainingCapacity());
+  Retryable buildRetryable() {
+    return Retryable.builder()
+            .withHiveConf(conf)
+            .withRetryOnException(IOException.class).build();
   }
 
-  /**
-   * Only add operation is safe for concurrent operations.
-   */
-  public void add(String entry) throws SemanticException {
-    if (thresholdHit && !fileListStreamer.isAlive()) {
-      throw new SemanticException("List is not getting saved anymore to file " + backingFile.toString());
+  // Return the entry ensuring it ends with newline.
+  private String getEntryWithNewline(String entry) {
+    return new StringWriter()
+            .append(entry)
+            .append(System.lineSeparator())
+            .toString();
+  }
+
+  FSDataOutputStream initWriter() throws IOException {
+    if(shouldAppend()) {
+      return getWriterAppendMode(); // append in retry-mode if file has been created already
+    }
+    else {
+      return getWriterCreateMode();
     }
+  }
+
+  boolean shouldAppend() throws IOException {

Review comment:
       Will make this change.






[GitHub] [hive] ArkoSharma commented on a change in pull request #1936: HIVE-24718: Cleanup of _external_table_info file

Posted by GitBox <gi...@apache.org>.
ArkoSharma commented on a change in pull request #1936:
URL: https://github.com/apache/hive/pull/1936#discussion_r573451500



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/FileList.java
##########
@@ -22,154 +22,118 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.exec.util.Retryable;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.BufferedReader;
+import java.io.BufferedWriter;
 import java.io.IOException;
 import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.io.UncheckedIOException;
 import java.util.Iterator;
 import java.util.NoSuchElementException;
+import java.util.concurrent.Callable;
 import java.util.concurrent.LinkedBlockingQueue;
 
-
-/**
- * A file backed list of Strings which is in-memory till the threshold.
- */
 public class FileList implements AutoCloseable, Iterator<String> {
   private static final Logger LOG = LoggerFactory.getLogger(FileList.class);
-  private static int fileListStreamerID = 0;
-  private static final String  FILE_LIST_STREAMER_PREFIX = "file-list-streamer-";
-
-  private LinkedBlockingQueue<String> cache;
-  private volatile boolean thresholdHit = false;
-  private int thresholdPoint;
-  private float thresholdFactor = 0.9f;
   private Path backingFile;
-  private FileListStreamer fileListStreamer;
-  private String nextElement;
-  private boolean noMoreElement;
+  private String nextElement = null;
   private HiveConf conf;
   private BufferedReader backingFileReader;
+  private BufferedWriter backingFileWriter;
 
-
-  public FileList(Path backingFile, int cacheSize, HiveConf conf) {
+  public FileList(Path backingFile, HiveConf conf) {
     this.backingFile = backingFile;
     this.conf = conf;
-    if (cacheSize > 0) {
-      // Cache size must be > 0 for this list to be used for the write operation.
-      this.cache = new LinkedBlockingQueue<>(cacheSize);
-      fileListStreamer = new FileListStreamer(cache, backingFile, conf);
-      thresholdPoint = getThreshold(cacheSize);
-      LOG.debug("File list backed by {} can be used for write operation.", backingFile);
-    } else {
-      thresholdHit = true;
-    }
   }
 
-  @VisibleForTesting
-  FileList(Path backingFile, FileListStreamer fileListStreamer, LinkedBlockingQueue<String> cache, HiveConf conf) {
-    this.backingFile = backingFile;
-    this.fileListStreamer = fileListStreamer;
-    this.cache = cache;
-    this.conf = conf;
-    thresholdPoint = getThreshold(cache.remainingCapacity());
-  }
-
-  /**
-   * Only add operation is safe for concurrent operations.
-   */
   public void add(String entry) throws SemanticException {
-    if (thresholdHit && !fileListStreamer.isAlive()) {
-      throw new SemanticException("List is not getting saved anymore to file " + backingFile.toString());
-    }
+    Retryable retryable = Retryable.builder()
+            .withHiveConf(conf)
+            .withRetryOnException(IOException.class).build();
     try {
-      cache.put(entry);
-    } catch (InterruptedException e) {
-      throw new SemanticException(e);
-    }
-    if (!thresholdHit && cache.size() >= thresholdPoint) {
-      initStoreToFile(cache.size());
+      retryable.executeCallable((Callable<Void>) ()-> {
+        synchronized (backingFile ) {

Review comment:
      Only one thread can write at a time, but synchronization is needed since we have two elements to write per entry: the entry itself and the newline. These two elements need to be written atomically.
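
   A minimal sketch of that point, assuming a plain OutputStream-like writer: concatenating the entry and the newline into one buffer makes the pair a single write call, and the lock keeps concurrent adders from interleaving:

```java
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;

// Sketch only: entry + newline are combined into one buffer so a single
// write() emits both; the lock prevents interleaving between threads.
class AtomicLineWriter {
  private final OutputStream out;
  private final Object lock = new Object();

  AtomicLineWriter(OutputStream out) { this.out = out; }

  void add(String entry) throws IOException {
    byte[] line = (entry + System.lineSeparator()).getBytes(StandardCharsets.UTF_8);
    synchronized (lock) {
      out.write(line);  // one call per logical entry: no torn lines
      out.flush();
    }
  }
}
```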






[GitHub] [hive] ArkoSharma commented on a change in pull request #1936: HIVE-24718: Moving to file based iteration for copying data.

Posted by GitBox <gi...@apache.org>.
ArkoSharma commented on a change in pull request #1936:
URL: https://github.com/apache/hive/pull/1936#discussion_r584584623



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/FileList.java
##########
@@ -18,158 +18,225 @@
 
 package org.apache.hadoop.hive.ql.exec.repl.util;
 
-import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.exec.util.Retryable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.BufferedReader;
 import java.io.IOException;
 import java.io.InputStreamReader;
+import java.io.StringWriter;
+import java.io.UncheckedIOException;
 import java.util.Iterator;
 import java.util.NoSuchElementException;
-import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.Callable;
 
-
-/**
- * A file backed list of Strings which is in-memory till the threshold.
- */
 public class FileList implements AutoCloseable, Iterator<String> {
   private static final Logger LOG = LoggerFactory.getLogger(FileList.class);
-  private static int fileListStreamerID = 0;
-  private static final String  FILE_LIST_STREAMER_PREFIX = "file-list-streamer-";
-
-  private LinkedBlockingQueue<String> cache;
-  private volatile boolean thresholdHit = false;
-  private int thresholdPoint;
-  private float thresholdFactor = 0.9f;
-  private Path backingFile;
-  private FileListStreamer fileListStreamer;
-  private String nextElement;
-  private boolean noMoreElement;
+  private final Path backingFile;
+  private String nextElement = null;
+  private String lastReadElement = null;
   private HiveConf conf;
+  private volatile boolean abortOperation = false;
+  private volatile boolean retryMode;
   private BufferedReader backingFileReader;
+  private volatile FSDataOutputStream backingFileWriter;
 
-
-  public FileList(Path backingFile, int cacheSize, HiveConf conf) {
+  public FileList(Path backingFile, HiveConf conf) {
     this.backingFile = backingFile;
     this.conf = conf;
-    if (cacheSize > 0) {
-      // Cache size must be > 0 for this list to be used for the write operation.
-      this.cache = new LinkedBlockingQueue<>(cacheSize);
-      fileListStreamer = new FileListStreamer(cache, backingFile, conf);
-      thresholdPoint = getThreshold(cacheSize);
-      LOG.debug("File list backed by {} can be used for write operation.", backingFile);
+    this.retryMode = false;
+  }
+
+  public void add(String entry) throws IOException {
+    if (conf.getBoolVar(HiveConf.ConfVars.REPL_COPY_ITERATOR_RETRY)) {
+      writeWithRetry(entry);
     } else {
-      thresholdHit = true;
+      writeEntry(entry);
     }
   }
 
-  @VisibleForTesting
-  FileList(Path backingFile, FileListStreamer fileListStreamer, LinkedBlockingQueue<String> cache, HiveConf conf) {
-    this.backingFile = backingFile;
-    this.fileListStreamer = fileListStreamer;
-    this.cache = cache;
-    this.conf = conf;
-    thresholdPoint = getThreshold(cache.remainingCapacity());
+  private synchronized void writeEntry(String entry) throws IOException {
+    //retry only during creating the file, no retry during writes
+    if (backingFileWriter == null) {
+      try {
+        Retryable retryable = buildRetryable();
+        retryable.executeCallable((Callable<Void>) () -> {
+          if(this.abortOperation) {
+            return null;
+          }
+          backingFileWriter = getWriterCreateMode();
+          return null;
+        });
+      } catch (Exception e) {
+        this.abortOperation = true;
+        throw new IOException(ErrorMsg.REPL_RETRY_EXHAUSTED.format(e.getMessage()));
+      }
+    }
+    if(this.abortOperation) {
+      return;
+    }
+    try {
+      backingFileWriter.writeBytes(getEntryWithNewline(entry));
+      LOG.info("Writing entry {} to file list backed by {}", entry, backingFile);
+    } catch (IOException e) {
+      this.abortOperation = true;
+      LOG.error("Writing entry {} to file list {} failed.", entry, backingFile, e);
+      throw e;
+    }
   }
 
-  /**
-   * Only add operation is safe for concurrent operations.
-   */
-  public void add(String entry) throws SemanticException {
-    if (thresholdHit && !fileListStreamer.isAlive()) {
-      throw new SemanticException("List is not getting saved anymore to file " + backingFile.toString());
+  private synchronized void writeWithRetry(String entry) throws IOException {
+    Retryable retryable = buildRetryable();

Review comment:
       Added test for retry in an earlier commit [Added unit tests for retry, addressed review comments](https://github.com/apache/hive/pull/1936/commits/d0139583d1876c114ef1993b41202afb3a444abf)






[GitHub] [hive] ArkoSharma commented on a change in pull request #1936: HIVE-24718: Moving to file based iteration for copying data.

Posted by GitBox <gi...@apache.org>.
ArkoSharma commented on a change in pull request #1936:
URL: https://github.com/apache/hive/pull/1936#discussion_r585220794



##########
File path: itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
##########
@@ -1836,6 +1835,60 @@ public void testHdfsNameserviceWithDataCopy() throws Throwable {
             .verifyResults(new String[]{"2", "3"});
   }
 
+  @Test
+  public void testReplWithRetryDisabledIterators() throws Throwable {
+    List<String> clause = new ArrayList<>();
+    //NS replacement parameters has no effect when data is also copied to staging
+    clause.add("'" + HiveConf.ConfVars.REPL_RUN_DATA_COPY_TASKS_ON_TARGET + "'='false'");
+    clause.add("'" + HiveConf.ConfVars.REPL_COPY_ITERATOR_RETRY + "'='false'");
+    primary.run("use " + primaryDbName)
+            .run("create table  acid_table (key int, value int) partitioned by (load_date date) " +
+                    "clustered by(key) into 2 buckets stored as orc tblproperties ('transactional'='true')")
+            .run("create table table1 (i String)")
+            .run("insert into table1 values (1)")
+            .run("insert into table1 values (2)")
+            .dump(primaryDbName, clause);

Review comment:
       Done.

##########
File path: itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
##########
@@ -1836,6 +1835,60 @@ public void testHdfsNameserviceWithDataCopy() throws Throwable {
             .verifyResults(new String[]{"2", "3"});
   }
 
+  @Test
+  public void testReplWithRetryDisabledIterators() throws Throwable {
+    List<String> clause = new ArrayList<>();
+    //NS replacement parameters has no effect when data is also copied to staging
+    clause.add("'" + HiveConf.ConfVars.REPL_RUN_DATA_COPY_TASKS_ON_TARGET + "'='false'");
+    clause.add("'" + HiveConf.ConfVars.REPL_COPY_ITERATOR_RETRY + "'='false'");
+    primary.run("use " + primaryDbName)
+            .run("create table  acid_table (key int, value int) partitioned by (load_date date) " +
+                    "clustered by(key) into 2 buckets stored as orc tblproperties ('transactional'='true')")
+            .run("create table table1 (i String)")
+            .run("insert into table1 values (1)")
+            .run("insert into table1 values (2)")
+            .dump(primaryDbName, clause);
+    replica.load(replicatedDbName, primaryDbName, clause)
+            .run("use " + replicatedDbName)
+            .run("show tables")
+            .verifyResults(new String[] {"acid_table", "table1"})
+            .run("select * from table1")
+            .verifyResults(new String[] {"1", "2"});
+
+    primary.run("use " + primaryDbName)
+            .run("insert into table1 values (3)")
+            .dump(primaryDbName, clause);
+    replica.load(replicatedDbName, primaryDbName, clause)
+            .run("use " + replicatedDbName)
+            .run("show tables")
+            .verifyResults(new String[]{"acid_table", "table1"})
+            .run("select * from table1")
+            .verifyResults(new String[]{"1", "2", "3"});
+
+    clause.add("'" + HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY_FOR_EXTERNAL_TABLE.varname + "'='false'");
+    primary.run("use " + primaryDbName)
+            .run("create external table ext_table1 (id int)")
+            .run("insert into ext_table1 values (3)")
+            .run("insert into ext_table1 values (4)")
+            .run("create external table  ext_table2 (key int, value int) partitioned by (load_time timestamp)")
+            .run("insert into ext_table2 partition(load_time = '2012-02-21 07:08:09.123') values(1,2)")
+            .run("insert into ext_table2 partition(load_time = '2012-02-21 07:08:09.124') values(1,3)")
+            .run("show partitions ext_table2")
+            .verifyResults(new String[]{
+                    "load_time=2012-02-21 07%3A08%3A09.123",
+                    "load_time=2012-02-21 07%3A08%3A09.124"})
+            .dump(primaryDbName, clause);

Review comment:
       Done.






[GitHub] [hive] ArkoSharma commented on a change in pull request #1936: HIVE-24718: Moving to file based iteration for copying data.

Posted by GitBox <gi...@apache.org>.
ArkoSharma commented on a change in pull request #1936:
URL: https://github.com/apache/hive/pull/1936#discussion_r594106033



##########
File path: itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
##########
@@ -2225,17 +2224,11 @@ private void setupUDFJarOnHDFS(Path identityUdfLocalPath, Path identityUdfHdfsPa
   /*
    * Method used from TestReplicationScenariosExclusiveReplica
    */
-  private void assertExternalFileInfo(List<String> expected, String dumplocation, boolean isIncremental,
+  private void assertExternalFileList(List<String> expected, String dumplocation,
                                       WarehouseInstance warehouseInstance)
           throws IOException {
     Path hivePath = new Path(dumplocation, ReplUtils.REPL_HIVE_BASE_DIR);
-    Path metadataPath = new Path(hivePath, EximUtil.METADATA_PATH_NAME);
-    Path externalTableInfoFile;
-    if (isIncremental) {
-      externalTableInfoFile = new Path(hivePath, FILE_NAME);
-    } else {
-      externalTableInfoFile = new Path(metadataPath, primaryDbName.toLowerCase() + File.separator + FILE_NAME);
-    }
-    ReplicationTestUtils.assertExternalFileInfo(warehouseInstance, expected, externalTableInfoFile);
+    Path externalTblFileList = new Path(hivePath, EximUtil.FILE_LIST_EXTERNAL);

Review comment:
       Done



