Posted to commits@hive.apache.org by ma...@apache.org on 2021/08/16 02:21:53 UTC

[hive] branch master updated: HIVE-25414 : Optimise Hive::addWriteNotificationLog: Reduce FS call per notification. (Mahesh Kumar Behera, reviewed by Rajesh Balamohan)

This is an automated email from the ASF dual-hosted git repository.

mahesh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new a0f0fc8  HIVE-25414 : Optimise Hive::addWriteNotificationLog: Reduce FS call per notification. (Mahesh Kumar Behera, reviewed by Rajesh Balamohan)
a0f0fc8 is described below

commit a0f0fc82d538d117b222b0f696f33e40d8cc023f
Author: mahesh kumar behera <ma...@apache.org>
AuthorDate: Mon Aug 16 07:51:37 2021 +0530

    HIVE-25414 : Optimise Hive::addWriteNotificationLog: Reduce FS call per notification. (Mahesh Kumar Behera, reviewed by Rajesh Balamohan)
---
 .../ql/ddl/table/create/CreateTableOperation.java  |  5 +-
 .../org/apache/hadoop/hive/ql/exec/MoveTask.java   |  3 +-
 .../org/apache/hadoop/hive/ql/exec/Utilities.java  | 11 ++--
 .../org/apache/hadoop/hive/ql/metadata/Hive.java   | 66 ++++++++++++----------
 .../hive/streaming/HiveStreamingConnection.java    |  3 +-
 5 files changed, 49 insertions(+), 39 deletions(-)

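At a high level, the patch changes the "new files" bookkeeping from List<Path> to List<FileStatus>, so file metadata captured while listing or moving files is carried through to the write notification instead of being re-fetched there with one getFileStatus RPC per file. A minimal sketch of the before/after pattern, using hypothetical names (NotificationSketch, notifyFromPaths, and notifyFromStatuses are illustrative, not methods in Hive):

    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import java.io.IOException;
    import java.util.List;

    class NotificationSketch {
      // Old shape: only paths are tracked, so the notification step must
      // re-stat every file -- one NameNode RPC per getFileStatus call.
      static void notifyFromPaths(FileSystem fs, List<Path> newFiles) throws IOException {
        for (Path p : newFiles) {
          FileStatus status = fs.getFileStatus(p);  // extra FS call per file
          if (!status.isDirectory()) {
            // build the insert-event entry for this file here
          }
        }
      }

      // New shape: FileStatus is captured once, when the files are listed
      // or moved, so the directory check is a local field read (no RPC).
      static void notifyFromStatuses(List<FileStatus> newFiles) {
        for (FileStatus status : newFiles) {
          if (!status.isDirectory()) {
            // build the insert-event entry for this file here
          }
        }
      }
    }
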
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/create/CreateTableOperation.java b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/create/CreateTableOperation.java
index ead495b..ef327f4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/create/CreateTableOperation.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/create/CreateTableOperation.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hive.ql.ddl.table.create;
 
 import org.apache.commons.collections.CollectionUtils;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.StatsSetupConst;
 import org.apache.hadoop.hive.common.repl.ReplConst;
@@ -153,9 +154,9 @@ public class CreateTableOperation extends DDLOperation<CreateTableDesc> {
       if (!createdTable.isPartitioned() && AcidUtils.isTransactionalTable(createdTable)) {
         org.apache.hadoop.hive.metastore.api.Table tTable = createdTable.getTTable();
         Path tabLocation = new Path(tTable.getSd().getLocation());
-        List<Path> newFilesList;
+        List<FileStatus> newFilesList;
         try {
-          newFilesList = HdfsUtils.listPath(tabLocation.getFileSystem(context.getConf()), tabLocation, null, true);
+          newFilesList = HdfsUtils.listLocatedFileStatus(tabLocation.getFileSystem(context.getConf()), tabLocation, null, true);
         } catch (IOException e) {
           LOG.error("Error listing files", e);
           throw new HiveException(e);
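
The old HdfsUtils.listPath returned bare paths; listLocatedFileStatus returns the FileStatus objects that the listing already produced, so callers no longer need to stat each path separately. The Hive helper's internals are not shown in this diff; a plausible minimal equivalent built on the stock Hadoop API (BulkListSketch and listRecursively are illustrative names, not Hive's implementation) might look like:

    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.LocatedFileStatus;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.RemoteIterator;
    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;

    class BulkListSketch {
      // Recursive listing that yields FileStatus objects directly; statuses
      // arrive in batches with the listing itself, so no per-file stat call
      // is needed afterwards.
      static List<FileStatus> listRecursively(FileSystem fs, Path root) throws IOException {
        List<FileStatus> result = new ArrayList<>();
        RemoteIterator<LocatedFileStatus> it = fs.listFiles(root, true);
        while (it.hasNext()) {
          result.add(it.next());
        }
        return result;
      }
    }
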
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
index 4580f1d..19fcdd3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
@@ -344,8 +344,7 @@ public class MoveTask extends Task<MoveWork> implements Serializable {
             FileSystem srcFs = sourcePath.getFileSystem(conf);
             FileStatus[] srcs = srcFs.globStatus(sourcePath);
             if(srcs != null) {
-              List<Path> newFiles = new ArrayList<>();
-              Hive.moveAcidFiles(srcFs, srcs, targetPath, newFiles);
+              Hive.moveAcidFiles(srcFs, srcs, targetPath, null);
             } else {
               LOG.debug("No files found to move from " + sourcePath + " to " + targetPath);
             }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
index 9790123..7060ecc 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
@@ -2855,7 +2855,7 @@ public final class Utilities {
   public static final class PartitionDetails {
     public Map<String, String> fullSpec;
     public Partition partition;
-    public List<Path> newFiles;
+    public List<FileStatus> newFiles;
     public boolean hasOldPartition = false;
     public AcidUtils.TableSnapshot tableSnapshot;
   }
@@ -2897,7 +2897,7 @@ public final class Utilities {
           Collections.synchronizedMap(new LinkedHashMap<>());
 
       // calculate full path spec for each valid partition path
-      allPartition.entrySet().forEach(partEntry -> {
+      for (Map.Entry<Path, Optional<List<Path>>> partEntry : allPartition.entrySet()) {
         Path partPath = partEntry.getKey();
         Map<String, String> fullPartSpec = Maps.newLinkedHashMap(partSpec);
         String staticParts =  Warehouse.makeDynamicPartName(partSpec);
@@ -2911,11 +2911,14 @@ public final class Utilities {
           PartitionDetails details = new PartitionDetails();
           details.fullSpec = fullPartSpec;
           if (partEntry.getValue().isPresent()) {
-            details.newFiles = partEntry.getValue().get();
+            details.newFiles = new ArrayList<>();
+            for (Path filePath : partEntry.getValue().get()) {
+              details.newFiles.add(fs.getFileStatus(filePath));
+            }
           }
           partitionDetailsMap.put(partPath, details);
         }
-      });
+      }
       return partitionDetailsMap;
     } catch (IOException e) {
       throw new HiveException(e);
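
The forEach-to-for rewrite in this hunk is not just stylistic: fs.getFileStatus throws the checked IOException, which a java.util.function.Consumer lambda cannot declare, so the loop must become a plain for statement for the exception to reach the enclosing catch block that wraps it in HiveException. A compact illustration (CheckedExceptionSketch and stat are hypothetical names):

    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import java.io.IOException;
    import java.util.Map;

    class CheckedExceptionSketch {
      static void stat(FileSystem fs, Map<Path, String> parts) throws IOException {
        // parts.keySet().forEach(p -> fs.getFileStatus(p));
        //   ^ does not compile: getFileStatus throws the checked IOException,
        //     which Consumer#accept cannot declare.
        for (Path p : parts.keySet()) {
          fs.getFileStatus(p);  // the checked exception propagates to the caller
        }
      }
    }
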
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
index f531a35..bb6804b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
@@ -2364,7 +2364,7 @@ public class Hive {
     // If config is set, table is not temporary and partition being inserted exists, capture
     // the list of files added. For not yet existing partitions (insert overwrite to new partition
     // or dynamic partition inserts), the add partition event will capture the list of files added.
-    List<Path> newFiles = null;
+    List<FileStatus> newFiles = null;
     if (conf.getBoolVar(ConfVars.FIRE_EVENTS_FOR_DML) && !tbl.isTemporary()) {
       newFiles = Collections.synchronizedList(new ArrayList<>());
     }
@@ -2443,7 +2443,7 @@ public class Hive {
                         boolean inheritLocation, boolean isSkewedStoreAsSubdir,
                         boolean isSrcLocal, boolean isAcidIUDoperation, boolean resetStatistics,
                         Long writeId, int stmtId, boolean isInsertOverwrite,
-                        boolean isTxnTable, List<Path> newFiles, boolean isDirectInsert) throws HiveException {
+                        boolean isTxnTable, List<FileStatus> newFiles, boolean isDirectInsert) throws HiveException {
     Path tblDataLocationPath =  tbl.getDataLocation();
     boolean isMmTableWrite = AcidUtils.isInsertOnlyTable(tbl.getParameters());
     assert tbl.getPath() != null : "null==getPath() for " + tbl.getTableName();
@@ -2500,18 +2500,11 @@ public class Hive {
         }
         if (newFiles != null) {
           if (!newFiles.isEmpty()) {
-            // We already know the file list from the direct insert manifestfile
-            FileSystem srcFs = loadPath.getFileSystem(conf);
             newFileStatuses = new ArrayList<>();
-            for (Path filePath : newFiles) {
-              newFileStatuses.add(srcFs.getFileStatus(filePath));
-            }
+            newFileStatuses.addAll(newFiles);
           } else {
             newFileStatuses = listFilesCreatedByQuery(loadPath, writeId, stmtId);
-            newFiles.addAll(newFileStatuses
-                .stream()
-                .map(FileStatus::getPath)
-                .collect(Collectors.toList()));
+            newFiles.addAll(newFileStatuses);
           }
         }
         if (Utilities.FILE_OP_LOGGER.isTraceEnabled()) {
@@ -3022,7 +3015,7 @@ private void constructOneLBLocationMap(FileStatus fSta,
           LOG.info("New loading path = " + entry.getKey() + " withPartSpec " + fullPartSpec);
 
           Partition oldPartition = partitionDetails.partition;
-          List<Path> newFiles = null;
+          List<FileStatus> newFiles = null;
           if (partitionDetails.newFiles != null) {
             // If we already know the files from the direct insert manifest, use them
             newFiles = partitionDetails.newFiles;
@@ -3236,7 +3229,7 @@ private void constructOneLBLocationMap(FileStatus fSta,
     PerfLogger perfLogger = SessionState.getPerfLogger();
     perfLogger.perfLogBegin("MoveTask", PerfLogger.LOAD_TABLE);
 
-    List<Path> newFiles = null;
+    List<FileStatus> newFiles = null;
     Table tbl = getTable(tableName);
     assert tbl.getPath() != null : "null==getPath() for " + tbl.getTableName();
     boolean isTxnTable = AcidUtils.isTransactionalTable(tbl);
@@ -3245,7 +3238,7 @@ private void constructOneLBLocationMap(FileStatus fSta,
     boolean isCompactionTable = AcidUtils.isCompactionTable(tbl.getParameters());
 
     if (conf.getBoolVar(ConfVars.FIRE_EVENTS_FOR_DML) && !tbl.isTemporary()) {
-      newFiles = Collections.synchronizedList(new ArrayList<Path>());
+      newFiles = Collections.synchronizedList(new ArrayList<FileStatus>());
     }
 
     // Note: this assumes both paths are qualified; which they are, currently.
@@ -3262,9 +3255,7 @@ private void constructOneLBLocationMap(FileStatus fSta,
 
       //new files list is required only for event notification.
       if (newFiles != null) {
-        newFiles.addAll(listFilesCreatedByQuery(loadPath, writeId, stmtId).stream()
-            .map(FileStatus::getPath)
-            .collect(Collectors.toList()));
+        newFiles.addAll(listFilesCreatedByQuery(loadPath, writeId, stmtId));
       }
     } else {
       // Either a non-MM query, or a load into MM table from an external source.
@@ -3569,7 +3560,7 @@ private void constructOneLBLocationMap(FileStatus fSta,
   }
 
   public void addWriteNotificationLog(Table tbl, Map<String, String> partitionSpec,
-                                       List<Path> newFiles, Long writeId,
+                                       List<FileStatus> newFiles, Long writeId,
                                        List<WriteNotificationLogRequest> requestList) throws HiveException {
     if (!conf.getBoolVar(ConfVars.FIRE_EVENTS_FOR_DML)) {
       LOG.debug("write notification log is ignored as dml event logging is disabled");
@@ -3606,7 +3597,7 @@ private void constructOneLBLocationMap(FileStatus fSta,
   }
 
   public static void addWriteNotificationLog(HiveConf conf, Table tbl, List<String> partitionVals,
-                                             Long txnId, Long writeId, List<Path> newFiles,
+                                             Long txnId, Long writeId, List<FileStatus> newFiles,
                                              List<WriteNotificationLogRequest> requestList)
           throws IOException, HiveException, TException {
     FileSystem fileSystem = tbl.getDataLocation().getFileSystem(conf);
@@ -3625,7 +3616,7 @@ private void constructOneLBLocationMap(FileStatus fSta,
     }
   }
 
-  private void fireInsertEvent(Table tbl, Map<String, String> partitionSpec, boolean replace, List<Path> newFiles)
+  private void fireInsertEvent(Table tbl, Map<String, String> partitionSpec, boolean replace, List<FileStatus> newFiles)
       throws HiveException {
     if (conf.getBoolVar(ConfVars.FIRE_EVENTS_FOR_DML)) {
       LOG.debug("Firing dml insert event");
@@ -3662,19 +3653,18 @@ private void constructOneLBLocationMap(FileStatus fSta,
   }
 
 
-  private static void addInsertFileInformation(List<Path> newFiles, FileSystem fileSystem,
+  private static void addInsertFileInformation(List<FileStatus> newFiles, FileSystem fileSystem,
       InsertEventRequestData insertData) throws IOException {
     LinkedList<Path> directories = null;
-    for (Path p : newFiles) {
-      if (!AcidUtils.bucketFileFilter.accept(p) && !AcidUtils.originalBucketFilter.accept(p)
-          && fileSystem.isDirectory(p)) { // Avoid the fs call if it is possible
+    for (FileStatus status : newFiles) {
+      if (status.isDirectory()) {
         if (directories == null) {
           directories = new LinkedList<>();
         }
-        directories.add(p);
+        directories.add(status.getPath());
         continue;
       }
-      addInsertNonDirectoryInformation(p, fileSystem, insertData);
+      addInsertNonDirectoryInformation(status.getPath(), fileSystem, insertData);
     }
     if (directories == null) {
       return;
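
This hunk is where the per-notification FS calls actually disappear: the old code used the bucket-file name filters to avoid calling fileSystem.isDirectory(p) where possible, since that check costs a round trip per path, whereas FileStatus.isDirectory() reads a flag the original listing already populated. A sketch of the cost difference (DirectoryCheckSketch, viaRpc, and viaStatus are illustrative names):

    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import java.io.IOException;

    class DirectoryCheckSketch {
      // One NameNode round trip per call: the path is re-resolved remotely.
      static boolean viaRpc(FileSystem fs, Path p) throws IOException {
        return fs.getFileStatus(p).isDirectory();
      }

      // No I/O at all: the FileStatus already carries the file-type flag
      // from the listing that produced it.
      static boolean viaStatus(FileStatus status) {
        return status.isDirectory();
      }
    }
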
@@ -4929,7 +4919,7 @@ private void constructOneLBLocationMap(FileStatus fSta,
    * @throws HiveException
    */
   static protected void copyFiles(HiveConf conf, Path srcf, Path destf, FileSystem fs,
-      boolean isSrcLocal, boolean isAcidIUD, boolean isOverwrite, List<Path> newFiles, boolean isBucketed,
+      boolean isSrcLocal, boolean isAcidIUD, boolean isOverwrite, List<FileStatus> newFilesStatus, boolean isBucketed,
       boolean isFullAcidTable, boolean isManaged, boolean isCompactionTable) throws HiveException {
     try {
       // create the destination if it does not exist
@@ -4957,6 +4947,11 @@ private void constructOneLBLocationMap(FileStatus fSta,
       // srcs = new FileStatus[0]; Why is this needed?
     }
 
+    List<Path> newFiles = null;
+    if (newFilesStatus != null) {
+      newFiles = new ArrayList<>();
+    }
+
     // If we're moving files around for an ACID write then the rules and paths are all different.
     // You can blame this on Owen.
     if (isAcidIUD) {
@@ -4968,6 +4963,17 @@ private void constructOneLBLocationMap(FileStatus fSta,
       copyFiles(conf, fs, srcs, srcFs, destf, isSrcLocal, isOverwrite,
               newFiles, isFullAcidTable && !isBucketed, isManaged, isCompactionTable);
     }
+
+    if (newFilesStatus != null) {
+      for (Path filePath : newFiles) {
+        try {
+          newFilesStatus.add(fs.getFileStatus(filePath));
+        } catch (Exception e) {
+          LOG.error("Failed to get getFileStatus", e);
+          throw new HiveException(e.getMessage());
+        }
+      }
+    }
   }
 
   public static void moveAcidFiles(FileSystem fs, FileStatus[] stats, Path dst,
@@ -5130,7 +5136,7 @@ private void constructOneLBLocationMap(FileStatus fSta,
    *          If the table is managed.
    */
   private void replaceFiles(Path tablePath, Path srcf, Path destf, Path oldPath, HiveConf conf,
-          boolean isSrcLocal, boolean purge, List<Path> newFiles, PathFilter deletePathFilter,
+          boolean isSrcLocal, boolean purge, List<FileStatus> newFiles, PathFilter deletePathFilter,
       boolean isNeedRecycle, boolean isManaged, boolean isInsertOverwrite) throws HiveException {
     try {
 
@@ -5176,7 +5182,7 @@ private void constructOneLBLocationMap(FileStatus fSta,
 
         // Add file paths of the files that will be moved to the destination if the caller needs it
         if (newFiles != null) {
-          newFiles.addAll(HdfsUtils.listPath(destFs, destf, null, true));
+          newFiles.addAll(HdfsUtils.listLocatedFileStatus(destFs, destf, null, true));
         }
       } else {
         final Map<Future<Boolean>, Path> moveFutures = Maps.newLinkedHashMapWithExpectedSize(srcs.length);
@@ -5224,7 +5230,7 @@ private void constructOneLBLocationMap(FileStatus fSta,
 
           // Add file paths of the files that will be moved to the destination if the caller needs it
           if (null != newFiles) {
-            newFiles.add(moveFuture.getValue());
+            newFiles.add(destFs.getFileStatus(moveFuture.getValue()));
           }
         }
       }
diff --git a/streaming/src/java/org/apache/hive/streaming/HiveStreamingConnection.java b/streaming/src/java/org/apache/hive/streaming/HiveStreamingConnection.java
index 721ce73..a61beb5 100644
--- a/streaming/src/java/org/apache/hive/streaming/HiveStreamingConnection.java
+++ b/streaming/src/java/org/apache/hive/streaming/HiveStreamingConnection.java
@@ -33,6 +33,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.StreamCapabilities;
@@ -781,7 +782,7 @@ public class HiveStreamingConnection implements StreamingConnection {
 
         // List the new files added inside the write path (delta directory).
         FileSystem fs = tableObject.getDataLocation().getFileSystem(conf);
-        List<Path> newFiles = HdfsUtils.listPath(fs, writeInfo.getWriteDir(), null, true);
+        List<FileStatus> newFiles = HdfsUtils.listLocatedFileStatus(fs, writeInfo.getWriteDir(), null, true);
 
         // If no files are added by this streaming writes, then no need to log write notification event.
         if (newFiles.isEmpty()) {