Posted to issues@hive.apache.org by "ASF GitHub Bot (Jira)" <ji...@apache.org> on 2021/03/01 08:17:00 UTC

[jira] [Work logged] (HIVE-24718) Moving to file based iteration for copying data

     [ https://issues.apache.org/jira/browse/HIVE-24718?focusedWorklogId=559254&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-559254 ]

ASF GitHub Bot logged work on HIVE-24718:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 01/Mar/21 08:16
            Start Date: 01/Mar/21 08:16
    Worklog Time Spent: 10m 
      Work Description: pkumarsinha commented on a change in pull request #1936:
URL: https://github.com/apache/hive/pull/1936#discussion_r583233006



##########
File path: itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
##########
@@ -1836,6 +1835,60 @@ public void testHdfsNameserviceWithDataCopy() throws Throwable {
             .verifyResults(new String[]{"2", "3"});
   }
 
+  @Test
+  public void testReplWithRetryDisabledIterators() throws Throwable {
+    List<String> clause = new ArrayList<>();
+    //NS replacement parameters has no effect when data is also copied to staging
+    clause.add("'" + HiveConf.ConfVars.REPL_RUN_DATA_COPY_TASKS_ON_TARGET + "'='false'");
+    clause.add("'" + HiveConf.ConfVars.REPL_COPY_ITERATOR_RETRY + "'='false'");
+    primary.run("use " + primaryDbName)
+            .run("create table  acid_table (key int, value int) partitioned by (load_date date) " +
+                    "clustered by(key) into 2 buckets stored as orc tblproperties ('transactional'='true')")
+            .run("create table table1 (i String)")
+            .run("insert into table1 values (1)")
+            .run("insert into table1 values (2)")
+            .dump(primaryDbName, clause);

Review comment:
    At this point the entries have already been written to _file_list and _file_list_external.
    Please add an assertion on the file contents here.
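
    A minimal sketch of what such an assertion could look like, assuming a hypothetical
    readEntries() helper, that the dump call is changed to capture the returned tuple for
    its dumpLocation, and that the list file sits directly under the dump root; none of
    these names are taken from the patch:

        // Hypothetical helper for the suggested assertion.
        // Imports assumed: org.apache.hadoop.fs.{FileSystem, Path}, java.io.*, java.util.*.
        private List<String> readEntries(String dumpLocation, String fileName, HiveConf conf)
                throws IOException {
          Path listPath = new Path(dumpLocation, fileName);
          FileSystem fs = listPath.getFileSystem(conf);
          List<String> entries = new ArrayList<>();
          try (BufferedReader reader =
                  new BufferedReader(new InputStreamReader(fs.open(listPath)))) {
            String line;
            while ((line = reader.readLine()) != null) {
              entries.add(line);
            }
          }
          return entries;
        }

        // Illustrative usage after the dump above (capturing the tuple and the conf used
        // here are assumptions):
        // WarehouseInstance.Tuple tuple = primary.dump(primaryDbName, clause);
        // List<String> entries = readEntries(tuple.dumpLocation, "_file_list", conf);
        // Assert.assertFalse("_file_list should not be empty", entries.isEmpty());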

##########
File path: itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
##########
@@ -1836,6 +1835,60 @@ public void testHdfsNameserviceWithDataCopy() throws Throwable {
             .verifyResults(new String[]{"2", "3"});
   }
 
+  @Test
+  public void testReplWithRetryDisabledIterators() throws Throwable {
+    List<String> clause = new ArrayList<>();
+    //NS replacement parameters has no effect when data is also copied to staging
+    clause.add("'" + HiveConf.ConfVars.REPL_RUN_DATA_COPY_TASKS_ON_TARGET + "'='false'");
+    clause.add("'" + HiveConf.ConfVars.REPL_COPY_ITERATOR_RETRY + "'='false'");
+    primary.run("use " + primaryDbName)
+            .run("create table  acid_table (key int, value int) partitioned by (load_date date) " +
+                    "clustered by(key) into 2 buckets stored as orc tblproperties ('transactional'='true')")
+            .run("create table table1 (i String)")
+            .run("insert into table1 values (1)")
+            .run("insert into table1 values (2)")
+            .dump(primaryDbName, clause);
+    replica.load(replicatedDbName, primaryDbName, clause)
+            .run("use " + replicatedDbName)
+            .run("show tables")
+            .verifyResults(new String[] {"acid_table", "table1"})
+            .run("select * from table1")
+            .verifyResults(new String[] {"1", "2"});
+
+    primary.run("use " + primaryDbName)
+            .run("insert into table1 values (3)")
+            .dump(primaryDbName, clause);
+    replica.load(replicatedDbName, primaryDbName, clause)
+            .run("use " + replicatedDbName)
+            .run("show tables")
+            .verifyResults(new String[]{"acid_table", "table1"})
+            .run("select * from table1")
+            .verifyResults(new String[]{"1", "2", "3"});
+
+    clause.add("'" + HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY_FOR_EXTERNAL_TABLE.varname + "'='false'");
+    primary.run("use " + primaryDbName)
+            .run("create external table ext_table1 (id int)")
+            .run("insert into ext_table1 values (3)")
+            .run("insert into ext_table1 values (4)")
+            .run("create external table  ext_table2 (key int, value int) partitioned by (load_time timestamp)")
+            .run("insert into ext_table2 partition(load_time = '2012-02-21 07:08:09.123') values(1,2)")
+            .run("insert into ext_table2 partition(load_time = '2012-02-21 07:08:09.124') values(1,3)")
+            .run("show partitions ext_table2")
+            .verifyResults(new String[]{
+                    "load_time=2012-02-21 07%3A08%3A09.123",
+                    "load_time=2012-02-21 07%3A08%3A09.124"})
+            .dump(primaryDbName, clause);

Review comment:
    At this point the entries have already been written to _file_list and _file_list_external.
    Please add an assertion on the file contents here.
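
    The same hypothetical readEntries() helper sketched in the previous comment could be
    reused here for the external-table list; the file name below is taken from this
    comment, everything else is illustrative:

        // List<String> extEntries = readEntries(tuple.dumpLocation, "_file_list_external", conf);
        // Assert.assertFalse("_file_list_external should not be empty", extEntries.isEmpty());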

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/FileList.java
##########
@@ -18,158 +18,225 @@
 
 package org.apache.hadoop.hive.ql.exec.repl.util;
 
-import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.exec.util.Retryable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.BufferedReader;
 import java.io.IOException;
 import java.io.InputStreamReader;
+import java.io.StringWriter;
+import java.io.UncheckedIOException;
 import java.util.Iterator;
 import java.util.NoSuchElementException;
-import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.Callable;
 
-
-/**
- * A file backed list of Strings which is in-memory till the threshold.
- */
 public class FileList implements AutoCloseable, Iterator<String> {
   private static final Logger LOG = LoggerFactory.getLogger(FileList.class);
-  private static int fileListStreamerID = 0;
-  private static final String  FILE_LIST_STREAMER_PREFIX = "file-list-streamer-";
-
-  private LinkedBlockingQueue<String> cache;
-  private volatile boolean thresholdHit = false;
-  private int thresholdPoint;
-  private float thresholdFactor = 0.9f;
-  private Path backingFile;
-  private FileListStreamer fileListStreamer;
-  private String nextElement;
-  private boolean noMoreElement;
+  private final Path backingFile;
+  private String nextElement = null;
+  private String lastReadElement = null;
   private HiveConf conf;
+  private volatile boolean abortOperation = false;
+  private volatile boolean retryMode;
   private BufferedReader backingFileReader;
+  private volatile FSDataOutputStream backingFileWriter;
 
-
-  public FileList(Path backingFile, int cacheSize, HiveConf conf) {
+  public FileList(Path backingFile, HiveConf conf) {
     this.backingFile = backingFile;
     this.conf = conf;
-    if (cacheSize > 0) {
-      // Cache size must be > 0 for this list to be used for the write operation.
-      this.cache = new LinkedBlockingQueue<>(cacheSize);
-      fileListStreamer = new FileListStreamer(cache, backingFile, conf);
-      thresholdPoint = getThreshold(cacheSize);
-      LOG.debug("File list backed by {} can be used for write operation.", backingFile);
+    this.retryMode = false;
+  }
+
+  public void add(String entry) throws IOException {
+    if (conf.getBoolVar(HiveConf.ConfVars.REPL_COPY_ITERATOR_RETRY)) {
+      writeWithRetry(entry);
     } else {
-      thresholdHit = true;
+      writeEntry(entry);
     }
   }
 
-  @VisibleForTesting
-  FileList(Path backingFile, FileListStreamer fileListStreamer, LinkedBlockingQueue<String> cache, HiveConf conf) {
-    this.backingFile = backingFile;
-    this.fileListStreamer = fileListStreamer;
-    this.cache = cache;
-    this.conf = conf;
-    thresholdPoint = getThreshold(cache.remainingCapacity());
+  private synchronized void writeEntry(String entry) throws IOException {
+    //retry only during creating the file, no retry during writes
+    if (backingFileWriter == null) {
+      try {
+        Retryable retryable = buildRetryable();
+        retryable.executeCallable((Callable<Void>) () -> {
+          if(this.abortOperation) {
+            return null;
+          }
+          backingFileWriter = getWriterCreateMode();
+          return null;
+        });
+      } catch (Exception e) {
+        this.abortOperation = true;
+        throw new IOException(ErrorMsg.REPL_RETRY_EXHAUSTED.format(e.getMessage()));
+      }
+    }
+    if(this.abortOperation) {
+      return;
+    }
+    try {
+      backingFileWriter.writeBytes(getEntryWithNewline(entry));
+      LOG.info("Writing entry {} to file list backed by {}", entry, backingFile);
+    } catch (IOException e) {
+      this.abortOperation = true;
+      LOG.error("Writing entry {} to file list {} failed.", entry, backingFile, e);
+      throw e;
+    }
   }
 
-  /**
-   * Only add operation is safe for concurrent operations.
-   */
-  public void add(String entry) throws SemanticException {
-    if (thresholdHit && !fileListStreamer.isAlive()) {
-      throw new SemanticException("List is not getting saved anymore to file " + backingFile.toString());
+  private synchronized void writeWithRetry(String entry) throws IOException {
+    Retryable retryable = buildRetryable();

Review comment:
    Please add a test for the retry case.
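
    A minimal sketch of such a test, assuming Mockito is available, that the test lives in
    org.apache.hadoop.hive.ql.exec.repl.util (so the package-private initWriter() can be
    stubbed), and that the local path is acceptable; the Retryable delay settings may also
    need to be lowered for test speed (config names intentionally omitted here):

        // Imports assumed: org.mockito.Mockito, org.junit.{Test, Assert}, java.io.IOException.
        @Test
        public void testAddRetriesOnTransientWriteFailure() throws Exception {
          HiveConf conf = new HiveConf();
          conf.setBoolVar(HiveConf.ConfVars.REPL_COPY_ITERATOR_RETRY, true);
          Path backingFile = new Path("file:///tmp/repl-file-list-test/_file_list");
          FileList fileList = Mockito.spy(new FileList(backingFile, conf));
          // Fail the first writer creation, then let the real implementation run.
          Mockito.doThrow(new IOException("transient failure"))
                  .doCallRealMethod()
                  .when(fileList).initWriter();
          fileList.add("someEntry");
          fileList.close();
          // The retried write should have landed in the backing file.
          try (FileList readBack = new FileList(backingFile, conf)) {
            Assert.assertTrue(readBack.hasNext());
            Assert.assertEquals("someEntry", readBack.next());
          }
          // Two invocations of initWriter() show that the first attempt was retried.
          Mockito.verify(fileList, Mockito.times(2)).initWriter();
        }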

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/FileList.java
##########
@@ -18,158 +18,225 @@
 
 package org.apache.hadoop.hive.ql.exec.repl.util;
 
-import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.exec.util.Retryable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.BufferedReader;
 import java.io.IOException;
 import java.io.InputStreamReader;
+import java.io.StringWriter;
+import java.io.UncheckedIOException;
 import java.util.Iterator;
 import java.util.NoSuchElementException;
-import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.Callable;
 
-
-/**
- * A file backed list of Strings which is in-memory till the threshold.
- */
 public class FileList implements AutoCloseable, Iterator<String> {
   private static final Logger LOG = LoggerFactory.getLogger(FileList.class);
-  private static int fileListStreamerID = 0;
-  private static final String  FILE_LIST_STREAMER_PREFIX = "file-list-streamer-";
-
-  private LinkedBlockingQueue<String> cache;
-  private volatile boolean thresholdHit = false;
-  private int thresholdPoint;
-  private float thresholdFactor = 0.9f;
-  private Path backingFile;
-  private FileListStreamer fileListStreamer;
-  private String nextElement;
-  private boolean noMoreElement;
+  private final Path backingFile;
+  private String nextElement = null;
+  private String lastReadElement = null;
   private HiveConf conf;
+  private volatile boolean abortOperation = false;
+  private volatile boolean retryMode;
   private BufferedReader backingFileReader;
+  private volatile FSDataOutputStream backingFileWriter;
 
-
-  public FileList(Path backingFile, int cacheSize, HiveConf conf) {
+  public FileList(Path backingFile, HiveConf conf) {
     this.backingFile = backingFile;
     this.conf = conf;
-    if (cacheSize > 0) {
-      // Cache size must be > 0 for this list to be used for the write operation.
-      this.cache = new LinkedBlockingQueue<>(cacheSize);
-      fileListStreamer = new FileListStreamer(cache, backingFile, conf);
-      thresholdPoint = getThreshold(cacheSize);
-      LOG.debug("File list backed by {} can be used for write operation.", backingFile);
+    this.retryMode = false;
+  }
+
+  public void add(String entry) throws IOException {
+    if (conf.getBoolVar(HiveConf.ConfVars.REPL_COPY_ITERATOR_RETRY)) {
+      writeWithRetry(entry);
     } else {
-      thresholdHit = true;
+      writeEntry(entry);
     }
   }
 
-  @VisibleForTesting
-  FileList(Path backingFile, FileListStreamer fileListStreamer, LinkedBlockingQueue<String> cache, HiveConf conf) {
-    this.backingFile = backingFile;
-    this.fileListStreamer = fileListStreamer;
-    this.cache = cache;
-    this.conf = conf;
-    thresholdPoint = getThreshold(cache.remainingCapacity());
+  private synchronized void writeEntry(String entry) throws IOException {
+    //retry only during creating the file, no retry during writes
+    if (backingFileWriter == null) {
+      try {
+        Retryable retryable = buildRetryable();
+        retryable.executeCallable((Callable<Void>) () -> {
+          if(this.abortOperation) {
+            return null;
+          }
+          backingFileWriter = getWriterCreateMode();
+          return null;
+        });
+      } catch (Exception e) {
+        this.abortOperation = true;
+        throw new IOException(ErrorMsg.REPL_RETRY_EXHAUSTED.format(e.getMessage()));
+      }
+    }
+    if(this.abortOperation) {
+      return;
+    }
+    try {
+      backingFileWriter.writeBytes(getEntryWithNewline(entry));
+      LOG.info("Writing entry {} to file list backed by {}", entry, backingFile);
+    } catch (IOException e) {
+      this.abortOperation = true;
+      LOG.error("Writing entry {} to file list {} failed.", entry, backingFile, e);
+      throw e;
+    }
   }
 
-  /**
-   * Only add operation is safe for concurrent operations.
-   */
-  public void add(String entry) throws SemanticException {
-    if (thresholdHit && !fileListStreamer.isAlive()) {
-      throw new SemanticException("List is not getting saved anymore to file " + backingFile.toString());
+  private synchronized void writeWithRetry(String entry) throws IOException {
+    Retryable retryable = buildRetryable();
+    try {
+      retryable.executeCallable((Callable<Void>) () -> {
+        if (this.abortOperation) {
+          return null;
+        }
+        try {
+          if (backingFileWriter == null) {
+            backingFileWriter = initWriter();
+          }
+          backingFileWriter.writeBytes(getEntryWithNewline(entry));
+          backingFileWriter.hflush();
+          LOG.info("Writing entry {} to file list backed by {}", entry, backingFile);
+        } catch (IOException e) {
+          LOG.error("Writing entry {} to file list {} failed, attempting retry.", entry, backingFile, e);
+          this.retryMode = true;
+          close();
+          throw e;
+        }
+        return null;
+      });
+    } catch (Exception e) {
+      this.abortOperation = true;
+      throw new IOException(ErrorMsg.REPL_RETRY_EXHAUSTED.format(e.getMessage()));
     }
+  }
+
+  Retryable buildRetryable() {
+    return Retryable.builder()
+            .withHiveConf(conf)
+            .withRetryOnException(IOException.class).build();
+  }
+
+  // Return the entry ensuring it ends with newline.
+  private String getEntryWithNewline(String entry) {
+    return new StringWriter()
+            .append(entry)
+            .append(System.lineSeparator())
+            .toString();
+  }
+
+  FSDataOutputStream initWriter() throws IOException {
+    if(shouldAppend()) {
+      return getWriterAppendMode(); // append in retry-mode if file has been created already
+    }
+    else {
+      return getWriterCreateMode();
+    }
+  }
+
+  private boolean shouldAppend() throws IOException {
+    return backingFile.getFileSystem(conf).exists(backingFile) && this.retryMode;
+  }
+
+  FSDataOutputStream getWriterCreateMode() throws IOException {
     try {
-      cache.put(entry);
-    } catch (InterruptedException e) {
-      throw new SemanticException(e);
+      return backingFile.getFileSystem(conf).create(backingFile);
+    } catch (IOException e) {
+      LOG.error("Error opening {} in append mode", backingFile, e);
+      throw e;
     }
-    if (!thresholdHit && cache.size() >= thresholdPoint) {
-      initStoreToFile(cache.size());
+  }
+
+  FSDataOutputStream getWriterAppendMode() throws IOException {
+    try {
+      return backingFile.getFileSystem(conf).append(backingFile);
+    } catch (IOException e) {
+      LOG.error("Error creating file {}", backingFile, e);
+      throw e;
     }
   }
 
   @Override
   public boolean hasNext() {
-    if (!thresholdHit) {
-      return (cache != null && !cache.isEmpty());
-    }
-    if (nextElement != null) {
+    /*
+    We assume that every add operation either adds an entry completely or doesn't add at all.
+    If this assumption changes then in the following check we should check for incomplete entries.
+    We remove duplicate entries assuming they are only written consecutively.
+    */
+    if (nextElement != null && !nextElement.equals(lastReadElement)) {
       return true;
+    } else {
+      try {
+        do {
+          nextElement = readNextLine();
+          if(nextElement != null && !nextElement.equals(lastReadElement)) {
+            return true;
+          }
+        } while (nextElement != null);
+        return false;
+      } catch (IOException e) {
+        nextElement = null;
+        lastReadElement = null;
+        backingFileReader = null;
+        throw new UncheckedIOException(e);
+      }
     }
-    if (noMoreElement) {
-      return false;
-    }
-    nextElement = readNextLine();
-    if (nextElement == null) {
-      noMoreElement = true;
-    }
-    return !noMoreElement;
   }
 
   @Override
   public String next() {
-    if (!hasNext()) {
+    if ((nextElement == null || nextElement.equals(lastReadElement)) && !hasNext()) {
       throw new NoSuchElementException("No more element in the list backed by " + backingFile);
     }
-    String retVal = nextElement;
+    lastReadElement = nextElement;
     nextElement = null;
-    return thresholdHit ? retVal : cache.poll();
-  }
-
-  private synchronized void initStoreToFile(int cacheSize) {
-    if (!thresholdHit) {
-      fileListStreamer.setName(getNextID());
-      fileListStreamer.setDaemon(true);
-      fileListStreamer.start();
-      thresholdHit = true;
-      LOG.info("Started streaming the list elements to file: {}, cache size {}", backingFile, cacheSize);
-    }
+    return lastReadElement;
   }
 
-  private String readNextLine() {
-    String nextElement = null;
-    try {
+  private String readNextLine() throws IOException {
+    try{
+      String nextElement;
       if (backingFileReader == null) {
-        FileSystem fs = FileSystem.get(backingFile.toUri(), conf);
-        if (fs.exists(backingFile)) {
-          backingFileReader = new BufferedReader(new InputStreamReader(fs.open(backingFile)));
+        FileSystem fs = backingFile.getFileSystem(conf);
+        if (!fs.exists(backingFile)) {
+          return null;
         }
+        backingFileReader = new BufferedReader(new InputStreamReader(fs.open(backingFile)));
       }
-      nextElement = (backingFileReader == null) ? null : backingFileReader.readLine();
+      nextElement = backingFileReader.readLine();
+      return nextElement;
     } catch (IOException e) {
-      LOG.error("Unable to read list from backing file " + backingFile, e);
+      LOG.error("Exception while reading file {}.", backingFile, e);
+      close();
+      throw e;
     }
-    return nextElement;
   }
 
   @Override
   public void close() throws IOException {
-    if (thresholdHit && fileListStreamer != null) {
-      fileListStreamer.close();
-    }
-    if (backingFileReader != null) {
-      backingFileReader.close();
-    }
-    LOG.info("Completed close for File List backed by:{}, thresholdHit:{} ", backingFile, thresholdHit);
-  }
-
-  private static String getNextID() {
-    if (Integer.MAX_VALUE == fileListStreamerID) {
-      //reset the counter
-      fileListStreamerID = 0;
+    try {
+      if (backingFileReader != null) {
+        backingFileReader.close();
+      }
+      if (backingFileWriter != null) {
+        backingFileWriter.close();
+      }
+      LOG.info("Completed close for File List backed by:{}", backingFile);
+    } finally {
+      if(backingFileReader != null) {

Review comment:
    nit: You could consider removing the if blocks here.
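
    One possible reading of this suggestion, sketched with a null-safe close helper. Note
    that IOUtils.closeStream() swallows IOExceptions, which changes how close failures
    propagate, and the contents of the truncated finally block are assumed to be simple
    reference resets:

        @Override
        public void close() throws IOException {
          // closeStream() is null-safe, so the explicit null checks can be dropped.
          org.apache.hadoop.io.IOUtils.closeStream(backingFileReader);
          org.apache.hadoop.io.IOUtils.closeStream(backingFileWriter);
          backingFileReader = null;
          backingFileWriter = null;
          LOG.info("Completed close for File List backed by:{}", backingFile);
        }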

##########
File path: itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
##########
@@ -1836,6 +1835,60 @@ public void testHdfsNameserviceWithDataCopy() throws Throwable {
             .verifyResults(new String[]{"2", "3"});
   }
 
+  @Test
+  public void testReplWithRetryDisabledIterators() throws Throwable {

Review comment:
    Actually, this doesn't verify that no retry is happening, does it?
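
    A unit-level sketch of how the "no retry" behaviour could be asserted directly,
    assuming a test in the FileList package (getWriterCreateMode() is package-private)
    with Mockito available; the path and the injected failure are illustrative:

        // Imports assumed: org.mockito.Mockito, org.junit.{Test, Assert}, java.io.OutputStream,
        // java.util.concurrent.atomic.AtomicInteger, org.apache.hadoop.fs.FSDataOutputStream.
        @Test
        public void testNoRetryOnWriteWhenIteratorRetryDisabled() throws Exception {
          HiveConf conf = new HiveConf();
          conf.setBoolVar(HiveConf.ConfVars.REPL_COPY_ITERATOR_RETRY, false);
          FileList fileList = Mockito.spy(new FileList(new Path("file:///tmp/no-retry/_file_list"), conf));
          AtomicInteger writeAttempts = new AtomicInteger();
          OutputStream alwaysFailing = new OutputStream() {
            @Override
            public void write(int b) throws IOException {
              writeAttempts.incrementAndGet();
              throw new IOException("simulated write failure");
            }
          };
          // Hand the FileList a stream whose writes always fail.
          Mockito.doReturn(new FSDataOutputStream(alwaysFailing, null))
                  .when(fileList).getWriterCreateMode();
          try {
            fileList.add("entry");
            Assert.fail("add() should fail fast when the iterator retry is disabled");
          } catch (IOException expected) {
            // The write error is expected to surface immediately.
          }
          // A single low-level write attempt shows no retry happened for the write itself.
          Assert.assertEquals(1, writeAttempts.get());
        }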

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
##########
@@ -658,35 +657,34 @@ private Long incrementalDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot, Hive
         Path dbRootMetadata = new Path(metadataPath, dbName);
         Path dbRootData = new Path(bootstrapRoot, EximUtil.DATA_PATH_NAME + File.separator + dbName);
         boolean dataCopyAtLoad = conf.getBoolVar(HiveConf.ConfVars.REPL_RUN_DATA_COPY_TASKS_ON_TARGET);
-        try (Writer writer = new Writer(dumpRoot, conf)) {
-          for (String tableName : Utils.matchesTbl(hiveDb, dbName, work.replScope)) {
-            try {
-              Table table = hiveDb.getTable(dbName, tableName);
+        ReplExternalTables externalTablesWriter = new ReplExternalTables(conf);
+        for (String tableName : Utils.matchesTbl(hiveDb, dbName, work.replScope)) {
+          try {
+            Table table = hiveDb.getTable(dbName, tableName);
 
-              // Dump external table locations if required.
-              if (TableType.EXTERNAL_TABLE.equals(table.getTableType())
-                      && shouldDumpExternalTableLocation()) {
-                writer.dataLocationDump(table, extTableFileList, conf);
-              }
+            // Dump external table locations if required.
+            if (TableType.EXTERNAL_TABLE.equals(table.getTableType())
+                    && shouldDumpExternalTableLocation()) {

Review comment:
    nit: this condition can be on the same line.
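
    For reference, the two quoted lines joined into one (no behaviour change):

            if (TableType.EXTERNAL_TABLE.equals(table.getTableType()) && shouldDumpExternalTableLocation()) {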




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 559254)
    Time Spent: 4h 50m  (was: 4h 40m)

> Moving to file based iteration for copying data
> -----------------------------------------------
>
>                 Key: HIVE-24718
>                 URL: https://issues.apache.org/jira/browse/HIVE-24718
>             Project: Hive
>          Issue Type: Bug
>            Reporter: Arko Sharma
>            Assignee: Arko Sharma
>            Priority: Major
>              Labels: pull-request-available
>         Attachments: HIVE-24718.01.patch, HIVE-24718.02.patch, HIVE-24718.04.patch, HIVE-24718.05.patch
>
>          Time Spent: 4h 50m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)