Posted to gitbox@hive.apache.org by GitBox <gi...@apache.org> on 2020/07/08 11:48:13 UTC

[GitHub] [hive] pkumarsinha opened a new pull request #1225: HIVE-23069 Memory efficient iterator should be used during replicatio…

pkumarsinha opened a new pull request #1225:
URL: https://github.com/apache/hive/pull/1225


   …n. Config option to execute data copy during load.
   
   ## NOTICE
   
   Please create an issue in ASF JIRA before opening a pull request,
   and you need to set the title of the pull request which starts with
   the corresponding JIRA issue number. (e.g. HIVE-XXXXX: Fix a typo in YYY)
   For more details, please see https://cwiki.apache.org/confluence/display/Hive/HowToContribute
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: gitbox-unsubscribe@hive.apache.org
For additional commands, e-mail: gitbox-help@hive.apache.org


[GitHub] [hive] pkumarsinha commented on a change in pull request #1225: HIVE-23069 Memory efficient iterator should be used during replicatio…

Posted by GitBox <gi...@apache.org>.
pkumarsinha commented on a change in pull request #1225:
URL: https://github.com/apache/hive/pull/1225#discussion_r452812525



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/FileList.java
##########
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.repl.util;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.concurrent.LinkedBlockingQueue;
+
+
+/**
+ * A file backed list of Strings which is in-memory till the threshold.
+ */
+public class FileList implements Closeable, Iterator<String> {
+  private static final Logger LOG = LoggerFactory.getLogger(FileList.class);
+  private static int fileListStreamerID = 0;
+  private static final String  FILE_LIST_STREAMER_PREFIX = "file-list-streamer-";
+
+  private LinkedBlockingQueue<String> cache;
+  private volatile boolean thresholdHit = false;
+  private int thresholdPoint;
+  private float thresholdFactor = 0.9f;
+  private Path backingFile;
+  private FileListStreamer fileListStreamer;
+  private FileListOpMode fileListOpMode;
+  private String nextElement;
+  private boolean noMoreElement;
+  private HiveConf conf;
+  private BufferedReader backingFileReader;
+  private volatile boolean asyncMode;
+
+
+  /**
+   * To be used only for READ mode;
+   */
+  public FileList(Path backingFile, HiveConf conf) {

Review comment:
       It would be risky to operate on the same file in both READ and WRITE mode at the same time, hence the modes are there to prevent that.
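
       A minimal sketch of the guard this implies, with a hypothetical method
       (the PR's actual FileListOpMode handling may differ):

       // Reject any operation that does not match the mode the list was
       // opened in, so a single instance never mixes READ and WRITE.
       private void validateMode(FileListOpMode requested) {
         if (requested != fileListOpMode) {
           throw new IllegalStateException("FileList backed by " + backingFile
               + " is open in " + fileListOpMode + " mode; " + requested + " is not allowed");
         }
       }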






[GitHub] [hive] pkumarsinha commented on a change in pull request #1225: HIVE-23069 Memory efficient iterator should be used during replicatio…

Posted by GitBox <gi...@apache.org>.
pkumarsinha commented on a change in pull request #1225:
URL: https://github.com/apache/hive/pull/1225#discussion_r452810700



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/FileList.java
##########
@@ -0,0 +1,206 @@
+  [... opening identical to the FileList.java diff quoted in the previous message ...]
+  private LinkedBlockingQueue<String> cache;
+  private volatile boolean thresholdHit = false;
+  private int thresholdPoint;

Review comment:
       thresholdHit is a boolean flag which, once set, is used to take action (the switch to the file-backed store). thresholdPoint is the cache size at which thresholdHit gets set.
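
       Concretely, the interplay of the two fields, as in the add() path quoted
       later in this thread:

       public void add(String entry) throws SemanticException {
         try {
           cache.put(entry);                                   // in-memory until the threshold
         } catch (InterruptedException e) {
           throw new SemanticException(e);
         }
         if (!thresholdHit && cache.size() >= thresholdPoint) {
           initStoreToFile(cache.size());                      // one-way switch: sets thresholdHit
         }
       }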






[GitHub] [hive] pkumarsinha commented on a change in pull request #1225: HIVE-23069 Memory efficient iterator should be used during replicatio…

Posted by GitBox <gi...@apache.org>.
pkumarsinha commented on a change in pull request #1225:
URL: https://github.com/apache/hive/pull/1225#discussion_r452815690



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/FileListStreamer.java
##########
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.repl.util;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedWriter;
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+public class FileListStreamer extends Thread implements Closeable {

Review comment:
       FileListStreamer is treated as a specialized worker, hence it extends Thread. Had it been treated as a job instead, the Runnable route would have been fine.
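
       For reference, the two routes side by side, as a sketch rather than the
       PR's actual code:

       // Worker route taken here: the streamer IS a dedicated thread.
       class FileListStreamer extends Thread {
         @Override public void run() { /* drain the queue to the backing file */ }
       }

       // Job route: the same work handed to any thread or executor.
       class StreamerJob implements Runnable {
         @Override public void run() { /* drain the queue to the backing file */ }
       }
       // e.g. new Thread(new StreamerJob()).start(); or executor.submit(new StreamerJob());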






[GitHub] [hive] aasha commented on a change in pull request #1225: HIVE-23069 Memory efficient iterator should be used during replicatio…

Posted by GitBox <gi...@apache.org>.
aasha commented on a change in pull request #1225:
URL: https://github.com/apache/hive/pull/1225#discussion_r454943283



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/FileList.java
##########
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.repl.util;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.concurrent.LinkedBlockingQueue;
+
+
+/**
+ * A file backed list of Strings which is in-memory till the threshold.
+ */
+public class FileList implements AutoCloseable, Iterator<String> {
+  private static final Logger LOG = LoggerFactory.getLogger(FileList.class);
+  private static int fileListStreamerID = 0;
+  private static final String  FILE_LIST_STREAMER_PREFIX = "file-list-streamer-";
+
+  private LinkedBlockingQueue<String> cache;
+  private volatile boolean thresholdHit = false;
+  private int thresholdPoint;
+  private float thresholdFactor = 0.9f;
+  private Path backingFile;
+  private FileListStreamer fileListStreamer;
+  private String nextElement;
+  private boolean noMoreElement;
+  private HiveConf conf;
+  private BufferedReader backingFileReader;
+
+
+  public FileList(Path backingFile, int cacheSize, HiveConf conf) throws IOException {
+    this.backingFile = backingFile;
+    if (cacheSize > 0) {
+      // Cache size must be > 0 for this list to be used for the write operation.
+      this.cache = new LinkedBlockingQueue<>(cacheSize);
+      fileListStreamer = new FileListStreamer(cache, backingFile, conf);
+      LOG.debug("File list backed by {} can be used for write operation.", backingFile);
+    } else {
+      thresholdHit = true;
+    }
+    this.conf = conf;
+    thresholdPoint = getThreshold(cacheSize);
+  }
+
+  /**
+   * Only add operation is safe for concurrent operations.
+   */
+  public void add(String entry) throws SemanticException {
+    if (thresholdHit && !fileListStreamer.isAlive()) {
+      throw new SemanticException("List is not getting saved anymore to file " + backingFile.toString());
+    }
+    try {
+      cache.put(entry);
+    } catch (InterruptedException e) {
+      throw new SemanticException(e);
+    }
+    if (!thresholdHit && cache.size() >= thresholdPoint) {
+      initStoreToFile(cache.size());
+    }
+  }
+
+  @Override
+  public boolean hasNext() {
+    if (!thresholdHit) {
+      return (cache != null && !cache.isEmpty());
+    }
+    if (nextElement != null) {
+      return true;
+    }
+    if (noMoreElement) {
+      return false;
+    }
+    nextElement = readNextLine();
+    if (nextElement == null) {
+      noMoreElement = true;
+    }
+    return !noMoreElement;
+  }
+
+  @Override
+  public String next() {
+    if (!hasNext()) {
+      throw new NoSuchElementException("No more element in the list backed by " + backingFile);
+    }
+    String retVal = nextElement;
+    nextElement = null;
+    return thresholdHit ? retVal : cache.poll();
+  }
+
+  private synchronized void initStoreToFile(int cacheSize) {
+    if (!thresholdHit) {
+      fileListStreamer.setName(getNextID());
+      fileListStreamer.setDaemon(true);
+      fileListStreamer.start();
+      thresholdHit = true;
+      LOG.info("Started streaming the list elements to file: {}, cache size {}", backingFile, cacheSize);
+    }
+  }
+
+  private String readNextLine() {
+    String nextElement = null;
+    try {
+      if (backingFileReader == null) {
+        FileSystem fs = FileSystem.get(backingFile.toUri(), conf);
+        if (fs.exists(backingFile)) {
+          backingFileReader = new BufferedReader(new InputStreamReader(fs.open(backingFile)));
+        }
+      }
+      nextElement = (backingFileReader == null) ? null : backingFileReader.readLine();
+    } catch (IOException e) {
+      LOG.error("Unable to read list from backing file " + backingFile, e);
+    }
+    return nextElement;
+  }
+
+  @Override
+  public void close() throws IOException {
+    if (thresholdHit && fileListStreamer != null) {
+      fileListStreamer.close();
+    }
+    if (backingFileReader != null) {
+      backingFileReader.close();
+    }
+    LOG.info("Completed close for File List backed by:{}, thresholdHit:{} ", backingFile, thresholdHit);
+  }
+
+  private static String getNextID() {
+    if (Integer.MAX_VALUE == fileListStreamerID) {
+      //reset the counter
+      fileListStreamerID = 0;
+    }
+    fileListStreamerID++;
+    return FILE_LIST_STREAMER_PREFIX  + fileListStreamerID;
+  }
+
+  public int getThreshold(int cacheSize) {
+    boolean copyAtLoad = conf.getBoolVar(HiveConf.ConfVars.REPL_DATA_COPY_LAZY);
+    return copyAtLoad ? 0 : (int)(cacheSize * thresholdFactor);
+  }
+
+  @VisibleForTesting
+  public boolean isStreamingToFile() {

Review comment:
       If you mark it as public, it becomes accessible to everyone. So it's not protected by @VisibleForTesting anymore.
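
       Since @VisibleForTesting is purely advisory and not enforced by the
       compiler, the conventional fix is package-private access, roughly as
       follows (method body hypothetical):

       @VisibleForTesting
       boolean isStreamingToFile() {  // package-private: visible to same-package tests, not to external callers
         return thresholdHit;
       }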






[GitHub] [hive] pkumarsinha commented on a change in pull request #1225: HIVE-23069 Memory efficient iterator should be used during replicatio…

Posted by GitBox <gi...@apache.org>.
pkumarsinha commented on a change in pull request #1225:
URL: https://github.com/apache/hive/pull/1225#discussion_r454926767



##########
File path: itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
##########
@@ -1559,6 +1645,76 @@ public void testIncrementalLoad() throws IOException {
     verifyRun("SELECT a from " + replDbName + ".ptned WHERE b=2", ptnData2, driverMirror);
   }
 
+  @Test
+  public void testIncrementalLoadLazyCopy() throws IOException {

Review comment:
       There are many existing tests with lazy copy set to false. For external tables, which use mini HDFS, we already have a test with lazy copy set to true. I will add one for the other table type as well.






[GitHub] [hive] pkumarsinha commented on a change in pull request #1225: HIVE-23069 Memory efficient iterator should be used during replicatio…

Posted by GitBox <gi...@apache.org>.
pkumarsinha commented on a change in pull request #1225:
URL: https://github.com/apache/hive/pull/1225#discussion_r455136416



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpWork.java
##########
@@ -207,4 +217,20 @@ public ReplicationMetricCollector getMetricCollector() {
   public void setMetricCollector(ReplicationMetricCollector metricCollector) {
     this.metricCollector = metricCollector;
   }
+
+  public ReplicationSpec getReplicationSpec() {
+    return replicationSpec;
+  }
+
+  public void setReplicationSpec(ReplicationSpec replicationSpec) {
+    this.replicationSpec = replicationSpec;
+  }
+
+  public FileList getFileList(Path backingFile, int cacheSize, HiveConf conf, boolean b) throws IOException {

Review comment:
       Removed the change altogether.






[GitHub] [hive] github-actions[bot] closed pull request #1225: HIVE-23069 Memory efficient iterator should be used during replicatio…

Posted by GitBox <gi...@apache.org>.
github-actions[bot] closed pull request #1225:
URL: https://github.com/apache/hive/pull/1225


   




[GitHub] [hive] pkumarsinha commented on a change in pull request #1225: HIVE-23069 Memory efficient iterator should be used during replicatio…

Posted by GitBox <gi...@apache.org>.
pkumarsinha commented on a change in pull request #1225:
URL: https://github.com/apache/hive/pull/1225#discussion_r452813518



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/FileListStreamer.java
##########
@@ -0,0 +1,137 @@
+  [... license header and imports identical to the FileListStreamer.java quote earlier in this thread ...]
+public class FileListStreamer extends Thread implements Closeable {
+  private static final Logger LOG = LoggerFactory.getLogger(FileListStreamer.class);
+  private static final long TIMEOUT_IN_SECS = 5L;
+  private volatile boolean stop;
+  private final LinkedBlockingQueue<String> cache;
+  private Path backingFile;
+  private Configuration conf;
+  private BufferedWriter backingFileWriter;
+  private volatile boolean valid = true;
+  private volatile boolean asyncMode = false;
+  private final Object COMPLETION_LOCK = new Object();
+  private volatile boolean completed = false;
+
+
+
+  public FileListStreamer(LinkedBlockingQueue<String> cache, Path backingFile, Configuration conf) throws IOException {
+    this.cache = cache;
+    this.backingFile = backingFile;
+    this.conf = conf;
+    init();
+  }
+
+  private void init() throws IOException {
+    FileSystem fs = FileSystem.get(backingFile.toUri(), conf);
+    backingFileWriter = new BufferedWriter(new OutputStreamWriter(fs.create(backingFile, !asyncMode)));
+    LOG.info("Initialized a file based store to save a list at: {}, ayncMode:{}", backingFile, asyncMode);
+  }
+
+  public boolean isValid() {
+    return valid;
+  }
+
+  @Override
+  public void close() throws IOException {
+    if (!asyncMode) {
+      closeBackingFile();
+      return;
+    }
+    stop = true;
+    synchronized (COMPLETION_LOCK) {
+      while (!completed && isValid()) {
+        try {
+          COMPLETION_LOCK.wait(TimeUnit.SECONDS.toMillis(TIMEOUT_IN_SECS));
+        } catch (InterruptedException e) {
+        }
+      }
+    }
+    if (!isValid()) {
+      throw new IOException("File list is not in a valid state:" + backingFile);
+    }
+    LOG.info("Completed close for File List backed by ", backingFile);
+  }
+
+  public synchronized void writeInThread(String nextEntry) throws SemanticException {
+    try {
+      backingFileWriter.write(nextEntry);
+      backingFileWriter.newLine();
+    } catch (IOException e) {
+      throw new SemanticException(e);
+    }
+  }
+  @Override
+  public void run() {
+    asyncMode = true;
+    boolean exThrown = false;
+    while (!exThrown && (!stop || !cache.isEmpty())) {
+      try {
+        String nextEntry = cache.poll(TIMEOUT_IN_SECS, TimeUnit.SECONDS);
+        if (nextEntry != null) {
+          backingFileWriter.write(nextEntry);
+          backingFileWriter.newLine();
+          LOG.debug("Writing entry {} to file list backed by {}", nextEntry, backingFile);
+        }
+      } catch (Exception iEx) {
+        if (!(iEx instanceof InterruptedException)) {
+          // not draining any more. Inform the producer to avoid OOM.
+          valid = false;
+          LOG.error("Exception while saving the list to file " + backingFile, iEx);
+          exThrown = true;
+        }
+      }
+    }
+    try{
+      closeBackingFile();
+      completed = true;
+    } finally {
+      synchronized (COMPLETION_LOCK) {
+        COMPLETION_LOCK.notify();
+      }
+    }
+    LOG.info("Completed the file list streamer backed by: {}", backingFile);
+  }
+
+  private void closeBackingFile() {
+    try {
+      backingFileWriter.close();
+      LOG.debug("Closed the file list backing file: {}", backingFile);
+    } catch (IOException e) {
+      LOG.error("Exception while closing the file list backing file", e);
+      valid = false;

Review comment:
       No, the two run in different threads; the producer would otherwise never come to know the state of the consumer thread. The valid flag is what communicates that.
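
       In other words, the pattern is a volatile flag written by the consumer
       thread and read by the producer, simplified here from the quoted code:

       private volatile boolean valid = true;  // set to false by the consumer on failure

       // Producer side: check the flag before queueing more work, since the
       // producer cannot observe the consumer thread's state directly.
       public void add(String entry) throws SemanticException {
         if (!valid) {
           throw new SemanticException("List is not getting saved anymore to file " + backingFile);
         }
         cache.add(entry);
       }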






[GitHub] [hive] pkumarsinha commented on a change in pull request #1225: HIVE-23069 Memory efficient iterator should be used during replicatio…

Posted by GitBox <gi...@apache.org>.
pkumarsinha commented on a change in pull request #1225:
URL: https://github.com/apache/hive/pull/1225#discussion_r454921294



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpWork.java
##########
@@ -207,4 +217,20 @@ public ReplicationMetricCollector getMetricCollector() {
   public void setMetricCollector(ReplicationMetricCollector metricCollector) {
     this.metricCollector = metricCollector;
   }
+
+  public ReplicationSpec getReplicationSpec() {
+    return replicationSpec;
+  }
+
+  public void setReplicationSpec(ReplicationSpec replicationSpec) {
+    this.replicationSpec = replicationSpec;
+  }
+
+  public FileList getFileList(Path backingFile, int cacheSize, HiveConf conf, boolean b) throws IOException {

Review comment:
       This is required for some old tests which aren't Mockito based.






[GitHub] [hive] pkumarsinha commented on a change in pull request #1225: HIVE-23069 Memory efficient iterator should be used during replicatio…

Posted by GitBox <gi...@apache.org>.
pkumarsinha commented on a change in pull request #1225:
URL: https://github.com/apache/hive/pull/1225#discussion_r454907220



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FileOperations.java
##########
@@ -165,4 +178,83 @@ private void validateSrcPathListExists() throws IOException, LoginException {
       throw new FileNotFoundException(FILE_NOT_FOUND.format(e.getMessage()));
     }
   }
+
+  /**
+   * This needs the root data directory to which the data needs to be exported to.
+   * The data export here is a list of files either in table/partition that are written to the _files
+   * in the exportRootDataDir provided.
+   */
+   void exportFilesAsList() throws SemanticException, IOException, LoginException {
+    if (dataPathList.isEmpty()) {
+      return;
+    }
+    Retry<Void> retryable = new Retry<Void>(IOException.class) {
+      @Override
+      public Void execute() throws Exception {
+        try (BufferedWriter writer = writer()) {
+          for (Path dataPath : dataPathList) {
+            writeFilesList(listFilesInDir(dataPath), writer, AcidUtils.getAcidSubDir(dataPath));
+          }
+        } catch (IOException e) {
+          if (e instanceof FileNotFoundException) {

Review comment:
       No, it won't, right?






[GitHub] [hive] pkumarsinha commented on a change in pull request #1225: HIVE-23069 Memory efficient iterator should be used during replicatio…

Posted by GitBox <gi...@apache.org>.
pkumarsinha commented on a change in pull request #1225:
URL: https://github.com/apache/hive/pull/1225#discussion_r452808706



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FileOperations.java
##########
@@ -165,4 +175,92 @@ private void validateSrcPathListExists() throws IOException, LoginException {
       throw new FileNotFoundException(FILE_NOT_FOUND.format(e.getMessage()));
     }
   }
+
+  /**
+   * This needs the root data directory to which the data needs to be exported to.
+   * The data export here is a list of files either in table/partition that are written to the _files
+   * in the exportRootDataDir provided.
+   */
+  private void exportFilesAsList() throws SemanticException, IOException, LoginException {
+    if (dataPathList.isEmpty()) {
+      return;
+    }
+    boolean done = false;
+    int repeat = 0;
+    while (!done) {
+      // This is only called for replication that handles MM tables; no need for mmCtx.
+      try (BufferedWriter writer = writer()) {
+        for (Path dataPath : dataPathList) {
+          writeFilesList(listFilesInDir(dataPath), writer, AcidUtils.getAcidSubDir(dataPath));
+        }
+        done = true;
+      } catch (IOException e) {
+        if (e instanceof FileNotFoundException) {
+          logger.error("exporting data files in dir : " + dataPathList + " to " + exportRootDataDir + " failed");
+          throw new FileNotFoundException(FILE_NOT_FOUND.format(e.getMessage()));
+        }
+        repeat++;
+        logger.info("writeFilesList failed", e);
+        if (repeat >= FileUtils.MAX_IO_ERROR_RETRY) {
+          logger.error("exporting data files in dir : " + dataPathList + " to " + exportRootDataDir + " failed");
+          throw new IOException(ErrorMsg.REPL_FILE_SYSTEM_OPERATION_RETRY.getMsg());
+        }
+
+        int sleepTime = FileUtils.getSleepTime(repeat - 1);
+        logger.info(" sleep for {} milliseconds for retry num {} ", sleepTime , repeat);
+        try {
+          Thread.sleep(sleepTime);
+        } catch (InterruptedException timerEx) {
+          logger.info("thread sleep interrupted", timerEx.getMessage());
+        }
+
+        // in case of io error, reset the file system object
+        FileSystem.closeAllForUGI(Utils.getUGI());
+        dataFileSystem = dataPathList.get(0).getFileSystem(hiveConf);
+        exportFileSystem = exportRootDataDir.getFileSystem(hiveConf);
+        Path exportPath = new Path(exportRootDataDir, EximUtil.FILES_NAME);
+        if (exportFileSystem.exists(exportPath)) {
+          exportFileSystem.delete(exportPath, true);
+        }
+      }
+    }
+  }
+
+  private void writeFilesList(FileStatus[] fileStatuses, BufferedWriter writer, String encodedSubDirs)
+          throws IOException {
+    ReplChangeManager replChangeManager = ReplChangeManager.getInstance();
+    for (FileStatus fileStatus : fileStatuses) {
+      if (fileStatus.isDirectory()) {
+        // Write files inside the sub-directory.
+        Path subDir = fileStatus.getPath();
+        writeFilesList(listFilesInDir(subDir), writer, encodedSubDir(encodedSubDirs, subDir));
+      } else {
+        writer.write(encodedUri(replChangeManager, fileStatus, encodedSubDirs));
+        writer.newLine();
+      }
+    }
+  }
+
+  private BufferedWriter writer() throws IOException {
+    Path exportToFile = new Path(exportRootDataDir, EximUtil.FILES_NAME);
+    logger.debug("exporting data files in dir : " + dataPathList + " to " + exportToFile);
+    return new BufferedWriter(
+            new OutputStreamWriter(exportFileSystem.create(exportToFile))
+    );
+  }
+
+  private String encodedSubDir(String encodedParentDirs, Path subDir) {
+    if (null == encodedParentDirs) {
+      return subDir.getName();
+    } else {
+      return encodedParentDirs + Path.SEPARATOR + subDir.getName();
+    }
+  }
+
+  private String encodedUri(ReplChangeManager replChangeManager, FileStatus fileStatus, String encodedSubDir)
+          throws IOException {
+    Path currentDataFilePath = fileStatus.getPath();
+    String checkSum = ReplChangeManager.checksumFor(currentDataFilePath, dataFileSystem);

Review comment:
       Which method?






[GitHub] [hive] pkumarsinha commented on a change in pull request #1225: HIVE-23069 Memory efficient iterator should be used during replicatio…

Posted by GitBox <gi...@apache.org>.
pkumarsinha commented on a change in pull request #1225:
URL: https://github.com/apache/hive/pull/1225#discussion_r454923611



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FileOperations.java
##########
@@ -165,4 +178,83 @@ private void validateSrcPathListExists() throws IOException, LoginException {
      [... identical to the FileOperations.java diff quoted earlier in this thread ...]
+        } catch (IOException e) {
+          if (e instanceof FileNotFoundException) {

Review comment:
       Shouldn't this suffice?
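
       For context: FileNotFoundException is a subclass of IOException, so a
       retry keyed on IOException would also retry a missing-file error unless
       it is filtered out first, which is what the instanceof check does. A
       sketch of the distinction (helper names hypothetical):

       try {
         writeAll();  // stand-in for the quoted write loop
       } catch (FileNotFoundException e) {
         throw e;     // not retriable: a missing source will not appear on retry
       } catch (IOException e) {
         retry();     // any other IOException is worth retrying
       }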






[GitHub] [hive] pkumarsinha commented on a change in pull request #1225: HIVE-23069 Memory efficient iterator should be used during replicatio…

Posted by GitBox <gi...@apache.org>.
pkumarsinha commented on a change in pull request #1225:
URL: https://github.com/apache/hive/pull/1225#discussion_r454940146



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/FileList.java
##########
@@ -0,0 +1,181 @@
+  [... identical to the FileList.java diff quoted earlier in this thread ...]
+  @VisibleForTesting
+  public boolean isStreamingToFile() {

Review comment:
       It was protected by @VisibleForTesting, but I will remove public as well.






[GitHub] [hive] pkumarsinha commented on a change in pull request #1225: HIVE-23069 Memory efficient iterator should be used during replicatio…

Posted by GitBox <gi...@apache.org>.
pkumarsinha commented on a change in pull request #1225:
URL: https://github.com/apache/hive/pull/1225#discussion_r454929778



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/FileList.java
##########
@@ -0,0 +1,181 @@
+  [... identical to the FileList.java diff quoted earlier in this thread ...]
+  @VisibleForTesting
+  public boolean isStreamingToFile() {

Review comment:
       Did you mean package-private?






[GitHub] [hive] aasha commented on a change in pull request #1225: HIVE-23069 Memory efficient iterator should be used during replicatio…

Posted by GitBox <gi...@apache.org>.
aasha commented on a change in pull request #1225:
URL: https://github.com/apache/hive/pull/1225#discussion_r454938290



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/FileList.java
##########
@@ -0,0 +1,181 @@
+  [... identical to the FileList.java diff quoted earlier in this thread ...]
+  @VisibleForTesting
+  public boolean isStreamingToFile() {

Review comment:
       Do you need public methods? For tests, package-level visibility should be fine.






[GitHub] [hive] aasha commented on a change in pull request #1225: HIVE-23069 Memory efficient iterator should be used during replicatio…

Posted by GitBox <gi...@apache.org>.
aasha commented on a change in pull request #1225:
URL: https://github.com/apache/hive/pull/1225#discussion_r452798321



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
##########
@@ -591,14 +590,25 @@ private Long incrementalDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot, Hive
         }
       }
       dumpTableListToDumpLocation(tableList, dumpRoot, dbName, conf);
-      extTableCopyWorks = dirLocationsToCopy(extTableLocations);
     }
-    work.setDirCopyIterator(extTableCopyWorks.iterator());
-    work.setManagedTableCopyPathIterator(managedTableCopyPaths.iterator());
+    setDataCopyIterators(extTableFileList, managedTblList);
     work.getMetricCollector().reportStageEnd(getName(), Status.SUCCESS, lastReplId);
     return lastReplId;
   }
 
+  private void setDataCopyIterators(FileList extTableFileList, FileList managedTableFileList) throws IOException {
+    boolean dataCopyAtLoad = conf.getBoolVar(HiveConf.ConfVars.REPL_DATA_COPY_LAZY);
+    extTableFileList.close();

Review comment:
       Is this serving the purpose of a flush? It's not clear why close is called before setting the iterator. This needs to be simplified.

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/FileList.java
##########
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.repl.util;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.concurrent.LinkedBlockingQueue;
+
+
+/**
+ * A file backed list of Strings which is in-memory till the threshold.
+ */
+public class FileList implements Closeable, Iterator<String> {

Review comment:
       Add UTs

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/FileListStreamer.java
##########
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.repl.util;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedWriter;
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+public class FileListStreamer extends Thread implements Closeable {
+  private static final Logger LOG = LoggerFactory.getLogger(FileListStreamer.class);
+  private static final long TIMEOUT_IN_SECS = 5L;
+  private volatile boolean stop;
+  private final LinkedBlockingQueue<String> cache;
+  private Path backingFile;
+  private Configuration conf;
+  private BufferedWriter backingFileWriter;
+  private volatile boolean valid = true;
+  private volatile boolean asyncMode = false;
+  private final Object COMPLETION_LOCK = new Object();
+  private volatile boolean completed = false;
+
+  public FileListStreamer(LinkedBlockingQueue<String> cache, Path backingFile, Configuration conf) throws IOException {
+    this.cache = cache;
+    this.backingFile = backingFile;
+    this.conf = conf;
+    init();
+  }
+
+  private void init() throws IOException {
+    FileSystem fs = FileSystem.get(backingFile.toUri(), conf);
+    backingFileWriter = new BufferedWriter(new OutputStreamWriter(fs.create(backingFile, !asyncMode)));
+    LOG.info("Initialized a file based store to save a list at: {}, ayncMode:{}", backingFile, asyncMode);
+  }
+
+  public boolean isValid() {
+    return valid;
+  }
+
+  @Override
+  public void close() throws IOException {
+    if (!asyncMode) {
+      closeBackingFile();
+      return;
+    }
+    stop = true;
+    synchronized (COMPLETION_LOCK) {
+      while (!completed && isValid()) {
+        try {
+          COMPLETION_LOCK.wait(TimeUnit.SECONDS.toMillis(TIMEOUT_IN_SECS));
+        } catch (InterruptedException e) {
+        }
+      }
+    }
+    if (!isValid()) {

Review comment:
       When will this be false? Only in closeBackingFile, or anywhere else as well? If it is only set there, this check can be moved up, or closeBackingFile can handle throwing the exception itself.

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/FileListStreamer.java
##########
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.repl.util;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedWriter;
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+public class FileListStreamer extends Thread implements Closeable {

Review comment:
       Implementing the Runnable interface is enough; there is no need to extend Thread.
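
       A rough sketch of the Runnable-based alternative (the caller-side wiring is an assumption for illustration):

           public class FileListStreamer implements Runnable, Closeable {
             // same fields, run() and close() bodies as in the patch
           }

           // caller side, e.g. where the streaming is kicked off:
           Thread streamerThread = new Thread(new FileListStreamer(cache, backingFile, conf), getNextID());
           streamerThread.setDaemon(true);
           streamerThread.start();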

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/FileList.java
##########
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.repl.util;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.concurrent.LinkedBlockingQueue;
+
+
+/**
+ * A file backed list of Strings which is in-memory till the threshold.
+ */
+public class FileList implements Closeable, Iterator<String> {
+  private static final Logger LOG = LoggerFactory.getLogger(FileList.class);
+  private static int fileListStreamerID = 0;
+  private static final String  FILE_LIST_STREAMER_PREFIX = "file-list-streamer-";
+
+  private LinkedBlockingQueue<String> cache;
+  private volatile boolean thresholdHit = false;
+  private int thresholdPoint;
+  private float thresholdFactor = 0.9f;
+  private Path backingFile;
+  private FileListStreamer fileListStreamer;
+  private FileListOpMode fileListOpMode;
+  private String nextElement;
+  private boolean noMoreElement;
+  private HiveConf conf;
+  private BufferedReader backingFileReader;
+  private volatile boolean asyncMode;
+
+
+  /**
+   * To be used only for READ mode;
+   */
+  public FileList(Path backingFile, HiveConf conf) {
+    this.backingFile = backingFile;
+    thresholdHit = true;
+    fileListOpMode = FileListOpMode.READ;
+    this.conf = conf;
+  }
+
+  /**
+   * To be used only for WRITE mode;
+   */
+  public FileList(Path backingFile, int cacheSize, HiveConf conf, boolean asyncMode) throws IOException {
+    this.cache = new LinkedBlockingQueue<>(cacheSize);
+    this.backingFile = backingFile;
+    fileListStreamer = new FileListStreamer(cache, backingFile, conf);
+    fileListOpMode = FileListOpMode.WRITE;
+    this.conf = conf;
+    thresholdPoint = getThreshold(cacheSize);
+    this.asyncMode = asyncMode;
+  }
+
+  /**
+   * Only the add operation is safe for concurrent use.
+   */
+  public void add(String entry) throws SemanticException {
+    validateMode(FileListOpMode.WRITE);
+    if (!asyncMode) {
+      fileListStreamer.writeInThread(entry);
+      return;
+    }
+    if (thresholdHit && !fileListStreamer.isValid()) {
+      throw new SemanticException("List is not getting saved anymore to file " + backingFile.toString());
+    }
+    try {
+      cache.put(entry);
+    } catch (InterruptedException e) {
+      throw new SemanticException(e);
+    }
+    if (!thresholdHit && cache.size() > thresholdPoint) {
+      initStoreToFile();
+    }
+  }
+
+  /**
+   * Must be called before the list object can be used for read operations.
+   * @throws IOException
+   */
+  @Override
+  public void close() throws IOException {
+    if (fileListOpMode == FileListOpMode.READ) {
+      if (backingFileReader != null) {
+        backingFileReader.close();
+      }
+    } else {
+      fileListOpMode = FileListOpMode.CLOSING;
+      if (thresholdHit) {
+        fileListStreamer.close();
+      }
+      fileListOpMode = FileListOpMode.READ;
+    }
+  }
+
+  @Override
+  public boolean hasNext() {
+    validateMode(FileListOpMode.READ);
+    if (!thresholdHit) {
+      return !cache.isEmpty();
+    }
+    if (nextElement != null) {
+      return true;
+    }
+    if (noMoreElement) {
+      return false;
+    }
+    nextElement = readNextLine();
+    if (nextElement == null) {
+      noMoreElement = true;
+    }
+    return !noMoreElement;
+  }
+
+  private String readNextLine() {
+    String nextElement = null;
+    try {
+      if (backingFileReader == null) {
+        FileSystem fs = FileSystem.get(backingFile.toUri(), conf);
+        if (fs.exists(backingFile)) {
+          backingFileReader = new BufferedReader(new InputStreamReader(fs.open(backingFile)));
+        }
+      }
+      nextElement = (backingFileReader == null) ? null : backingFileReader.readLine();
+    } catch (IOException e) {
+      LOG.error("Unable to read list from backing file " + backingFile, e);
+    }
+    return nextElement;
+  }
+
+  @Override
+  public String next() {
+    validateMode(FileListOpMode.READ);
+    if (!hasNext()) {
+      throw new NoSuchElementException("No more element in the list backed by " + backingFile);
+    }
+    String retVal = nextElement;
+    nextElement = null;
+    return thresholdHit ? retVal : cache.poll();
+  }
+  private synchronized void initStoreToFile() {
+    if (!thresholdHit) {
+      fileListStreamer.setName(getNextID());
+      fileListStreamer.setDaemon(true);
+      fileListStreamer.start();
+      thresholdHit = true;
+      LOG.info("Started streaming the list elements to file: {}", backingFile);
+    }
+  }
+
+  private static String getNextID() {
+    if (Integer.MAX_VALUE == fileListStreamerID) {
+      //reset the counter
+      fileListStreamerID = 0;
+    }
+    fileListStreamerID++;
+    return FILE_LIST_STREAMER_PREFIX  + fileListStreamerID;
+  }
+
+  private void validateMode(FileListOpMode expectedMode) throws IllegalStateException {
+    if (!fileListOpMode.equals(expectedMode)) {
+      String logMessage = String.format("Invalid mode for File List, expected:%s, found:%s",
+              expectedMode, fileListOpMode);
+      throw new IllegalStateException(logMessage);
+    }
+  }
+
+  public int getThreshold(int cacheSize) {
+    boolean copyAtLoad = conf.getBoolVar(HiveConf.ConfVars.REPL_DATA_COPY_LAZY);
+    return copyAtLoad ? 0 : (int)(cacheSize * thresholdFactor);
+  }
+
+  private enum FileListOpMode {
+    READ,
+    WRITE,
+    CLOSING

Review comment:
       Rename it to CLOSE, or make the others READING and WRITING, for consistency.

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/FileList.java
##########
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.repl.util;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.concurrent.LinkedBlockingQueue;
+
+
+/**
+ * A file backed list of Strings which is in-memory till the threshold.
+ */
+public class FileList implements Closeable, Iterator<String> {
+  private static final Logger LOG = LoggerFactory.getLogger(FileList.class);
+  private static int fileListStreamerID = 0;
+  private static final String  FILE_LIST_STREAMER_PREFIX = "file-list-streamer-";
+
+  private LinkedBlockingQueue<String> cache;
+  private volatile boolean thresholdHit = false;
+  private int thresholdPoint;
+  private float thresholdFactor = 0.9f;
+  private Path backingFile;
+  private FileListStreamer fileListStreamer;
+  private FileListOpMode fileListOpMode;
+  private String nextElement;
+  private boolean noMoreElement;
+  private HiveConf conf;
+  private BufferedReader backingFileReader;
+  private volatile boolean asyncMode;
+
+
+  /**
+   * To be used only for READ mode;
+   */
+  public FileList(Path backingFile, HiveConf conf) {

Review comment:
       Better to remove this mode; the add and next/hasNext methods can take care of it.

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/FileListStreamer.java
##########
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.repl.util;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedWriter;
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+public class FileListStreamer extends Thread implements Closeable {
+  private static final Logger LOG = LoggerFactory.getLogger(FileListStreamer.class);
+  private static final long TIMEOUT_IN_SECS = 5L;
+  private volatile boolean stop;
+  private final LinkedBlockingQueue<String> cache;
+  private Path backingFile;
+  private Configuration conf;
+  private BufferedWriter backingFileWriter;
+  private volatile boolean valid = true;
+  private volatile boolean asyncMode = false;
+  private final Object COMPLETION_LOCK = new Object();
+  private volatile boolean completed = false;
+
+  public FileListStreamer(LinkedBlockingQueue<String> cache, Path backingFile, Configuration conf) throws IOException {
+    this.cache = cache;
+    this.backingFile = backingFile;
+    this.conf = conf;
+    init();
+  }
+
+  private void init() throws IOException {
+    FileSystem fs = FileSystem.get(backingFile.toUri(), conf);
+    backingFileWriter = new BufferedWriter(new OutputStreamWriter(fs.create(backingFile, !asyncMode)));
+    LOG.info("Initialized a file based store to save a list at: {}, ayncMode:{}", backingFile, asyncMode);
+  }
+
+  public boolean isValid() {
+    return valid;
+  }
+
+  @Override
+  public void close() throws IOException {
+    if (!asyncMode) {
+      closeBackingFile();
+      return;
+    }
+    stop = true;
+    synchronized (COMPLETION_LOCK) {
+      while (!completed && isValid()) {
+        try {
+          COMPLETION_LOCK.wait(TimeUnit.SECONDS.toMillis(TIMEOUT_IN_SECS));
+        } catch (InterruptedException e) {
+        }
+      }
+    }
+    if (!isValid()) {
+      throw new IOException("File list is not in a valid state:" + backingFile);
+    }
+    LOG.info("Completed close for File List backed by ", backingFile);
+  }
+
+  public synchronized void writeInThread(String nextEntry) throws SemanticException {
+    try {
+      backingFileWriter.write(nextEntry);
+      backingFileWriter.newLine();
+    } catch (IOException e) {
+      throw new SemanticException(e);
+    }
+  }
+  @Override
+  public void run() {
+    asyncMode = true;
+    boolean exThrown = false;
+    while (!exThrown && (!stop || !cache.isEmpty())) {
+      try {
+        String nextEntry = cache.poll(TIMEOUT_IN_SECS, TimeUnit.SECONDS);
+        if (nextEntry != null) {
+          backingFileWriter.write(nextEntry);
+          backingFileWriter.newLine();
+          LOG.debug("Writing entry {} to file list backed by {}", nextEntry, backingFile);
+        }
+      } catch (Exception iEx) {
+        if (!(iEx instanceof InterruptedException)) {
+          // not draining any more. Inform the producer to avoid OOM.
+          valid = false;
+          LOG.error("Exception while saving the list to file " + backingFile, iEx);
+          exThrown = true;
+        }
+      }
+    }
+    try {
+      closeBackingFile();
+      completed = true;
+    } finally {
+      synchronized (COMPLETION_LOCK) {
+        COMPLETION_LOCK.notify();
+      }
+    }
+    LOG.info("Completed the file list streamer backed by: {}", backingFile);
+  }
+
+  private void closeBackingFile() {
+    try {
+      backingFileWriter.close();
+      LOG.debug("Closed the file list backing file: {}", backingFile);
+    } catch (IOException e) {
+      LOG.error("Exception while closing the file list backing file", e);
+      valid = false;

Review comment:
       You can throw the exception directly from here instead of maintaining extra state in the valid boolean.
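
       A minimal sketch of that variant (the changed signature is an assumption; callers would then propagate the IOException):

           private void closeBackingFile() throws IOException {
             try {
               backingFileWriter.close();
               LOG.debug("Closed the file list backing file: {}", backingFile);
             } catch (IOException e) {
               LOG.error("Exception while closing the file list backing file", e);
               throw e;
             }
           }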

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
##########
@@ -591,14 +590,25 @@ private Long incrementalDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot, Hive
         }
       }
       dumpTableListToDumpLocation(tableList, dumpRoot, dbName, conf);
-      extTableCopyWorks = dirLocationsToCopy(extTableLocations);
     }
-    work.setDirCopyIterator(extTableCopyWorks.iterator());
-    work.setManagedTableCopyPathIterator(managedTableCopyPaths.iterator());
+    setDataCopyIterators(extTableFileList, managedTblList);
     work.getMetricCollector().reportStageEnd(getName(), Status.SUCCESS, lastReplId);
     return lastReplId;
   }
 
+  private void setDataCopyIterators(FileList extTableFileList, FileList managedTableFileList) throws IOException {
+    boolean dataCopyAtLoad = conf.getBoolVar(HiveConf.ConfVars.REPL_DATA_COPY_LAZY);
+    extTableFileList.close();

Review comment:
       Or better, close it where the list is created, using try-with-resources.
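
       A minimal sketch of the try-with-resources shape (the createTableFileList helper is hypothetical, named only for illustration):

           try (FileList extTableFileList = createTableFileList(dumpRoot, EXT_LIST_FILE, cacheSize);
                FileList managedTblList = createTableFileList(dumpRoot, MANAGED_LIST_FILE, cacheSize)) {
             // populate both lists while walking the tables during the dump
           } // close() flushes any streamed entries before the lists are read back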

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/FileList.java
##########
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.repl.util;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.concurrent.LinkedBlockingQueue;
+
+
+/**
+ * A file backed list of Strings which is in-memory till the threshold.
+ */
+public class FileList implements Closeable, Iterator<String> {

Review comment:
       Also add concurrency tests

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/FileList.java
##########
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.repl.util;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.concurrent.LinkedBlockingQueue;
+
+
+/**
+ * A file backed list of Strings which is in-memory till the threshold.
+ */
+public class FileList implements Closeable, Iterator<String> {
+  private static final Logger LOG = LoggerFactory.getLogger(FileList.class);
+  private static int fileListStreamerID = 0;
+  private static final String  FILE_LIST_STREAMER_PREFIX = "file-list-streamer-";
+
+  private LinkedBlockingQueue<String> cache;
+  private volatile boolean thresholdHit = false;
+  private int thresholdPoint;
+  private float thresholdFactor = 0.9f;
+  private Path backingFile;
+  private FileListStreamer fileListStreamer;
+  private FileListOpMode fileListOpMode;
+  private String nextElement;
+  private boolean noMoreElement;
+  private HiveConf conf;
+  private BufferedReader backingFileReader;
+  private volatile boolean asyncMode;
+
+
+  /**
+   * To be used only for READ mode;
+   */
+  public FileList(Path backingFile, HiveConf conf) {

Review comment:
       How do you mandate that it will be used only in READ mode?

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/FileList.java
##########
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.repl.util;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.concurrent.LinkedBlockingQueue;
+
+
+/**
+ * A file backed list of Strings which is in-memory till the threshold.
+ */
+public class FileList implements Closeable, Iterator<String> {
+  private static final Logger LOG = LoggerFactory.getLogger(FileList.class);
+  private static int fileListStreamerID = 0;
+  private static final String  FILE_LIST_STREAMER_PREFIX = "file-list-streamer-";
+
+  private LinkedBlockingQueue<String> cache;
+  private volatile boolean thresholdHit = false;
+  private int thresholdPoint;
+  private float thresholdFactor = 0.9f;
+  private Path backingFile;
+  private FileListStreamer fileListStreamer;
+  private FileListOpMode fileListOpMode;
+  private String nextElement;
+  private boolean noMoreElement;
+  private HiveConf conf;
+  private BufferedReader backingFileReader;
+  private volatile boolean asyncMode;
+
+
+  /**
+   * To be used only for READ mode;
+   */
+  public FileList(Path backingFile, HiveConf conf) {
+    this.backingFile = backingFile;
+    thresholdHit = true;
+    fileListOpMode = FileListOpMode.READ;
+    this.conf = conf;
+  }
+
+  /**
+   * To be used only for WRITE mode;
+   */
+  public FileList(Path backingFile, int cacheSize, HiveConf conf, boolean asyncMode) throws IOException {

Review comment:
       How do you mandate that it will be used only in WRITE mode?
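
       One possible way to make the contract explicit is a pair of static factory methods instead of overloaded constructors, sketched here with illustrative names:

           public static FileList forRead(Path backingFile, HiveConf conf) {
             return new FileList(backingFile, conf);
           }

           public static FileList forWrite(Path backingFile, int cacheSize, HiveConf conf, boolean asyncMode)
               throws IOException {
             return new FileList(backingFile, cacheSize, conf, asyncMode);
           }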






[GitHub] [hive] pkumarsinha commented on a change in pull request #1225: HIVE-23069 Memory efficient iterator should be used during replicatio…

Posted by GitBox <gi...@apache.org>.
pkumarsinha commented on a change in pull request #1225:
URL: https://github.com/apache/hive/pull/1225#discussion_r454923124



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FileOperations.java
##########
@@ -165,4 +178,83 @@ private void validateSrcPathListExists() throws IOException, LoginException {
       throw new FileNotFoundException(FILE_NOT_FOUND.format(e.getMessage()));
     }
   }
+
+  /**
+   * This needs the root data directory to which the data needs to be exported.
+   * The data export here is a list of files, from either a table or a partition, that are written to the _files
+   * in the exportRootDataDir provided.
+   */
+   void exportFilesAsList() throws SemanticException, IOException, LoginException {
+    if (dataPathList.isEmpty()) {
+      return;
+    }
+    Retry<Void> retryable = new Retry<Void>(IOException.class) {
+      @Override
+      public Void execute() throws Exception {
+        try (BufferedWriter writer = writer()) {
+          for (Path dataPath : dataPathList) {
+            writeFilesList(listFilesInDir(dataPath), writer, AcidUtils.getAcidSubDir(dataPath));
+          }
+        } catch (IOException e) {
+          if (e instanceof FileNotFoundException) {

Review comment:
       if (e instanceof FileNotFoundException) {
         logger.error("exporting data files in dir : " + dataPathList + " to " + exportRootDataDir + " failed");
         throw new FileNotFoundException(FILE_NOT_FOUND.format(e.getMessage()));
       }






[GitHub] [hive] pkumarsinha commented on a change in pull request #1225: HIVE-23069 Memory efficient iterator should be used during replicatio…

Posted by GitBox <gi...@apache.org>.
pkumarsinha commented on a change in pull request #1225:
URL: https://github.com/apache/hive/pull/1225#discussion_r454919825



##########
File path: itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java
##########
@@ -210,6 +210,66 @@ public void externalTableReplicationWithDefaultPaths() throws Throwable {
     assertExternalFileInfo(Arrays.asList("t2", "t3", "t4"), tuple.dumpLocation, true);
   }
 
+  @Test
+  public void externalTableReplicationWithDefaultPathsLazyCopy() throws Throwable {
+    List<String> lazyCopyClause = Arrays.asList("'" + HiveConf.ConfVars.REPL_DATA_COPY_LAZY.varname + "'='true'");
+    //creates external tables with partitions
+    WarehouseInstance.Tuple tuple = primary
+            .run("use " + primaryDbName)
+            .run("create external table t1 (id int)")
+            .run("insert into table t1 values (1)")
+            .run("insert into table t1 values (2)")
+            .run("create external table t2 (place string) partitioned by (country string)")
+            .run("insert into table t2 partition(country='india') values ('bangalore')")
+            .run("insert into table t2 partition(country='us') values ('austin')")
+            .run("insert into table t2 partition(country='france') values ('paris')")
+            .dump(primaryDbName, lazyCopyClause);
+
+    // verify that the external table info is written correctly for bootstrap
+    assertExternalFileInfo(Arrays.asList("t1", "t2"), tuple.dumpLocation, primaryDbName, false);
+
+    replica.load(replicatedDbName, primaryDbName, lazyCopyClause)
+            .run("use " + replicatedDbName)
+            .run("show tables like 't1'")
+            .verifyResult("t1")
+            .run("show tables like 't2'")
+            .verifyResult("t2")
+            .run("repl status " + replicatedDbName)
+            .verifyResult(tuple.lastReplicationId)
+            .run("select country from t2 where country = 'us'")
+            .verifyResult("us")
+            .run("select country from t2 where country = 'france'")
+            .verifyResult("france")
+            .run("show partitions t2").verifyResults(new String[] {"country=france", "country=india", "country=us"});
+
+    String hiveDumpLocation = tuple.dumpLocation + File.separator + ReplUtils.REPL_HIVE_BASE_DIR;
+    // Ckpt should be set on bootstrapped db.
+    replica.verifyIfCkptSet(replicatedDbName, hiveDumpLocation);
+
+    assertTablePartitionLocation(primaryDbName + ".t1", replicatedDbName + ".t1");
+    assertTablePartitionLocation(primaryDbName + ".t2", replicatedDbName + ".t2");
+
+    tuple = primary.run("use " + primaryDbName)
+            .run("create external table t3 (id int)")
+            .run("insert into table t3 values (10)")
+            .run("create external table t4 as select id from t3")

Review comment:
       How is that related to this patch?






[GitHub] [hive] pkumarsinha commented on a change in pull request #1225: HIVE-23069 Memory efficient iterator should be used during replicatio…

Posted by GitBox <gi...@apache.org>.
pkumarsinha commented on a change in pull request #1225:
URL: https://github.com/apache/hive/pull/1225#discussion_r454928086



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/FileListStreamer.java
##########
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.repl.util;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedWriter;
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+public class FileListStreamer extends Thread implements Closeable {
+  private static final Logger LOG = LoggerFactory.getLogger(FileListStreamer.class);
+  private static BufferedWriter backingFileWriterInTest;
+  private static final long TIMEOUT_IN_SECS = 5L;
+  private volatile boolean signalToStop;
+  private final LinkedBlockingQueue<String> cache;
+  private Path backingFile;
+  private Configuration conf;
+  private BufferedWriter backingFileWriter;
+  private volatile boolean valid = true;
+  private final Object COMPLETION_LOCK = new Object();
+  private volatile boolean completed = false;
+  private volatile boolean initialized = false;
+
+  public FileListStreamer(LinkedBlockingQueue<String> cache, Path backingFile, Configuration conf) throws IOException {
+    this.cache = cache;
+    this.backingFile = backingFile;
+    this.conf = conf;
+  }
+
+  private void lazyInit() throws IOException {
+    if (backingFileWriterInTest == null) {
+      FileSystem fs = FileSystem.get(backingFile.toUri(), conf);
+      backingFileWriter = new BufferedWriter(new OutputStreamWriter(fs.create(backingFile)));
+    } else {
+      backingFileWriter = backingFileWriterInTest;
+    }
+    initialized = true;
+    LOG.info("Initialized a file based store to save a list at: {}", backingFile);
+  }
+
+  public boolean isValid() {
+    return valid;
+  }
+
+  // Blocks until the remaining entries are flushed to the file.
+  @Override
+  public void close() throws IOException {
+    signalToStop = true;
+    synchronized (COMPLETION_LOCK) {
+      while (motiveToWait()) {
+        try {
+          COMPLETION_LOCK.wait(TimeUnit.SECONDS.toMillis(TIMEOUT_IN_SECS));
+        } catch (InterruptedException e) {
+          // no-op
+        }
+      }
+    }
+    if (!isValid()) {
+      throw new IOException("File list is not in a valid state:" + backingFile);
+    }
+  }
+
+  private boolean motiveToWait() {
+    return !completed && valid;
+  }
+
+  @Override
+  public void run() {
+    try {
+      lazyInit();
+    } catch (IOException e) {
+      valid = false;
+      throw new RuntimeException("Unable to initialize the file list streamer", e);
+    }
+    boolean exThrown = false;
+    while (!exThrown && (!signalTostop || !cache.isEmpty())) {
+      try {
+        String nextEntry = cache.poll(TIMEOUT_IN_SECS, TimeUnit.SECONDS);
+        if (nextEntry != null) {
+          backingFileWriter.write(nextEntry);
+          backingFileWriter.newLine();
+          LOG.debug("Writing entry {} to file list backed by {}", nextEntry, backingFile);
+        }
+      } catch (Exception iEx) {
+        if (!(iEx instanceof InterruptedException)) {
+          // not draining any more. Inform the producer to avoid OOM.
+          valid = false;
+          LOG.error("Exception while saving the list to file " + backingFile, iEx);
+          exThrown = true;
+        }
+      }
+    }
+    try {
+      closeBackingFile();
+      completed = true;
+    } finally {
+      synchronized (COMPLETION_LOCK) {
+        COMPLETION_LOCK.notify();
+      }
+    }
+    LOG.info("Completed the file list streamer backed by: {}", backingFile);
+  }
+
+  private void closeBackingFile() {
+    try {
+      backingFileWriter.close();
+      LOG.debug("Closed the file list backing file: {}", backingFile);
+    } catch (IOException e) {
+      LOG.error("Exception while closing the file list backing file", e);
+      valid = false;
+    }
+  }
+
+  @VisibleForTesting
+  public static void setBackingFileWriterInTest(BufferedWriter bufferedWriter) {

Review comment:
       How will the streamer thread work in the mocked case?






[GitHub] [hive] pkumarsinha commented on a change in pull request #1225: HIVE-23069 Memory efficient iterator should be used during replicatio…

Posted by GitBox <gi...@apache.org>.
pkumarsinha commented on a change in pull request #1225:
URL: https://github.com/apache/hive/pull/1225#discussion_r454907711



##########
File path: ql/src/test/org/apache/hadoop/hive/ql/exec/repl/util/TestFileList.java
##########
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.repl.util;
+
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedWriter;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({LoggerFactory.class})
+public class TestFileList {
+
+  @Mock
+  private HiveConf hiveConf;
+
+  @Mock
+  BufferedWriter bufferedWriter;
+
+  @Test
+  public void testNoStreaming() throws Exception {
+    Mockito.when(hiveConf.getBoolVar(HiveConf.ConfVars.REPL_DATA_COPY_LAZY)).thenReturn(false);
+    Path backingFile  = new Path("/tmp/backingFile");
+    FileList fileList = new FileList(backingFile, 100, hiveConf);
+    fileList.add("Entry1");
+    fileList.add("Entry2");
+    assertFalse(fileList.isStreamingToFile());
+  }
+
+  @Test
+  public void testAlwaysStreaming() throws Exception {
+    Mockito.when(hiveConf.getBoolVar(HiveConf.ConfVars.REPL_DATA_COPY_LAZY)).thenReturn(true);
+    FileListStreamer.setBackingFileWriterInTest(bufferedWriter);
+    Path backingFile  = new Path("/tmp/backingFile");
+    FileList fileList = new FileList(backingFile, 100, hiveConf);
+    assertFalse(fileList.isStreamingInitialized());
+    fileList.add("Entry1");
+    waitForStreamingInitialization(fileList);
+    assertTrue(fileList.isStreamingToFile());
+    fileList.close();
+    waitForStreamingClosure(fileList);
+  }
+
+  @Test
+  public void testStreamingOnCacheHit() throws Exception {
+    Mockito.when(hiveConf.getBoolVar(HiveConf.ConfVars.REPL_DATA_COPY_LAZY)).thenReturn(false);
+    FileListStreamer.setBackingFileWriterInTest(bufferedWriter);
+    Path backingFile  = new Path("/tmp/backingFile");
+    FileList fileList = new FileList(backingFile, 5, hiveConf);
+    fileList.add("Entry1");
+    fileList.add("Entry2");
+    fileList.add("Entry3");
+    Thread.sleep(5000L);
+    assertFalse(fileList.isStreamingInitialized());
+    fileList.add("Entry4");
+    fileList.add("Entry5");
+    waitForStreamingInitialization(fileList);
+    fileList.close();
+    waitForStreamingClosure(fileList);

Review comment:
       We already have an ArgumentCaptor, so the written entries should be verified with it. And there are other tests that cover the file write being successful.
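
       A minimal sketch of the captor-based assertion (reusing the mocked bufferedWriter from this test class):

           ArgumentCaptor<String> entryCaptor = ArgumentCaptor.forClass(String.class);
           Mockito.verify(bufferedWriter, Mockito.atLeastOnce()).write(entryCaptor.capture());
           assertTrue(entryCaptor.getAllValues().contains("Entry5"));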






[GitHub] [hive] github-actions[bot] commented on pull request #1225: HIVE-23069 Memory efficient iterator should be used during replicatio…

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #1225:
URL: https://github.com/apache/hive/pull/1225#issuecomment-695140029


   This pull request has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs.
   Feel free to reach out on the dev@hive.apache.org list if the patch is in need of reviews.




[GitHub] [hive] aasha commented on a change in pull request #1225: HIVE-23069 Memory efficient iterator should be used during replicatio…

Posted by GitBox <gi...@apache.org>.
aasha commented on a change in pull request #1225:
URL: https://github.com/apache/hive/pull/1225#discussion_r454911634



##########
File path: itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
##########
@@ -1559,6 +1645,76 @@ public void testIncrementalLoad() throws IOException {
     verifyRun("SELECT a from " + replDbName + ".ptned WHERE b=2", ptnData2, driverMirror);
   }
 
+  @Test
+  public void testIncrementalLoadLazyCopy() throws IOException {

Review comment:
       You can add this test to the mini-HDFS setup to make sure the data copy happens correctly for both lazy copy true and false.






[GitHub] [hive] pkumarsinha commented on a change in pull request #1225: HIVE-23069 Memory efficient iterator should be used during replicatio…

Posted by GitBox <gi...@apache.org>.
pkumarsinha commented on a change in pull request #1225:
URL: https://github.com/apache/hive/pull/1225#discussion_r452811664



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
##########
@@ -591,14 +590,25 @@ private Long incrementalDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot, Hive
         }
       }
       dumpTableListToDumpLocation(tableList, dumpRoot, dbName, conf);
-      extTableCopyWorks = dirLocationsToCopy(extTableLocations);
     }
-    work.setDirCopyIterator(extTableCopyWorks.iterator());
-    work.setManagedTableCopyPathIterator(managedTableCopyPaths.iterator());
+    setDataCopyIterators(extTableFileList, managedTblList);
     work.getMetricCollector().reportStageEnd(getName(), Status.SUCCESS, lastReplId);
     return lastReplId;
   }
 
+  private void setDataCopyIterators(FileList extTableFileList, FileList managedTableFileList) throws IOException {
+    boolean dataCopyAtLoad = conf.getBoolVar(HiveConf.ConfVars.REPL_DATA_COPY_LAZY);
+    extTableFileList.close();

Review comment:
       Close makes sure that everything is flushed out and the list can then be used in READ mode.
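
       A loose sketch of that lifecycle as seen from the dump task (method names reused from the diff above; exact types elided):

           extTableFileList.close();      // blocks until the streamer drains the cache to the backing file
           managedTableFileList.close();
           if (!dataCopyAtLoad) {
             // only now is it safe to hand the lists out for iteration
             work.setDirCopyIterator(extTableFileList);
             work.setManagedTableCopyPathIterator(managedTableFileList);
           }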






[GitHub] [hive] pkumarsinha commented on a change in pull request #1225: HIVE-23069 Memory efficient iterator should be used during replicatio…

Posted by GitBox <gi...@apache.org>.
pkumarsinha commented on a change in pull request #1225:
URL: https://github.com/apache/hive/pull/1225#discussion_r452812877



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/FileList.java
##########
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.repl.util;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.concurrent.LinkedBlockingQueue;
+
+
+/**
+ * A file backed list of Strings which is in-memory till the threshold.
+ */
+public class FileList implements Closeable, Iterator<String> {

Review comment:
       Also add concurrency tests - can you please suggest an approach for this?
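
       A rough sketch of one such test (thread and entry counts are illustrative only):

           ExecutorService pool = Executors.newFixedThreadPool(4);
           for (int t = 0; t < 4; t++) {
             final int id = t;
             pool.submit(() -> {
               for (int i = 0; i < 1000; i++) {
                 fileList.add("entry-" + id + "-" + i);   // concurrent adds from four threads
               }
               return null;
             });
           }
           pool.shutdown();
           assertTrue(pool.awaitTermination(1, TimeUnit.MINUTES));
           fileList.close();
           // read the list back and assert all 4000 distinct entries are present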






[GitHub] [hive] aasha commented on a change in pull request #1225: HIVE-23069 Memory efficient iterator should be used during replicatio…

Posted by GitBox <gi...@apache.org>.
aasha commented on a change in pull request #1225:
URL: https://github.com/apache/hive/pull/1225#discussion_r454912593



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FileOperations.java
##########
@@ -165,4 +178,83 @@ private void validateSrcPathListExists() throws IOException, LoginException {
       throw new FileNotFoundException(FILE_NOT_FOUND.format(e.getMessage()));
     }
   }
+
+  /**
+   * This needs the root data directory to which the data needs to be exported.
+   * The data export here is a list of files, from either a table or a partition, that are written to the _files
+   * in the exportRootDataDir provided.
+   */
+   void exportFilesAsList() throws SemanticException, IOException, LoginException {
+    if (dataPathList.isEmpty()) {
+      return;
+    }
+    Retry<Void> retryable = new Retry<Void>(IOException.class) {
+      @Override
+      public Void execute() throws Exception {
+        try (BufferedWriter writer = writer()) {
+          for (Path dataPath : dataPathList) {
+            writeFilesList(listFilesInDir(dataPath), writer, AcidUtils.getAcidSubDir(dataPath));
+          }
+        } catch (IOException e) {
+          if (e instanceof FileNotFoundException) {

Review comment:
       FileNotFoundException is an IOException, and you have added retry for IOException, so it will be retried as well.
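
       A small sketch of how the non-retryable case could be kept out of the retry loop (based on the Retry usage shown above; the rethrown type is an assumption):

           Retry<Void> retryable = new Retry<Void>(IOException.class) {
             @Override
             public Void execute() throws Exception {
               try (BufferedWriter writer = writer()) {
                 for (Path dataPath : dataPathList) {
                   writeFilesList(listFilesInDir(dataPath), writer, AcidUtils.getAcidSubDir(dataPath));
                 }
               } catch (FileNotFoundException e) {
                 // rethrow as a type the Retry wrapper is not configured to retry on
                 throw new SemanticException(FILE_NOT_FOUND.format(e.getMessage()), e);
               }
               return null;
             }
           };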






[GitHub] [hive] pkumarsinha commented on a change in pull request #1225: HIVE-23069 Memory efficient iterator should be used during replicatio…

Posted by GitBox <gi...@apache.org>.
pkumarsinha commented on a change in pull request #1225:
URL: https://github.com/apache/hive/pull/1225#discussion_r452812001



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/FileList.java
##########
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.repl.util;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.concurrent.LinkedBlockingQueue;
+
+
+/**
+ * A file backed list of Strings which is in-memory till the threshold.
+ */
+public class FileList implements Closeable, Iterator<String> {
+  private static final Logger LOG = LoggerFactory.getLogger(FileList.class);
+  private static int fileListStreamerID = 0;
+  private static final String  FILE_LIST_STREAMER_PREFIX = "file-list-streamer-";
+
+  private LinkedBlockingQueue<String> cache;
+  private volatile boolean thresholdHit = false;
+  private int thresholdPoint;
+  private float thresholdFactor = 0.9f;
+  private Path backingFile;
+  private FileListStreamer fileListStreamer;
+  private FileListOpMode fileListOpMode;
+  private String nextElement;
+  private boolean noMoreElement;
+  private HiveConf conf;
+  private BufferedReader backingFileReader;
+  private volatile boolean asyncMode;
+
+
+  /**
+   * To be used only for READ mode;
+   */
+  public FileList(Path backingFile, HiveConf conf) {
+    this.backingFile = backingFile;
+    thresholdHit = true;
+    fileListOpMode = FileListOpMode.READ;
+    this.conf = conf;
+  }
+
+  /**
+   * To be used only for WRITE mode;
+   */
+  public FileList(Path backingFile, int cacheSize, HiveConf conf, boolean asyncMode) throws IOException {

Review comment:
       If it is called otherwise, it won't allow the list to be used anyway.
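
       As a sketch of what that guard could look like (hypothetical; the actual mode check in the
       patch may differ):

    private void validateMode(FileListOpMode expectedMode) {
      if (fileListOpMode != expectedMode) {
        throw new IllegalStateException("FileList opened in " + fileListOpMode
            + " mode; this operation requires " + expectedMode + " mode");
      }
    }

    public void add(String entry) throws SemanticException {
      validateMode(FileListOpMode.WRITE); // only the WRITE-mode constructor permits add()
      // ... enqueue into the cache, spilling to the backing file past the threshold ...
    }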




[GitHub] [hive] pkumarsinha commented on a change in pull request #1225: HIVE-23069 Memory efficient iterator should be used during replicatio…

Posted by GitBox <gi...@apache.org>.
pkumarsinha commented on a change in pull request #1225:
URL: https://github.com/apache/hive/pull/1225#discussion_r452809466



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FileOperations.java
##########
@@ -165,4 +175,92 @@ private void validateSrcPathListExists() throws IOException, LoginException {
       throw new FileNotFoundException(FILE_NOT_FOUND.format(e.getMessage()));
     }
   }
+
+  /**
+   * This needs the root data directory to which the data needs to be exported to.
+   * The data export here is a list of files either in table/partition that are written to the _files
+   * in the exportRootDataDir provided.
+   */
+  private void exportFilesAsList() throws SemanticException, IOException, LoginException {
+    if (dataPathList.isEmpty()) {
+      return;
+    }
+    boolean done = false;
+    int repeat = 0;
+    while (!done) {
+      // This is only called for replication that handles MM tables; no need for mmCtx.
+      try (BufferedWriter writer = writer()) {
+        for (Path dataPath : dataPathList) {
+          writeFilesList(listFilesInDir(dataPath), writer, AcidUtils.getAcidSubDir(dataPath));
+        }
+        done = true;
+      } catch (IOException e) {
+        if (e instanceof FileNotFoundException) {
+          logger.error("exporting data files in dir : " + dataPathList + " to " + exportRootDataDir + " failed");
+          throw new FileNotFoundException(FILE_NOT_FOUND.format(e.getMessage()));
+        }
+        repeat++;
+        logger.info("writeFilesList failed", e);
+        if (repeat >= FileUtils.MAX_IO_ERROR_RETRY) {
+          logger.error("exporting data files in dir : " + dataPathList + " to " + exportRootDataDir + " failed");
+          throw new IOException(ErrorMsg.REPL_FILE_SYSTEM_OPERATION_RETRY.getMsg());
+        }
+
+        int sleepTime = FileUtils.getSleepTime(repeat - 1);
+        logger.info(" sleep for {} milliseconds for retry num {} ", sleepTime , repeat);
+        try {
+          Thread.sleep(sleepTime);
+        } catch (InterruptedException timerEx) {
+          logger.info("thread sleep interrupted", timerEx.getMessage());
+        }
+
+        // in case of io error, reset the file system object
+        FileSystem.closeAllForUGI(Utils.getUGI());
+        dataFileSystem = dataPathList.get(0).getFileSystem(hiveConf);
+        exportFileSystem = exportRootDataDir.getFileSystem(hiveConf);
+        Path exportPath = new Path(exportRootDataDir, EximUtil.FILES_NAME);
+        if (exportFileSystem.exists(exportPath)) {
+          exportFileSystem.delete(exportPath, true);
+        }
+      }
+    }
+  }
+
+  private void writeFilesList(FileStatus[] fileStatuses, BufferedWriter writer, String encodedSubDirs)
+          throws IOException {
+    ReplChangeManager replChangeManager = ReplChangeManager.getInstance();

Review comment:
       Can you please elaborate? I didn't get which parameter you are referring to.




[GitHub] [hive] pkumarsinha commented on a change in pull request #1225: HIVE-23069 Memory efficient iterator should be used during replicatio…

Posted by GitBox <gi...@apache.org>.
pkumarsinha commented on a change in pull request #1225:
URL: https://github.com/apache/hive/pull/1225#discussion_r452815017



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/FileListStreamer.java
##########
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.repl.util;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedWriter;
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+public class FileListStreamer extends Thread implements Closeable {
+  private static final Logger LOG = LoggerFactory.getLogger(FileListStreamer.class);
+  private static final long TIMEOUT_IN_SECS = 5L;
+  private volatile boolean stop;
+  private final LinkedBlockingQueue<String> cache;
+  private Path backingFile;
+  private Configuration conf;
+  private BufferedWriter backingFileWriter;
+  private volatile boolean valid = true;
+  private volatile boolean asyncMode = false;
+  private final Object COMPLETION_LOCK = new Object();
+  private volatile boolean completed = false;
+
+
+
+  public FileListStreamer(LinkedBlockingQueue<String> cache, Path backingFile, Configuration conf) throws IOException {
+    this.cache = cache;
+    this.backingFile = backingFile;
+    this.conf = conf;
+    init();
+  }
+
+  private void init() throws IOException {
+    FileSystem fs = FileSystem.get(backingFile.toUri(), conf);
+    backingFileWriter = new BufferedWriter(new OutputStreamWriter(fs.create(backingFile, !asyncMode)));
+    LOG.info("Initialized a file based store to save a list at: {}, ayncMode:{}", backingFile, asyncMode);
+  }
+
+  public boolean isValid() {
+    return valid;
+  }
+
+  @Override
+  public void close() throws IOException {
+    if (!asyncMode) {
+      closeBackingFile();
+      return;
+    }
+    stop = true;
+    synchronized (COMPLETION_LOCK) {
+      while (!completed && isValid()) {
+        try {
+          COMPLETION_LOCK.wait(TimeUnit.SECONDS.toMillis(TIMEOUT_IN_SECS));
+        } catch (InterruptedException e) {
+        }
+      }
+    }
+    if (!isValid()) {

Review comment:
       No, it can't be moved above; this check is what guarantees that the remaining entries in the cache are consumed before close() returns.
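
       The streamer side of that handshake is not quoted in this hunk; a plausible counterpart,
       consistent with the close() above (a sketch, not the actual run() body):

    @Override
    public void run() {
      boolean failed = false;
      // Keep draining even after stop is requested, until the cache is empty.
      while (!failed && (!stop || !cache.isEmpty())) {
        try {
          String nextEntry = cache.poll(TIMEOUT_IN_SECS, TimeUnit.SECONDS);
          if (nextEntry != null) {
            backingFileWriter.write(nextEntry);
            backingFileWriter.newLine();
          }
        } catch (Exception e) {
          valid = false;            // close() consults this to detect a broken streamer
          failed = true;
        }
      }
      synchronized (COMPLETION_LOCK) {
        completed = true;           // releases the wait loop in close()
        COMPLETION_LOCK.notifyAll();
      }
    }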




[GitHub] [hive] pkumarsinha commented on a change in pull request #1225: HIVE-23069 Memory efficient iterator should be used during replicatio…

Posted by GitBox <gi...@apache.org>.
pkumarsinha commented on a change in pull request #1225:
URL: https://github.com/apache/hive/pull/1225#discussion_r453587433



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/InsertHandler.java
##########
@@ -76,16 +77,29 @@ public void handle(Context withinContext) throws Exception {
         withinContext.hiveConf);
     Iterable<String> files = eventMessage.getFiles();
 
+    boolean copyAtLoad = withinContext.hiveConf.getBoolVar(HiveConf.ConfVars.REPL_DATA_COPY_LAZY);
+
     /*
       * Insert into/overwrite operation shall operate on one or more partitions or even partitions from multiple tables.
       * But, Insert event is generated for each partition to which the data is inserted.
       * So, qlPtns list will have only one entry.
      */
     Partition ptn = (null == qlPtns || qlPtns.isEmpty()) ? null : qlPtns.get(0);
     if (files != null) {
-      // encoded filename/checksum of files, write into _files
-      for (String file : files) {
-        writeFileEntry(qlMdTable, ptn, file, withinContext);
+      if (copyAtLoad) {
+        // encoded filename/checksum of files, write into _files
+        Path dataPath = null;
+        if ((null == qlPtns) || qlPtns.isEmpty()) {
+          dataPath = new Path(withinContext.eventRoot, EximUtil.DATA_PATH_NAME);
+        } else {
+          dataPath = new Path(withinContext.eventRoot, EximUtil.DATA_PATH_NAME + File.separator

Review comment:
       That would require creating multiple Path objects, hence not preferring it.

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/DirCopyWork.java
##########
@@ -51,4 +56,20 @@ public Path getFullyQualifiedSourcePath() {
   public Path getFullyQualifiedTargetPath() {
     return fullyQualifiedTargetPath;
   }
+
+  @Override
+  public String convertToString() {
+    StringBuilder objInStr = new StringBuilder();
+    objInStr.append(fullyQualifiedSourcePath)
+            .append(URI_SEPARATOR)
+            .append(fullyQualifiedTargetPath);
+    return objInStr.toString();
+  }
+
+  @Override
+  public void loadFromString(String objectInStr) {

Review comment:
       no

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CommitTxnHandler.java
##########
@@ -81,10 +88,13 @@ private void createDumpFile(Context withinContext, org.apache.hadoop.hive.ql.met
             withinContext.hiveConf);
 
     if ((null == qlPtns) || qlPtns.isEmpty()) {
-      writeDumpFiles(qlMdTable, null, fileListArray.get(0), withinContext);
+      Path dataPath = new Path(withinContext.eventRoot, EximUtil.DATA_PATH_NAME);
+      writeDumpFiles(qlMdTable, null, fileListArray.get(0), withinContext, dataPath);
     } else {
       for (int idx = 0; idx < qlPtns.size(); idx++) {
-        writeDumpFiles(qlMdTable, qlPtns.get(idx), fileListArray.get(idx), withinContext);
+        Path dataPath = new Path(withinContext.eventRoot, EximUtil.DATA_PATH_NAME + File.separator

Review comment:
       That would require creating multiple Path objects, hence not preferring it.




[GitHub] [hive] pkumarsinha commented on a change in pull request #1225: HIVE-23069 Memory efficient iterator should be used during replicatio…

Posted by GitBox <gi...@apache.org>.
pkumarsinha commented on a change in pull request #1225:
URL: https://github.com/apache/hive/pull/1225#discussion_r454918458



##########
File path: itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java
##########
@@ -210,6 +210,66 @@ public void externalTableReplicationWithDefaultPaths() throws Throwable {
     assertExternalFileInfo(Arrays.asList("t2", "t3", "t4"), tuple.dumpLocation, true);
   }
 
+  @Test
+  public void externalTableReplicationWithDefaultPathsLazyCopy() throws Throwable {
+    List<String> lazyCopyClause = Arrays.asList("'" + HiveConf.ConfVars.REPL_DATA_COPY_LAZY.varname + "'='true'");
+    //creates external tables with partitions
+    WarehouseInstance.Tuple tuple = primary
+            .run("use " + primaryDbName)
+            .run("create external table t1 (id int)")
+            .run("insert into table t1 values (1)")
+            .run("insert into table t1 values (2)")
+            .run("create external table t2 (place string) partitioned by (country string)")
+            .run("insert into table t2 partition(country='india') values ('bangalore')")
+            .run("insert into table t2 partition(country='us') values ('austin')")
+            .run("insert into table t2 partition(country='france') values ('paris')")
+            .dump(primaryDbName, lazyCopyClause);
+
+    // verify that the external table info is written correctly for bootstrap
+    assertExternalFileInfo(Arrays.asList("t1", "t2"), tuple.dumpLocation, primaryDbName, false);
+
+
+
+    replica.load(replicatedDbName, primaryDbName, lazyCopyClause)
+            .run("use " + replicatedDbName)
+            .run("show tables like 't1'")
+            .verifyResult("t1")
+            .run("show tables like 't2'")
+            .verifyResult("t2")
+            .run("repl status " + replicatedDbName)
+            .verifyResult(tuple.lastReplicationId)
+            .run("select country from t2 where country = 'us'")
+            .verifyResult("us")
+            .run("select country from t2 where country = 'france'")
+            .verifyResult("france")
+            .run("show partitions t2").verifyResults(new String[] {"country=france", "country=india", "country=us"});

Review comment:
       Above we have queries that select the data; isn't that what you are looking for?




[GitHub] [hive] pkumarsinha commented on a change in pull request #1225: HIVE-23069 Memory efficient iterator should be used during replicatio…

Posted by GitBox <gi...@apache.org>.
pkumarsinha commented on a change in pull request #1225:
URL: https://github.com/apache/hive/pull/1225#discussion_r454909369



##########
File path: itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
##########
@@ -1559,6 +1645,76 @@ public void testIncrementalLoad() throws IOException {
     verifyRun("SELECT a from " + replDbName + ".ptned WHERE b=2", ptnData2, driverMirror);
   }
 
+  @Test
+  public void testIncrementalLoadLazyCopy() throws IOException {

Review comment:
       No, and I don't think that should be a concern here.




[GitHub] [hive] pkumarsinha commented on a change in pull request #1225: HIVE-23069 Memory efficient iterator should be used during replicatio…

Posted by GitBox <gi...@apache.org>.
pkumarsinha commented on a change in pull request #1225:
URL: https://github.com/apache/hive/pull/1225#discussion_r452810053



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
##########
@@ -465,9 +463,13 @@ private Long incrementalDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot, Hive
     String validTxnList = null;
     long waitUntilTime = 0;
     long bootDumpBeginReplId = -1;
-    List<EximUtil.ManagedTableCopyPath> managedTableCopyPaths = Collections.emptyList();
-    List<DirCopyWork> extTableCopyWorks = Collections.emptyList();
+
+    int cacheSize = conf.getIntVar(HiveConf.ConfVars.REPL_FILE_LIST_CACHE_SIZE);

Review comment:
       The cache is rebuilt, and in the case of the backing file, it should be overwritten.




[GitHub] [hive] pkumarsinha commented on a change in pull request #1225: HIVE-23069 Memory efficient iterator should be used during replicatio…

Posted by GitBox <gi...@apache.org>.
pkumarsinha commented on a change in pull request #1225:
URL: https://github.com/apache/hive/pull/1225#discussion_r453254993



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FileOperations.java
##########
@@ -165,4 +175,92 @@ private void validateSrcPathListExists() throws IOException, LoginException {
       throw new FileNotFoundException(FILE_NOT_FOUND.format(e.getMessage()));
     }
   }
+
+  /**
+   * This needs the root data directory to which the data needs to be exported to.
+   * The data export here is a list of files either in table/partition that are written to the _files
+   * in the exportRootDataDir provided.
+   */
+  private void exportFilesAsList() throws SemanticException, IOException, LoginException {
+    if (dataPathList.isEmpty()) {
+      return;
+    }
+    boolean done = false;
+    int repeat = 0;
+    while (!done) {

Review comment:
       done




[GitHub] [hive] pkumarsinha commented on a change in pull request #1225: HIVE-23069 Memory efficient iterator should be used during replicatio…

Posted by GitBox <gi...@apache.org>.
pkumarsinha commented on a change in pull request #1225:
URL: https://github.com/apache/hive/pull/1225#discussion_r452816170



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/FileListStreamer.java
##########
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.repl.util;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedWriter;
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+public class FileListStreamer extends Thread implements Closeable {
+  private static final Logger LOG = LoggerFactory.getLogger(FileListStreamer.class);
+  private static final long TIMEOUT_IN_SECS = 5L;
+  private volatile boolean stop;
+  private final LinkedBlockingQueue<String> cache;
+  private Path backingFile;
+  private Configuration conf;
+  private BufferedWriter backingFileWriter;
+  private volatile boolean valid = true;
+  private volatile boolean asyncMode = false;
+  private final Object COMPLETION_LOCK = new Object();
+  private volatile boolean completed = false;
+
+
+
+  public FileListStreamer(LinkedBlockingQueue<String> cache, Path backingFile, Configuration conf) throws IOException {
+    this.cache = cache;
+    this.backingFile = backingFile;
+    this.conf = conf;
+    init();
+  }
+
+  private void init() throws IOException {
+    FileSystem fs = FileSystem.get(backingFile.toUri(), conf);
+    backingFileWriter = new BufferedWriter(new OutputStreamWriter(fs.create(backingFile, !asyncMode)));

Review comment:
       I will get rid of the synchronous mode altogether, as it is not currently needed.




[GitHub] [hive] pkumarsinha commented on a change in pull request #1225: HIVE-23069 Memory efficient iterator should be used during replicatio…

Posted by GitBox <gi...@apache.org>.
pkumarsinha commented on a change in pull request #1225:
URL: https://github.com/apache/hive/pull/1225#discussion_r452816736



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
##########
@@ -330,6 +333,20 @@ a database ( directory )
     return 0;
   }
 
+  private void addLazyDataCopyTask(TaskTracker loadTaskTracker) {

Review comment:
       This is only for external tables. The data copy will run before the metadata copy, as we currently do for external tables.

##########
File path: standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/ReplChangeManager.java
##########
@@ -148,6 +148,13 @@ public static synchronized ReplChangeManager getInstance(Configuration conf)
     return instance;
   }
 
+  public static synchronized ReplChangeManager getInstance() {

Review comment:
       We need a utility method of ReplChangeManager that earlier used to be static, hence the no-arg accessor.
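
       A minimal sketch of such a no-arg accessor, assuming the instance was created earlier via
       getInstance(Configuration) (the actual body is not shown in this thread):

    public static synchronized ReplChangeManager getInstance() {
      if (instance == null) {
        throw new IllegalStateException(
            "ReplChangeManager not initialized; call getInstance(Configuration) first");
      }
      return instance;
    }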




[GitHub] [hive] aasha commented on a change in pull request #1225: HIVE-23069 Memory efficient iterator should be used during replicatio…

Posted by GitBox <gi...@apache.org>.
aasha commented on a change in pull request #1225:
URL: https://github.com/apache/hive/pull/1225#discussion_r451668879



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FileOperations.java
##########
@@ -165,4 +175,92 @@ private void validateSrcPathListExists() throws IOException, LoginException {
       throw new FileNotFoundException(FILE_NOT_FOUND.format(e.getMessage()));
     }
   }
+
+  /**
+   * This needs the root data directory to which the data needs to be exported to.
+   * The data export here is a list of files either in table/partition that are written to the _files
+   * in the exportRootDataDir provided.
+   */
+  private void exportFilesAsList() throws SemanticException, IOException, LoginException {
+    if (dataPathList.isEmpty()) {

Review comment:
       This check is duplicated across different methods.

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/InsertHandler.java
##########
@@ -76,16 +77,29 @@ public void handle(Context withinContext) throws Exception {
         withinContext.hiveConf);
     Iterable<String> files = eventMessage.getFiles();
 
+    boolean copyAtLoad = withinContext.hiveConf.getBoolVar(HiveConf.ConfVars.REPL_DATA_COPY_LAZY);
+
     /*
       * Insert into/overwrite operation shall operate on one or more partitions or even partitions from multiple tables.
       * But, Insert event is generated for each partition to which the data is inserted.
       * So, qlPtns list will have only one entry.
      */
     Partition ptn = (null == qlPtns || qlPtns.isEmpty()) ? null : qlPtns.get(0);
     if (files != null) {
-      // encoded filename/checksum of files, write into _files
-      for (String file : files) {
-        writeFileEntry(qlMdTable, ptn, file, withinContext);
+      if (copyAtLoad) {
+        // encoded filename/checksum of files, write into _files
+        Path dataPath = null;
+        if ((null == qlPtns) || qlPtns.isEmpty()) {
+          dataPath = new Path(withinContext.eventRoot, EximUtil.DATA_PATH_NAME);
+        } else {
+          dataPath = new Path(withinContext.eventRoot, EximUtil.DATA_PATH_NAME + File.separator

Review comment:
       Use the Path constructor instead of appending with the File separator.
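
       For illustration (the component appended after File.separator is elided in the hunk above,
       so childRelPath is a hypothetical stand-in for it):

    String childRelPath = "...";  // hypothetical: whatever the diff appends after File.separator

    // String-append version, as in the diff:
    Path viaAppend = new Path(withinContext.eventRoot,
        EximUtil.DATA_PATH_NAME + File.separator + childRelPath);

    // Path-constructor version; Path inserts the separator itself:
    Path viaConstructor = new Path(
        new Path(withinContext.eventRoot, EximUtil.DATA_PATH_NAME), childRelPath);

       The trade-off, raised in the responses elsewhere in this thread, is that the constructor
       form allocates an extra intermediate Path object per call.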

##########
File path: standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/ReplChangeManager.java
##########
@@ -148,6 +148,13 @@ public static synchronized ReplChangeManager getInstance(Configuration conf)
     return instance;
   }
 
+  public static synchronized ReplChangeManager getInstance() {

Review comment:
       why do you need this?

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FileOperations.java
##########
@@ -165,4 +175,92 @@ private void validateSrcPathListExists() throws IOException, LoginException {
       throw new FileNotFoundException(FILE_NOT_FOUND.format(e.getMessage()));
     }
   }
+
+  /**
+   * This needs the root data directory to which the data needs to be exported to.
+   * The data export here is a list of files either in table/partition that are written to the _files
+   * in the exportRootDataDir provided.
+   */
+  private void exportFilesAsList() throws SemanticException, IOException, LoginException {
+    if (dataPathList.isEmpty()) {
+      return;
+    }
+    boolean done = false;
+    int repeat = 0;
+    while (!done) {

Review comment:
       use existing retry interface

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FileOperations.java
##########
@@ -165,4 +175,92 @@ private void validateSrcPathListExists() throws IOException, LoginException {
       throw new FileNotFoundException(FILE_NOT_FOUND.format(e.getMessage()));
     }
   }
+
+  /**
+   * This needs the root data directory to which the data needs to be exported to.
+   * The data export here is a list of files either in table/partition that are written to the _files
+   * in the exportRootDataDir provided.
+   */
+  private void exportFilesAsList() throws SemanticException, IOException, LoginException {
+    if (dataPathList.isEmpty()) {
+      return;
+    }
+    boolean done = false;
+    int repeat = 0;
+    while (!done) {
+      // This is only called for replication that handles MM tables; no need for mmCtx.
+      try (BufferedWriter writer = writer()) {
+        for (Path dataPath : dataPathList) {
+          writeFilesList(listFilesInDir(dataPath), writer, AcidUtils.getAcidSubDir(dataPath));
+        }
+        done = true;
+      } catch (IOException e) {
+        if (e instanceof FileNotFoundException) {
+          logger.error("exporting data files in dir : " + dataPathList + " to " + exportRootDataDir + " failed");
+          throw new FileNotFoundException(FILE_NOT_FOUND.format(e.getMessage()));
+        }
+        repeat++;
+        logger.info("writeFilesList failed", e);
+        if (repeat >= FileUtils.MAX_IO_ERROR_RETRY) {
+          logger.error("exporting data files in dir : " + dataPathList + " to " + exportRootDataDir + " failed");
+          throw new IOException(ErrorMsg.REPL_FILE_SYSTEM_OPERATION_RETRY.getMsg());
+        }
+
+        int sleepTime = FileUtils.getSleepTime(repeat - 1);
+        logger.info(" sleep for {} milliseconds for retry num {} ", sleepTime , repeat);
+        try {
+          Thread.sleep(sleepTime);
+        } catch (InterruptedException timerEx) {
+          logger.info("thread sleep interrupted", timerEx.getMessage());
+        }
+
+        // in case of io error, reset the file system object
+        FileSystem.closeAllForUGI(Utils.getUGI());
+        dataFileSystem = dataPathList.get(0).getFileSystem(hiveConf);
+        exportFileSystem = exportRootDataDir.getFileSystem(hiveConf);
+        Path exportPath = new Path(exportRootDataDir, EximUtil.FILES_NAME);
+        if (exportFileSystem.exists(exportPath)) {
+          exportFileSystem.delete(exportPath, true);
+        }
+      }
+    }
+  }
+
+  private void writeFilesList(FileStatus[] fileStatuses, BufferedWriter writer, String encodedSubDirs)
+          throws IOException {
+    ReplChangeManager replChangeManager = ReplChangeManager.getInstance();

Review comment:
       No need to pass this as a param; it can be accessed directly by the encodeUri method.

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
##########
@@ -465,9 +463,13 @@ private Long incrementalDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot, Hive
     String validTxnList = null;
     long waitUntilTime = 0;
     long bootDumpBeginReplId = -1;
-    List<EximUtil.ManagedTableCopyPath> managedTableCopyPaths = Collections.emptyList();
-    List<DirCopyWork> extTableCopyWorks = Collections.emptyList();
+
+    int cacheSize = conf.getIntVar(HiveConf.ConfVars.REPL_FILE_LIST_CACHE_SIZE);

Review comment:
       What happens to checkpointing in the case of the cache? If it fails in between, is the cache rebuilt?

##########
File path: common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
##########
@@ -524,6 +524,11 @@ private static void populateLlapDaemonVarsSet(Set<String> llapDaemonVarsSetLocal
             + "task increment that would cross the specified limit."),
     REPL_PARTITIONS_DUMP_PARALLELISM("hive.repl.partitions.dump.parallelism",100,
         "Number of threads that will be used to dump partition data information during repl dump."),
+    REPL_DATA_COPY_LAZY("hive.repl.data.copy.lazy", false,

Review comment:
       What happens if this config is set at dump time but not at load time? Please add validations.
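
       Purely as an illustration of such a validation at load time (the getter on the dump
       metadata is hypothetical):

    boolean dumpedLazily = dumpMetaData.isDataCopyAtLoad();  // hypothetical accessor
    boolean loadingLazily = conf.getBoolVar(HiveConf.ConfVars.REPL_DATA_COPY_LAZY);
    if (dumpedLazily != loadingLazily) {
      throw new SemanticException(HiveConf.ConfVars.REPL_DATA_COPY_LAZY.varname
          + " is " + loadingLazily + " at load time, but the dump was taken with " + dumpedLazily);
    }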

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
##########
@@ -465,9 +463,13 @@ private Long incrementalDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot, Hive
     String validTxnList = null;
     long waitUntilTime = 0;
     long bootDumpBeginReplId = -1;
-    List<EximUtil.ManagedTableCopyPath> managedTableCopyPaths = Collections.emptyList();
-    List<DirCopyWork> extTableCopyWorks = Collections.emptyList();
+
+    int cacheSize = conf.getIntVar(HiveConf.ConfVars.REPL_FILE_LIST_CACHE_SIZE);
+    boolean dataCopyAtLoad = conf.getBoolVar(HiveConf.ConfVars.REPL_DATA_COPY_LAZY);
+    FileList managedTblList = dataCopyAtLoad ? null : createTableFileList(dumpRoot, EximUtil.FILES_NAME, cacheSize);
+    FileList extTableFileList = createTableFileList(dumpRoot, EximUtil.FILES_NAME_EXTERNAL, cacheSize);

Review comment:
       Is this check only for managed tables?

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
##########
@@ -465,9 +463,13 @@ private Long incrementalDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot, Hive
     String validTxnList = null;
     long waitUntilTime = 0;
     long bootDumpBeginReplId = -1;
-    List<EximUtil.ManagedTableCopyPath> managedTableCopyPaths = Collections.emptyList();
-    List<DirCopyWork> extTableCopyWorks = Collections.emptyList();
+
+    int cacheSize = conf.getIntVar(HiveConf.ConfVars.REPL_FILE_LIST_CACHE_SIZE);
+    boolean dataCopyAtLoad = conf.getBoolVar(HiveConf.ConfVars.REPL_DATA_COPY_LAZY);
+    FileList managedTblList = dataCopyAtLoad ? null : createTableFileList(dumpRoot, EximUtil.FILES_NAME, cacheSize);
+    FileList extTableFileList = createTableFileList(dumpRoot, EximUtil.FILES_NAME_EXTERNAL, cacheSize);

Review comment:
       If the external table base path is on source, where will the distcp job run to copy data there? Will it be on source or target?

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
##########
@@ -330,6 +333,20 @@ a database ( directory )
     return 0;
   }
 
+  private void addLazyDataCopyTask(TaskTracker loadTaskTracker) {

Review comment:
       will this data copy be after metadata copy or before?

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/StringConvertibleObject.java
##########
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.repl.util;
+
+public interface StringConvertibleObject {

Review comment:
       Do you need this interface? These could be utility methods instead.

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CommitTxnHandler.java
##########
@@ -81,10 +88,13 @@ private void createDumpFile(Context withinContext, org.apache.hadoop.hive.ql.met
             withinContext.hiveConf);
 
     if ((null == qlPtns) || qlPtns.isEmpty()) {
-      writeDumpFiles(qlMdTable, null, fileListArray.get(0), withinContext);
+      Path dataPath = new Path(withinContext.eventRoot, EximUtil.DATA_PATH_NAME);
+      writeDumpFiles(qlMdTable, null, fileListArray.get(0), withinContext, dataPath);
     } else {
       for (int idx = 0; idx < qlPtns.size(); idx++) {
-        writeDumpFiles(qlMdTable, qlPtns.get(idx), fileListArray.get(idx), withinContext);
+        Path dataPath = new Path(withinContext.eventRoot, EximUtil.DATA_PATH_NAME + File.separator

Review comment:
       nit: use the Path constructor instead of doing an append.

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/DirCopyWork.java
##########
@@ -51,4 +56,20 @@ public Path getFullyQualifiedSourcePath() {
   public Path getFullyQualifiedTargetPath() {
     return fullyQualifiedTargetPath;
   }
+
+  @Override
+  public String convertToString() {
+    StringBuilder objInStr = new StringBuilder();
+    objInStr.append(fullyQualifiedSourcePath)
+            .append(URI_SEPARATOR)
+            .append(fullyQualifiedTargetPath);
+    return objInStr.toString();
+  }
+
+  @Override
+  public void loadFromString(String objectInStr) {

Review comment:
       Is the cm path not needed here?

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
##########
@@ -465,9 +463,13 @@ private Long incrementalDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot, Hive
     String validTxnList = null;
     long waitUntilTime = 0;
     long bootDumpBeginReplId = -1;
-    List<EximUtil.ManagedTableCopyPath> managedTableCopyPaths = Collections.emptyList();
-    List<DirCopyWork> extTableCopyWorks = Collections.emptyList();
+
+    int cacheSize = conf.getIntVar(HiveConf.ConfVars.REPL_FILE_LIST_CACHE_SIZE);

Review comment:
       Or, if it fails after writing to the file, do we rewrite it?

##########
File path: ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestReplDumpTask.java
##########
@@ -131,16 +131,15 @@ public void removeDBPropertyToPreventRenameWhenBootstrapDumpOfTableFails() throw
       private int tableDumpCount = 0;
 
       @Override
-      List<EximUtil.ManagedTableCopyPath> dumpTable(String dbName, String tblName, String validTxnList,
-                                                    Path dbRootMetadata, Path dbRootData,
-                                               long lastReplId, Hive hiveDb,
-                                               HiveWrapper.Tuple<Table> tuple)
+      void dumpTable(String dbName, String tblName, String validTxnList,
+                     Path dbRootMetadata, Path dbRootData,
+                     long lastReplId, Hive hiveDb,
+                     HiveWrapper.Tuple<Table> tuple, FileList managedTableDirFileList, boolean dataCopyAtLoad)

Review comment:
       unused params. Add tests

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/FileListStreamer.java
##########
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.repl.util;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedWriter;
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+public class FileListStreamer extends Thread implements Closeable {
+  private static final Logger LOG = LoggerFactory.getLogger(FileListStreamer.class);
+  private static final long TIMEOUT_IN_SECS = 5L;
+  private volatile boolean stop;
+  private final LinkedBlockingQueue<String> cache;
+  private Path backingFile;
+  private Configuration conf;
+  private BufferedWriter backingFileWriter;
+  private volatile boolean valid = true;
+  private volatile boolean asyncMode = false;
+  private final Object COMPLETION_LOCK = new Object();
+  private volatile boolean completed = false;
+
+
+
+  public FileListStreamer(LinkedBlockingQueue<String> cache, Path backingFile, Configuration conf) throws IOException {
+    this.cache = cache;
+    this.backingFile = backingFile;
+    this.conf = conf;
+    init();
+  }
+
+  private void init() throws IOException {
+    FileSystem fs = FileSystem.get(backingFile.toUri(), conf);
+    backingFileWriter = new BufferedWriter(new OutputStreamWriter(fs.create(backingFile, !asyncMode)));

Review comment:
       Rename asyncMode to overwrite. init is called from the constructor, and asyncMode is always set to false there.
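
       For reference, the second argument of FileSystem.create(Path, boolean) in the Hadoop API is
       the overwrite flag, so with asyncMode fixed at false the quoted line reduces to:

    backingFileWriter = new BufferedWriter(
        new OutputStreamWriter(fs.create(backingFile, /* overwrite = */ true)));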

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FileOperations.java
##########
@@ -165,4 +175,92 @@ private void validateSrcPathListExists() throws IOException, LoginException {
       throw new FileNotFoundException(FILE_NOT_FOUND.format(e.getMessage()));
     }
   }
+
+  /**
+   * This needs the root data directory to which the data needs to be exported to.
+   * The data export here is a list of files either in table/partition that are written to the _files
+   * in the exportRootDataDir provided.
+   */
+  private void exportFilesAsList() throws SemanticException, IOException, LoginException {
+    if (dataPathList.isEmpty()) {
+      return;
+    }
+    boolean done = false;
+    int repeat = 0;
+    while (!done) {
+      // This is only called for replication that handles MM tables; no need for mmCtx.
+      try (BufferedWriter writer = writer()) {
+        for (Path dataPath : dataPathList) {
+          writeFilesList(listFilesInDir(dataPath), writer, AcidUtils.getAcidSubDir(dataPath));
+        }
+        done = true;
+      } catch (IOException e) {
+        if (e instanceof FileNotFoundException) {
+          logger.error("exporting data files in dir : " + dataPathList + " to " + exportRootDataDir + " failed");
+          throw new FileNotFoundException(FILE_NOT_FOUND.format(e.getMessage()));
+        }
+        repeat++;
+        logger.info("writeFilesList failed", e);
+        if (repeat >= FileUtils.MAX_IO_ERROR_RETRY) {
+          logger.error("exporting data files in dir : " + dataPathList + " to " + exportRootDataDir + " failed");
+          throw new IOException(ErrorMsg.REPL_FILE_SYSTEM_OPERATION_RETRY.getMsg());
+        }
+
+        int sleepTime = FileUtils.getSleepTime(repeat - 1);
+        logger.info(" sleep for {} milliseconds for retry num {} ", sleepTime , repeat);
+        try {
+          Thread.sleep(sleepTime);
+        } catch (InterruptedException timerEx) {
+          logger.info("thread sleep interrupted", timerEx.getMessage());
+        }
+
+        // in case of io error, reset the file system object
+        FileSystem.closeAllForUGI(Utils.getUGI());
+        dataFileSystem = dataPathList.get(0).getFileSystem(hiveConf);
+        exportFileSystem = exportRootDataDir.getFileSystem(hiveConf);
+        Path exportPath = new Path(exportRootDataDir, EximUtil.FILES_NAME);
+        if (exportFileSystem.exists(exportPath)) {
+          exportFileSystem.delete(exportPath, true);
+        }
+      }
+    }
+  }
+
+  private void writeFilesList(FileStatus[] fileStatuses, BufferedWriter writer, String encodedSubDirs)
+          throws IOException {
+    ReplChangeManager replChangeManager = ReplChangeManager.getInstance();
+    for (FileStatus fileStatus : fileStatuses) {
+      if (fileStatus.isDirectory()) {
+        // Write files inside the sub-directory.
+        Path subDir = fileStatus.getPath();
+        writeFilesList(listFilesInDir(subDir), writer, encodedSubDir(encodedSubDirs, subDir));
+      } else {
+        writer.write(encodedUri(replChangeManager, fileStatus, encodedSubDirs));
+        writer.newLine();
+      }
+    }
+  }
+
+  private BufferedWriter writer() throws IOException {
+    Path exportToFile = new Path(exportRootDataDir, EximUtil.FILES_NAME);
+    logger.debug("exporting data files in dir : " + dataPathList + " to " + exportToFile);
+    return new BufferedWriter(
+            new OutputStreamWriter(exportFileSystem.create(exportToFile))
+    );
+  }
+
+  private String encodedSubDir(String encodedParentDirs, Path subDir) {
+    if (null == encodedParentDirs) {
+      return subDir.getName();
+    } else {
+      return encodedParentDirs + Path.SEPARATOR + subDir.getName();
+    }
+  }
+
+  private String encodedUri(ReplChangeManager replChangeManager, FileStatus fileStatus, String encodedSubDir)
+          throws IOException {
+    Path currentDataFilePath = fileStatus.getPath();
+    String checkSum = ReplChangeManager.checksumFor(currentDataFilePath, dataFileSystem);

Review comment:
       Does this method need to be static?

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/FileList.java
##########
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.repl.util;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.concurrent.LinkedBlockingQueue;
+
+
+/**
+ * A file backed list of Strings which is in-memory till the threshold.
+ */
+public class FileList implements Closeable, Iterator<String> {
+  private static final Logger LOG = LoggerFactory.getLogger(FileList.class);
+  private static int fileListStreamerID = 0;
+  private static final String  FILE_LIST_STREAMER_PREFIX = "file-list-streamer-";
+
+  private LinkedBlockingQueue<String> cache;
+  private volatile boolean thresholdHit = false;
+  private int thresholdPoint;

Review comment:
       why do you need a thresholdPoint and thresholdHit?




[GitHub] [hive] aasha commented on a change in pull request #1225: HIVE-23069 Memory efficient iterator should be used during replicatio…

Posted by GitBox <gi...@apache.org>.
aasha commented on a change in pull request #1225:
URL: https://github.com/apache/hive/pull/1225#discussion_r454888843



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/FileList.java
##########
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.repl.util;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.concurrent.LinkedBlockingQueue;
+
+
+/**
+ * A file backed list of Strings which is in-memory till the threshold.
+ */
+public class FileList implements AutoCloseable, Iterator<String> {
+  private static final Logger LOG = LoggerFactory.getLogger(FileList.class);
+  private static int fileListStreamerID = 0;
+  private static final String  FILE_LIST_STREAMER_PREFIX = "file-list-streamer-";
+
+  private LinkedBlockingQueue<String> cache;
+  private volatile boolean thresholdHit = false;
+  private int thresholdPoint;
+  private float thresholdFactor = 0.9f;
+  private Path backingFile;
+  private FileListStreamer fileListStreamer;
+  private String nextElement;
+  private boolean noMoreElement;
+  private HiveConf conf;
+  private BufferedReader backingFileReader;
+
+
+  public FileList(Path backingFile, int cacheSize, HiveConf conf) throws IOException {
+    this.backingFile = backingFile;
+    if (cacheSize > 0) {
+      // Cache size must be > 0 for this list to be used for the write operation.
+      this.cache = new LinkedBlockingQueue<>(cacheSize);
+      fileListStreamer = new FileListStreamer(cache, backingFile, conf);
+      LOG.debug("File list backed by {} can be used for write operation.", backingFile);
+    } else {
+      thresholdHit = true;
+    }
+    this.conf = conf;
+    thresholdPoint = getThreshold(cacheSize);
+  }
+
+  /**
+   * Only add operation is safe for concurrent operations.
+   */
+  public void add(String entry) throws SemanticException {
+    if (thresholdHit && !fileListStreamer.isAlive()) {
+      throw new SemanticException("List is not getting saved anymore to file " + backingFile.toString());
+    }
+    try {
+      cache.put(entry);
+    } catch (InterruptedException e) {
+      throw new SemanticException(e);
+    }
+    if (!thresholdHit && cache.size() >= thresholdPoint) {
+      initStoreToFile(cache.size());
+    }
+  }
+
+  @Override
+  public boolean hasNext() {
+    if (!thresholdHit) {
+      return (cache != null && !cache.isEmpty());
+    }
+    if (nextElement != null) {
+      return true;
+    }
+    if (noMoreElement) {

Review comment:
       Why do you need both noMoreElement and nextElement?
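
       The two fields implement the classic read-ahead iterator pattern: hasNext() has to discover
       whether another line exists before next() is called, so it pulls one line into nextElement,
       and noMoreElement latches end-of-file so the reader is not touched again. A generic sketch
       of the pattern (readNextLine() is a stand-in for the backing-file read):

    @Override
    public boolean hasNext() {
      if (nextElement != null) {
        return true;                // a line is already buffered from a previous call
      }
      if (noMoreElement) {
        return false;               // end-of-file was already seen
      }
      nextElement = readNextLine(); // sketch: one line from the backing file, null at EOF
      if (nextElement == null) {
        noMoreElement = true;
      }
      return nextElement != null;
    }

    @Override
    public String next() {
      if (!hasNext()) {
        throw new NoSuchElementException();
      }
      String result = nextElement;
      nextElement = null;           // consume the buffered line
      return result;
    }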

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpWork.java
##########
@@ -207,4 +217,20 @@ public ReplicationMetricCollector getMetricCollector() {
   public void setMetricCollector(ReplicationMetricCollector metricCollector) {
     this.metricCollector = metricCollector;
   }
+
+  public ReplicationSpec getReplicationSpec() {
+    return replicationSpec;
+  }
+
+  public void setReplicationSpec(ReplicationSpec replicationSpec) {
+    this.replicationSpec = replicationSpec;
+  }
+
+  public FileList getFileList(Path backingFile, int cacheSize, HiveConf conf, boolean b) throws IOException {

Review comment:
       If this is used for tests, you can just mock the ReplDumpWork and return expected values.
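
       For illustration, with Mockito inside a test method (matching the getFileList signature in
       the hunk above; expectedFileList is a test fixture):

    ReplDumpWork work = Mockito.mock(ReplDumpWork.class);
    Mockito.when(work.getFileList(Mockito.any(Path.class), Mockito.anyInt(),
            Mockito.any(HiveConf.class), Mockito.anyBoolean()))
        .thenReturn(expectedFileList);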

##########
File path: itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java
##########
@@ -210,6 +210,66 @@ public void externalTableReplicationWithDefaultPaths() throws Throwable {
     assertExternalFileInfo(Arrays.asList("t2", "t3", "t4"), tuple.dumpLocation, true);
   }
 
+  @Test
+  public void externalTableReplicationWithDefaultPathsLazyCopy() throws Throwable {
+    List<String> lazyCopyClause = Arrays.asList("'" + HiveConf.ConfVars.REPL_DATA_COPY_LAZY.varname + "'='true'");
+    //creates external tables with partitions
+    WarehouseInstance.Tuple tuple = primary
+            .run("use " + primaryDbName)
+            .run("create external table t1 (id int)")
+            .run("insert into table t1 values (1)")
+            .run("insert into table t1 values (2)")
+            .run("create external table t2 (place string) partitioned by (country string)")
+            .run("insert into table t2 partition(country='india') values ('bangalore')")
+            .run("insert into table t2 partition(country='us') values ('austin')")
+            .run("insert into table t2 partition(country='france') values ('paris')")
+            .dump(primaryDbName, lazyCopyClause);
+
+    // verify that the external table info is written correctly for bootstrap
+    assertExternalFileInfo(Arrays.asList("t1", "t2"), tuple.dumpLocation, primaryDbName, false);
+
+
+
+    replica.load(replicatedDbName, primaryDbName, lazyCopyClause)
+            .run("use " + replicatedDbName)
+            .run("show tables like 't1'")
+            .verifyResult("t1")
+            .run("show tables like 't2'")
+            .verifyResult("t2")
+            .run("repl status " + replicatedDbName)
+            .verifyResult(tuple.lastReplicationId)
+            .run("select country from t2 where country = 'us'")
+            .verifyResult("us")
+            .run("select country from t2 where country = 'france'")
+            .verifyResult("france")
+            .run("show partitions t2").verifyResults(new String[] {"country=france", "country=india", "country=us"});
+
+    String hiveDumpLocation = tuple.dumpLocation + File.separator + ReplUtils.REPL_HIVE_BASE_DIR;
+    // Ckpt should be set on bootstrapped db.
+    replica.verifyIfCkptSet(replicatedDbName, hiveDumpLocation);
+
+    assertTablePartitionLocation(primaryDbName + ".t1", replicatedDbName + ".t1");
+    assertTablePartitionLocation(primaryDbName + ".t2", replicatedDbName + ".t2");
+
+    tuple = primary.run("use " + primaryDbName)
+            .run("create external table t3 (id int)")
+            .run("insert into table t3 values (10)")
+            .run("create external table t4 as select id from t3")

Review comment:
       Add a test for an external table with a partition that uses a different (non-default) location as well.
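
       Something along these lines, as a sketch (the path `/tmp/t2_custom_part` is purely illustrative):
       ```java
       primary.run("use " + primaryDbName)
               .run("alter table t2 add partition(country='japan') location '/tmp/t2_custom_part'")
               .run("insert into table t2 partition(country='japan') values ('tokyo')")
               .dump(primaryDbName, lazyCopyClause);
       ```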

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/FileList.java
##########
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.repl.util;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.concurrent.LinkedBlockingQueue;
+
+
+/**
+ * A file backed list of Strings which is in-memory till the threshold.
+ */
+public class FileList implements AutoCloseable, Iterator<String> {
+  private static final Logger LOG = LoggerFactory.getLogger(FileList.class);
+  private static int fileListStreamerID = 0;
+  private static final String  FILE_LIST_STREAMER_PREFIX = "file-list-streamer-";
+
+  private LinkedBlockingQueue<String> cache;
+  private volatile boolean thresholdHit = false;
+  private int thresholdPoint;
+  private float thresholdFactor = 0.9f;
+  private Path backingFile;
+  private FileListStreamer fileListStreamer;
+  private String nextElement;
+  private boolean noMoreElement;
+  private HiveConf conf;
+  private BufferedReader backingFileReader;
+
+
+  public FileList(Path backingFile, int cacheSize, HiveConf conf) throws IOException {
+    this.backingFile = backingFile;
+    if (cacheSize > 0) {
+      // Cache size must be > 0 for this list to be used for the write operation.
+      this.cache = new LinkedBlockingQueue<>(cacheSize);
+      fileListStreamer = new FileListStreamer(cache, backingFile, conf);
+      LOG.debug("File list backed by {} can be used for write operation.", backingFile);
+    } else {
+      thresholdHit = true;
+    }
+    this.conf = conf;
+    thresholdPoint = getThreshold(cacheSize);

Review comment:
       If the cache size is 0, this call is not needed; thresholdHit is already set in the else branch.
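
       E.g., the threshold could be computed only when a cache exists (sketch of the constructor branch; assumes `this.conf` is assigned before it):
       ```java
       this.conf = conf;
       if (cacheSize > 0) {
         // Cache size must be > 0 for this list to be used for the write operation.
         this.cache = new LinkedBlockingQueue<>(cacheSize);
         fileListStreamer = new FileListStreamer(cache, backingFile, conf);
         thresholdPoint = getThreshold(cacheSize);  // only meaningful with a cache
         LOG.debug("File list backed by {} can be used for write operation.", backingFile);
       } else {
         thresholdHit = true;  // read-only mode, no threshold needed
       }
       ```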

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/FileListStreamer.java
##########
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.repl.util;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedWriter;
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+public class FileListStreamer extends Thread implements Closeable {
+  private static final Logger LOG = LoggerFactory.getLogger(FileListStreamer.class);
+  private static BufferedWriter backingFileWriterInTest;
+  private static final long TIMEOUT_IN_SECS = 5L;
+  private volatile boolean signalTostop;
+  private final LinkedBlockingQueue<String> cache;
+  private Path backingFile;
+  private Configuration conf;
+  private BufferedWriter backingFileWriter;
+  private volatile boolean valid = true;
+  private final Object COMPLETION_LOCK = new Object();
+  private volatile boolean completed = false;
+  private volatile boolean initialized = false;
+
+
+
+  public FileListStreamer(LinkedBlockingQueue<String> cache, Path backingFile, Configuration conf) throws IOException {
+    this.cache = cache;
+    this.backingFile = backingFile;
+    this.conf = conf;
+  }
+
+  private void lazyInit() throws IOException {
+    if (backingFileWriterInTest == null) {

Review comment:
       Mock this method in the test instead of putting test-specific branches into production code.
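
       One option is a Mockito spy that stubs out the writer creation (sketch; assumes `lazyInit()` is widened to package-private so plain Mockito can stub it, with the test injecting its own writer):
       ```java
       FileListStreamer streamer = Mockito.spy(new FileListStreamer(cache, backingFile, conf));
       Mockito.doNothing().when(streamer).lazyInit();  // skip creating the real FS writer
       ```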

##########
File path: ql/src/test/org/apache/hadoop/hive/ql/exec/repl/util/TestFileList.java
##########
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.repl.util;
+
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedWriter;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({LoggerFactory.class})
+public class TestFileList {
+
+  @Mock
+  private HiveConf hiveConf;
+
+  @Mock
+  BufferedWriter bufferedWriter;
+
+  @Test
+  public void testNoStreaming() throws Exception {
+    Mockito.when(hiveConf.getBoolVar(HiveConf.ConfVars.REPL_DATA_COPY_LAZY)).thenReturn(false);
+    Path backingFile  = new Path("/tmp/backingFile");
+    FileList fileList = new FileList(backingFile, 100, hiveConf);
+    fileList.add("Entry1");
+    fileList.add("Entry2");
+    assertFalse(fileList.isStreamingToFile());
+  }
+
+  @Test
+  public void testAlwaysStreaming() throws Exception {
+    Mockito.when(hiveConf.getBoolVar(HiveConf.ConfVars.REPL_DATA_COPY_LAZY)).thenReturn(true);
+    FileListStreamer.setBackingFileWriterInTest(bufferedWriter);
+    Path backingFile  = new Path("/tmp/backingFile");
+    FileList fileList = new FileList(backingFile, 100, hiveConf);
+    assertFalse(fileList.isStreamingInitialized());
+    fileList.add("Entry1");
+    waitForStreamingInitialization(fileList);
+    assertTrue(fileList.isStreamingToFile());
+    fileList.close();
+    waitForStreamingClosure(fileList);
+  }
+
+  @Test
+  public void testStreamingOnCacheHit() throws Exception {
+    Mockito.when(hiveConf.getBoolVar(HiveConf.ConfVars.REPL_DATA_COPY_LAZY)).thenReturn(false);
+    FileListStreamer.setBackingFileWriterInTest(bufferedWriter);
+    Path backingFile  = new Path("/tmp/backingFile");
+    FileList fileList = new FileList(backingFile, 5, hiveConf);
+    fileList.add("Entry1");
+    fileList.add("Entry2");
+    fileList.add("Entry3");
+    Thread.sleep(5000L);
+    assertFalse(fileList.isStreamingInitialized());
+    fileList.add("Entry4");
+    fileList.add("Entry5");
+    waitForStreamingInitialization(fileList);
+    fileList.close();
+    waitForStreamingClosure(fileList);

Review comment:
       Also assert that the entries were actually written to the file, not just that streaming started.
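
       A sketch of that check, reusing the mocked writer (assumes `java.util.Arrays` is imported):
       ```java
       ArgumentCaptor<String> entryArgs = ArgumentCaptor.forClass(String.class);
       Mockito.verify(bufferedWriter, Mockito.times(5)).write(entryArgs.capture());
       assertTrue(entryArgs.getAllValues().containsAll(
               Arrays.asList("Entry1", "Entry2", "Entry3", "Entry4", "Entry5")));
       ```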

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/FileListStreamer.java
##########
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.repl.util;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedWriter;
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+public class FileListStreamer extends Thread implements Closeable {
+  private static final Logger LOG = LoggerFactory.getLogger(FileListStreamer.class);
+  private static BufferedWriter backingFileWriterInTest;
+  private static final long TIMEOUT_IN_SECS = 5L;
+  private volatile boolean signalTostop;
+  private final LinkedBlockingQueue<String> cache;
+  private Path backingFile;
+  private Configuration conf;
+  private BufferedWriter backingFileWriter;
+  private volatile boolean valid = true;
+  private final Object COMPLETION_LOCK = new Object();
+  private volatile boolean completed = false;
+  private volatile boolean initialized = false;
+
+
+
+  public FileListStreamer(LinkedBlockingQueue<String> cache, Path backingFile, Configuration conf) throws IOException {
+    this.cache = cache;
+    this.backingFile = backingFile;
+    this.conf = conf;
+  }
+
+  private void lazyInit() throws IOException {
+    if (backingFileWriterInTest == null) {
+      FileSystem fs = FileSystem.get(backingFile.toUri(), conf);
+      backingFileWriter = new BufferedWriter(new OutputStreamWriter(fs.create(backingFile)));
+    } else {
+      backingFileWriter = backingFileWriterInTest;
+    }
+    initialized = true;
+    LOG.info("Initialized a file based store to save a list at: {}", backingFile);
+  }
+
+  public boolean isValid() {
+    return valid;
+  }
+
+  // Blocks for remaining entries to be flushed to file.
+  @Override
+  public void close() throws IOException {
+    signalTostop = true;
+    synchronized (COMPLETION_LOCK) {
+      while (motiveToWait()) {
+        try {
+          COMPLETION_LOCK.wait(TimeUnit.SECONDS.toMillis(TIMEOUT_IN_SECS));
+        } catch (InterruptedException e) {
+          // no-op
+        }
+      }
+    }
+    if (!isValid()) {
+      throw new IOException("File list is not in a valid state:" + backingFile);
+    }
+  }
+
+  private boolean motiveToWait() {
+    return !completed && valid;
+  }
+
+  @Override
+  public void run() {
+    try {
+      lazyInit();
+    } catch (IOException e) {
+      valid = false;
+      throw new RuntimeException("Unable to initialize the file list streamer", e);
+    }
+    boolean exThrown = false;
+    while (!exThrown && (!signalTostop || !cache.isEmpty())) {
+      try {
+        String nextEntry = cache.poll(TIMEOUT_IN_SECS, TimeUnit.SECONDS);
+        if (nextEntry != null) {
+          backingFileWriter.write(nextEntry);
+          backingFileWriter.newLine();
+          LOG.debug("Writing entry {} to file list backed by {}", nextEntry, backingFile);
+        }
+      } catch (Exception iEx) {
+        if (!(iEx instanceof InterruptedException)) {
+          // not draining any more. Inform the producer to avoid OOM.
+          valid = false;
+          LOG.error("Exception while saving the list to file " + backingFile, iEx);
+          exThrown = true;
+        }
+      }
+    }
+    try{
+      closeBackingFile();
+      completed = true;
+    } finally {
+      synchronized (COMPLETION_LOCK) {
+        COMPLETION_LOCK.notify();
+      }
+    }
+    LOG.info("Completed the file list streamer backed by: {}", backingFile);
+  }
+
+  private void closeBackingFile() {
+    try {
+      backingFileWriter.close();
+      LOG.debug("Closed the file list backing file: {}", backingFile);
+    } catch (IOException e) {
+      LOG.error("Exception while closing the file list backing file", e);
+      valid = false;
+    }
+  }
+
+  @VisibleForTesting
+  public static void setBackingFileWriterInTest(BufferedWriter bufferedWriter) {

Review comment:
       Mock this instead of keeping a static test-only hook in the class.
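
       A constructor-injection alternative, sketched (the extra parameter is hypothetical and not in the current patch; production callers would pass null and let `lazyInit()` create the real writer):
       ```java
       public FileListStreamer(LinkedBlockingQueue<String> cache, Path backingFile,
                               Configuration conf, BufferedWriter testWriter) throws IOException {
         this.cache = cache;
         this.backingFile = backingFile;
         this.conf = conf;
         this.backingFileWriter = testWriter;  // null in production
       }
       ```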

##########
File path: itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java
##########
@@ -210,6 +210,66 @@ public void externalTableReplicationWithDefaultPaths() throws Throwable {
     assertExternalFileInfo(Arrays.asList("t2", "t3", "t4"), tuple.dumpLocation, true);
   }
 
+  @Test
+  public void externalTableReplicationWithDefaultPathsLazyCopy() throws Throwable {
+    List<String> lazyCopyClause = Arrays.asList("'" + HiveConf.ConfVars.REPL_DATA_COPY_LAZY.varname + "'='true'");
+    //creates external tables with partitions
+    WarehouseInstance.Tuple tuple = primary
+            .run("use " + primaryDbName)
+            .run("create external table t1 (id int)")
+            .run("insert into table t1 values (1)")
+            .run("insert into table t1 values (2)")
+            .run("create external table t2 (place string) partitioned by (country string)")
+            .run("insert into table t2 partition(country='india') values ('bangalore')")
+            .run("insert into table t2 partition(country='us') values ('austin')")
+            .run("insert into table t2 partition(country='france') values ('paris')")
+            .dump(primaryDbName, lazyCopyClause);
+
+    // verify that the external table info is written correctly for bootstrap
+    assertExternalFileInfo(Arrays.asList("t1", "t2"), tuple.dumpLocation, primaryDbName, false);
+
+
+
+    replica.load(replicatedDbName, primaryDbName, lazyCopyClause)
+            .run("use " + replicatedDbName)
+            .run("show tables like 't1'")
+            .verifyResult("t1")
+            .run("show tables like 't2'")
+            .verifyResult("t2")
+            .run("repl status " + replicatedDbName)
+            .verifyResult(tuple.lastReplicationId)
+            .run("select country from t2 where country = 'us'")
+            .verifyResult("us")
+            .run("select country from t2 where country = 'france'")
+            .verifyResult("france")
+            .run("show partitions t2").verifyResults(new String[] {"country=france", "country=india", "country=us"});

Review comment:
       Verify the data inside the partitions as well, not just the partition listing.
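
       E.g., a continuation of the `.run(...)` chain above that reads the partition contents back (sketch):
       ```java
       .run("select place from t2 where country = 'india'")
       .verifyResult("bangalore")
       .run("select place from t2 where country = 'us'")
       .verifyResult("austin")
       ```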

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FileOperations.java
##########
@@ -165,4 +178,83 @@ private void validateSrcPathListExists() throws IOException, LoginException {
       throw new FileNotFoundException(FILE_NOT_FOUND.format(e.getMessage()));
     }
   }
+
+  /**
+   * This needs the root data directory to which the data needs to be exported to.
+   * The data export here is a list of files either in table/partition that are written to the _files
+   * in the exportRootDataDir provided.
+   */
+   void exportFilesAsList() throws SemanticException, IOException, LoginException {
+    if (dataPathList.isEmpty()) {
+      return;
+    }
+    Retry<Void> retryable = new Retry<Void>(IOException.class) {
+      @Override
+      public Void execute() throws Exception {
+        try (BufferedWriter writer = writer()) {
+          for (Path dataPath : dataPathList) {
+            writeFilesList(listFilesInDir(dataPath), writer, AcidUtils.getAcidSubDir(dataPath));
+          }
+        } catch (IOException e) {
+          if (e instanceof FileNotFoundException) {

Review comment:
       It will retry for FileNotFoundException as well, since FileNotFoundException is a subclass of IOException; if the source file is genuinely gone, retrying only delays the failure.
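
       One way to make the missing-file case fatal, as a sketch (assumes the Retry helper only retries the exception type it was constructed with, so a SemanticException escapes the retry loop):
       ```java
       try (BufferedWriter writer = writer()) {
         for (Path dataPath : dataPathList) {
           writeFilesList(listFilesInDir(dataPath), writer, AcidUtils.getAcidSubDir(dataPath));
         }
       } catch (FileNotFoundException e) {
         throw new SemanticException(FILE_NOT_FOUND.format(e.getMessage()), e);  // not retried
       }
       ```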

##########
File path: itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
##########
@@ -1559,6 +1645,76 @@ public void testIncrementalLoad() throws IOException {
     verifyRun("SELECT a from " + replDbName + ".ptned WHERE b=2", ptnData2, driverMirror);
   }
 
+  @Test
+  public void testIncrementalLoadLazyCopy() throws IOException {

Review comment:
       Is this test run on a separate mini-HDFS setup?

##########
File path: ql/src/test/org/apache/hadoop/hive/ql/exec/repl/util/TestFileList.java
##########
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.repl.util;
+
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedWriter;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({LoggerFactory.class})
+public class TestFileList {
+
+  @Mock
+  private HiveConf hiveConf;
+
+  @Mock
+  BufferedWriter bufferedWriter;
+
+  @Test
+  public void testNoStreaming() throws Exception {
+    Mockito.when(hiveConf.getBoolVar(HiveConf.ConfVars.REPL_DATA_COPY_LAZY)).thenReturn(false);
+    Path backingFile  = new Path("/tmp/backingFile");
+    FileList fileList = new FileList(backingFile, 100, hiveConf);
+    fileList.add("Entry1");
+    fileList.add("Entry2");
+    assertFalse(fileList.isStreamingToFile());
+  }
+
+  @Test
+  public void testAlwaysStreaming() throws Exception {
+    Mockito.when(hiveConf.getBoolVar(HiveConf.ConfVars.REPL_DATA_COPY_LAZY)).thenReturn(true);
+    FileListStreamer.setBackingFileWriterInTest(bufferedWriter);
+    Path backingFile  = new Path("/tmp/backingFile");
+    FileList fileList = new FileList(backingFile, 100, hiveConf);
+    assertFalse(fileList.isStreamingInitialized());
+    fileList.add("Entry1");
+    waitForStreamingInitialization(fileList);
+    assertTrue(fileList.isStreamingToFile());
+    fileList.close();
+    waitForStreamingClosure(fileList);
+  }
+
+  @Test
+  public void testStreamingOnCacheHit() throws Exception {
+    Mockito.when(hiveConf.getBoolVar(HiveConf.ConfVars.REPL_DATA_COPY_LAZY)).thenReturn(false);
+    FileListStreamer.setBackingFileWriterInTest(bufferedWriter);
+    Path backingFile  = new Path("/tmp/backingFile");
+    FileList fileList = new FileList(backingFile, 5, hiveConf);
+    fileList.add("Entry1");
+    fileList.add("Entry2");
+    fileList.add("Entry3");
+    Thread.sleep(5000L);
+    assertFalse(fileList.isStreamingInitialized());
+    fileList.add("Entry4");
+    fileList.add("Entry5");
+    waitForStreamingInitialization(fileList);
+    fileList.close();
+    waitForStreamingClosure(fileList);
+  }
+
+  @Test
+  public void testConcurrentAdd() throws Exception {
+    Mockito.when(hiveConf.getBoolVar(HiveConf.ConfVars.REPL_DATA_COPY_LAZY)).thenReturn(false);
+    FileListStreamer.setBackingFileWriterInTest(bufferedWriter);
+    Path backingFile  = new Path("/tmp/backingFile");
+    FileList fileList = new FileList(backingFile, 100, hiveConf);
+    int numOfEntries = 1000;
+    int numOfThreads = 10;
+    ExecutorService executorService = Executors.newFixedThreadPool(numOfThreads);
+
+    for (int i=1; i<=numOfEntries; i++) {
+      executorService.submit(() -> {
+        try {
+          fileList.add("someEntry");
+        } catch (SemanticException e) {
+          throw new RuntimeException("Unable to add to file list.");
+        }
+      });
+    }
+    executorService.shutdown();
+    executorService.awaitTermination(1, TimeUnit.MINUTES);
+    waitForStreamingInitialization(fileList);
+    fileList.close();
+    waitForStreamingClosure(fileList);
+    ArgumentCaptor<String> entryArgs = ArgumentCaptor.forClass(String.class);
+    Mockito.verify(bufferedWriter, Mockito.times(numOfEntries)).write(entryArgs.capture());

Review comment:
       Check the captured data as well, not just the number of write() calls.
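
       E.g., after the verify() call, assert on the captured values too (assumes a static import of `org.junit.Assert.assertEquals`):
       ```java
       for (String entry : entryArgs.getAllValues()) {
         assertEquals("someEntry", entry);
       }
       ```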

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/FileList.java
##########
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.repl.util;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.concurrent.LinkedBlockingQueue;
+
+
+/**
+ * A file backed list of Strings which is in-memory till the threshold.
+ */
+public class FileList implements AutoCloseable, Iterator<String> {
+  private static final Logger LOG = LoggerFactory.getLogger(FileList.class);
+  private static int fileListStreamerID = 0;
+  private static final String  FILE_LIST_STREAMER_PREFIX = "file-list-streamer-";
+
+  private LinkedBlockingQueue<String> cache;
+  private volatile boolean thresholdHit = false;
+  private int thresholdPoint;
+  private float thresholdFactor = 0.9f;
+  private Path backingFile;
+  private FileListStreamer fileListStreamer;
+  private String nextElement;
+  private boolean noMoreElement;
+  private HiveConf conf;
+  private BufferedReader backingFileReader;
+
+
+  public FileList(Path backingFile, int cacheSize, HiveConf conf) throws IOException {
+    this.backingFile = backingFile;
+    if (cacheSize > 0) {
+      // Cache size must be > 0 for this list to be used for the write operation.
+      this.cache = new LinkedBlockingQueue<>(cacheSize);
+      fileListStreamer = new FileListStreamer(cache, backingFile, conf);
+      LOG.debug("File list backed by {} can be used for write operation.", backingFile);
+    } else {
+      thresholdHit = true;
+    }
+    this.conf = conf;
+    thresholdPoint = getThreshold(cacheSize);
+  }
+
+  /**
+   * Only add operation is safe for concurrent operations.
+   */
+  public void add(String entry) throws SemanticException {
+    if (thresholdHit && !fileListStreamer.isAlive()) {
+      throw new SemanticException("List is not getting saved anymore to file " + backingFile.toString());
+    }
+    try {
+      cache.put(entry);
+    } catch (InterruptedException e) {
+      throw new SemanticException(e);
+    }
+    if (!thresholdHit && cache.size() >= thresholdPoint) {
+      initStoreToFile(cache.size());
+    }
+  }
+
+  @Override
+  public boolean hasNext() {
+    if (!thresholdHit) {
+      return (cache != null && !cache.isEmpty());
+    }
+    if (nextElement != null) {
+      return true;
+    }
+    if (noMoreElement) {
+      return false;
+    }
+    nextElement = readNextLine();
+    if (nextElement == null) {
+      noMoreElement = true;
+    }
+    return !noMoreElement;
+  }
+
+  @Override
+  public String next() {
+    if (!hasNext()) {
+      throw new NoSuchElementException("No more element in the list backed by " + backingFile);
+    }
+    String retVal = nextElement;
+    nextElement = null;
+    return thresholdHit ? retVal : cache.poll();
+  }
+
+  private synchronized void initStoreToFile(int cacheSize) {
+    if (!thresholdHit) {
+      fileListStreamer.setName(getNextID());
+      fileListStreamer.setDaemon(true);
+      fileListStreamer.start();
+      thresholdHit = true;
+      LOG.info("Started streaming the list elements to file: {}, cache size {}", backingFile, cacheSize);
+    }
+  }
+
+  private String readNextLine() {
+    String nextElement = null;
+    try {
+      if (backingFileReader == null) {
+        FileSystem fs = FileSystem.get(backingFile.toUri(), conf);
+        if (fs.exists(backingFile)) {
+          backingFileReader = new BufferedReader(new InputStreamReader(fs.open(backingFile)));
+        }
+      }
+      nextElement = (backingFileReader == null) ? null : backingFileReader.readLine();
+    } catch (IOException e) {
+      LOG.error("Unable to read list from backing file " + backingFile, e);
+    }
+    return nextElement;
+  }
+
+  @Override
+  public void close() throws IOException {
+    if (thresholdHit && fileListStreamer != null) {
+      fileListStreamer.close();
+    }
+    if (backingFileReader != null) {
+      backingFileReader.close();
+    }
+    LOG.info("Completed close for File List backed by:{}, thresholdHit:{} ", backingFile, thresholdHit);
+  }
+
+  private static String getNextID() {
+    if (Integer.MAX_VALUE == fileListStreamerID) {
+      //reset the counter
+      fileListStreamerID = 0;
+    }
+    fileListStreamerID++;
+    return FILE_LIST_STREAMER_PREFIX  + fileListStreamerID;
+  }
+
+  public int getThreshold(int cacheSize) {
+    boolean copyAtLoad = conf.getBoolVar(HiveConf.ConfVars.REPL_DATA_COPY_LAZY);
+    return copyAtLoad ? 0 : (int)(cacheSize * thresholdFactor);
+  }
+
+  @VisibleForTesting
+  public boolean isStreamingToFile() {

Review comment:
       For unit tests you can make these package-private instead of public.
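
       Sketch: package-private visibility is enough when the test lives in the same package (`org.apache.hadoop.hive.ql.exec.repl.util`); the method body stays unchanged:
       ```java
       @VisibleForTesting
       boolean isStreamingToFile() {
         // ... existing body unchanged ...
       }
       ```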




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: gitbox-unsubscribe@hive.apache.org
For additional commands, e-mail: gitbox-help@hive.apache.org