Posted to commits@hive.apache.org by ma...@apache.org on 2021/08/16 02:21:53 UTC
[hive] branch master updated: HIVE-25414 : Optimise Hive::addWriteNotificationLog: Reduce FS call per notification. (Mahesh Kumar Behera, reviewed by Rajesh Balamohan)
This is an automated email from the ASF dual-hosted git repository.
mahesh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new a0f0fc8 HIVE-25414 : Optimise Hive::addWriteNotificationLog: Reduce FS call per notification. (Mahesh Kumar Behera, reviewed by Rajesh Balamohan)
a0f0fc8 is described below
commit a0f0fc82d538d117b222b0f696f33e40d8cc023f
Author: mahesh kumar behera <ma...@apache.org>
AuthorDate: Mon Aug 16 07:51:37 2021 +0530
HIVE-25414 : Optimise Hive::addWriteNotificationLog: Reduce FS call per notification. (Mahesh Kumar Behera, reviewed by Rajesh Balamohan)
---
.../ql/ddl/table/create/CreateTableOperation.java | 5 +-
.../org/apache/hadoop/hive/ql/exec/MoveTask.java | 3 +-
.../org/apache/hadoop/hive/ql/exec/Utilities.java | 11 ++--
.../org/apache/hadoop/hive/ql/metadata/Hive.java | 66 ++++++++++++----------
.../hive/streaming/HiveStreamingConnection.java | 3 +-
5 files changed, 49 insertions(+), 39 deletions(-)
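The gist of the patch: file lists that feed write notifications are now carried as FileStatus objects from the moment the filesystem is listed, instead of as bare Paths that each cost a getFileStatus() round trip later. Below is a minimal sketch of a listing helper in the shape of the HdfsUtils.listLocatedFileStatus call the diff switches to, assuming it behaves like a recursive listing that preserves the statuses; the body is illustrative, not the actual HdfsUtils code.

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;

    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.LocatedFileStatus;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.PathFilter;
    import org.apache.hadoop.fs.RemoteIterator;

    public final class ListLocatedFileStatusSketch {
      // Lists every file under 'path' (recursively if requested) and keeps
      // the FileStatus from the listing itself, so callers never need a
      // follow-up getFileStatus() RPC per file.
      public static List<FileStatus> listLocatedFileStatus(FileSystem fs, Path path,
          PathFilter filter, boolean recursive) throws IOException {
        List<FileStatus> result = new ArrayList<>();
        RemoteIterator<LocatedFileStatus> it = fs.listFiles(path, recursive);
        while (it.hasNext()) {
          LocatedFileStatus status = it.next();
          if (filter == null || filter.accept(status.getPath())) {
            result.add(status);
          }
        }
        return result;
      }
    }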
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/create/CreateTableOperation.java b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/create/CreateTableOperation.java
index ead495b..ef327f4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/create/CreateTableOperation.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/create/CreateTableOperation.java
@@ -19,6 +19,7 @@
package org.apache.hadoop.hive.ql.ddl.table.create;
import org.apache.commons.collections.CollectionUtils;
+import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.StatsSetupConst;
import org.apache.hadoop.hive.common.repl.ReplConst;
@@ -153,9 +154,9 @@ public class CreateTableOperation extends DDLOperation<CreateTableDesc> {
if (!createdTable.isPartitioned() && AcidUtils.isTransactionalTable(createdTable)) {
org.apache.hadoop.hive.metastore.api.Table tTable = createdTable.getTTable();
Path tabLocation = new Path(tTable.getSd().getLocation());
- List<Path> newFilesList;
+ List<FileStatus> newFilesList;
try {
- newFilesList = HdfsUtils.listPath(tabLocation.getFileSystem(context.getConf()), tabLocation, null, true);
+ newFilesList = HdfsUtils.listLocatedFileStatus(tabLocation.getFileSystem(context.getConf()), tabLocation, null, true);
} catch (IOException e) {
LOG.error("Error listing files", e);
throw new HiveException(e);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
index 4580f1d..19fcdd3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
@@ -344,8 +344,7 @@ public class MoveTask extends Task<MoveWork> implements Serializable {
FileSystem srcFs = sourcePath.getFileSystem(conf);
FileStatus[] srcs = srcFs.globStatus(sourcePath);
if(srcs != null) {
- List<Path> newFiles = new ArrayList<>();
- Hive.moveAcidFiles(srcFs, srcs, targetPath, newFiles);
+ Hive.moveAcidFiles(srcFs, srcs, targetPath, null);
} else {
LOG.debug("No files found to move from " + sourcePath + " to " + targetPath);
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
index 9790123..7060ecc 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
@@ -2855,7 +2855,7 @@ public final class Utilities {
public static final class PartitionDetails {
public Map<String, String> fullSpec;
public Partition partition;
- public List<Path> newFiles;
+ public List<FileStatus> newFiles;
public boolean hasOldPartition = false;
public AcidUtils.TableSnapshot tableSnapshot;
}
@@ -2897,7 +2897,7 @@ public final class Utilities {
Collections.synchronizedMap(new LinkedHashMap<>());
// calculate full path spec for each valid partition path
- allPartition.entrySet().forEach(partEntry -> {
+ for (Map.Entry<Path, Optional<List<Path>>> partEntry : allPartition.entrySet()) {
Path partPath = partEntry.getKey();
Map<String, String> fullPartSpec = Maps.newLinkedHashMap(partSpec);
String staticParts = Warehouse.makeDynamicPartName(partSpec);
@@ -2911,11 +2911,14 @@ public final class Utilities {
PartitionDetails details = new PartitionDetails();
details.fullSpec = fullPartSpec;
if (partEntry.getValue().isPresent()) {
- details.newFiles = partEntry.getValue().get();
+ details.newFiles = new ArrayList<>();
+ for (Path filePath : partEntry.getValue().get()) {
+ details.newFiles.add(fs.getFileStatus(filePath));
+ }
}
partitionDetailsMap.put(partPath, details);
}
- });
+ }
return partitionDetailsMap;
} catch (IOException e) {
throw new HiveException(e);
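One per-file lookup deliberately remains in the Utilities.java hunk above: direct-insert manifest files record bare paths, so each is resolved to a FileStatus once, up front, rather than deferring the lookups to notification time. A hedged standalone sketch of that conversion, mirroring the Optional-of-paths shape of the map entries above:

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Optional;

    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    final class ManifestConversionSketch {
      // Manifests carry bare paths; resolve each one to a FileStatus once,
      // so downstream notification code never touches the FileSystem again.
      static List<FileStatus> toStatuses(FileSystem fs, Optional<List<Path>> manifestFiles)
          throws IOException {
        List<FileStatus> statuses = new ArrayList<>();
        if (manifestFiles.isPresent()) {
          for (Path p : manifestFiles.get()) {
            statuses.add(fs.getFileStatus(p));
          }
        }
        return statuses;
      }
    }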
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
index f531a35..bb6804b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
@@ -2364,7 +2364,7 @@ public class Hive {
// If config is set, table is not temporary and partition being inserted exists, capture
// the list of files added. For not yet existing partitions (insert overwrite to new partition
// or dynamic partition inserts), the add partition event will capture the list of files added.
- List<Path> newFiles = null;
+ List<FileStatus> newFiles = null;
if (conf.getBoolVar(ConfVars.FIRE_EVENTS_FOR_DML) && !tbl.isTemporary()) {
newFiles = Collections.synchronizedList(new ArrayList<>());
}
@@ -2443,7 +2443,7 @@ public class Hive {
boolean inheritLocation, boolean isSkewedStoreAsSubdir,
boolean isSrcLocal, boolean isAcidIUDoperation, boolean resetStatistics,
Long writeId, int stmtId, boolean isInsertOverwrite,
- boolean isTxnTable, List<Path> newFiles, boolean isDirectInsert) throws HiveException {
+ boolean isTxnTable, List<FileStatus> newFiles, boolean isDirectInsert) throws HiveException {
Path tblDataLocationPath = tbl.getDataLocation();
boolean isMmTableWrite = AcidUtils.isInsertOnlyTable(tbl.getParameters());
assert tbl.getPath() != null : "null==getPath() for " + tbl.getTableName();
@@ -2500,18 +2500,11 @@ public class Hive {
}
if (newFiles != null) {
if (!newFiles.isEmpty()) {
- // We already know the file list from the direct insert manifestfile
- FileSystem srcFs = loadPath.getFileSystem(conf);
newFileStatuses = new ArrayList<>();
- for (Path filePath : newFiles) {
- newFileStatuses.add(srcFs.getFileStatus(filePath));
- }
+ newFileStatuses.addAll(newFiles);
} else {
newFileStatuses = listFilesCreatedByQuery(loadPath, writeId, stmtId);
- newFiles.addAll(newFileStatuses
- .stream()
- .map(FileStatus::getPath)
- .collect(Collectors.toList()));
+ newFiles.addAll(newFileStatuses);
}
}
if (Utilities.FILE_OP_LOGGER.isTraceEnabled()) {
@@ -3022,7 +3015,7 @@ private void constructOneLBLocationMap(FileStatus fSta,
LOG.info("New loading path = " + entry.getKey() + " withPartSpec " + fullPartSpec);
Partition oldPartition = partitionDetails.partition;
- List<Path> newFiles = null;
+ List<FileStatus> newFiles = null;
if (partitionDetails.newFiles != null) {
// If we already know the files from the direct insert manifest, use them
newFiles = partitionDetails.newFiles;
@@ -3236,7 +3229,7 @@ private void constructOneLBLocationMap(FileStatus fSta,
PerfLogger perfLogger = SessionState.getPerfLogger();
perfLogger.perfLogBegin("MoveTask", PerfLogger.LOAD_TABLE);
- List<Path> newFiles = null;
+ List<FileStatus> newFiles = null;
Table tbl = getTable(tableName);
assert tbl.getPath() != null : "null==getPath() for " + tbl.getTableName();
boolean isTxnTable = AcidUtils.isTransactionalTable(tbl);
@@ -3245,7 +3238,7 @@ private void constructOneLBLocationMap(FileStatus fSta,
boolean isCompactionTable = AcidUtils.isCompactionTable(tbl.getParameters());
if (conf.getBoolVar(ConfVars.FIRE_EVENTS_FOR_DML) && !tbl.isTemporary()) {
- newFiles = Collections.synchronizedList(new ArrayList<Path>());
+ newFiles = Collections.synchronizedList(new ArrayList<FileStatus>());
}
// Note: this assumes both paths are qualified; which they are, currently.
@@ -3262,9 +3255,7 @@ private void constructOneLBLocationMap(FileStatus fSta,
//new files list is required only for event notification.
if (newFiles != null) {
- newFiles.addAll(listFilesCreatedByQuery(loadPath, writeId, stmtId).stream()
- .map(FileStatus::getPath)
- .collect(Collectors.toList()));
+ newFiles.addAll(listFilesCreatedByQuery(loadPath, writeId, stmtId));
}
} else {
// Either a non-MM query, or a load into MM table from an external source.
@@ -3569,7 +3560,7 @@ private void constructOneLBLocationMap(FileStatus fSta,
}
public void addWriteNotificationLog(Table tbl, Map<String, String> partitionSpec,
- List<Path> newFiles, Long writeId,
+ List<FileStatus> newFiles, Long writeId,
List<WriteNotificationLogRequest> requestList) throws HiveException {
if (!conf.getBoolVar(ConfVars.FIRE_EVENTS_FOR_DML)) {
LOG.debug("write notification log is ignored as dml event logging is disabled");
@@ -3606,7 +3597,7 @@ private void constructOneLBLocationMap(FileStatus fSta,
}
public static void addWriteNotificationLog(HiveConf conf, Table tbl, List<String> partitionVals,
- Long txnId, Long writeId, List<Path> newFiles,
+ Long txnId, Long writeId, List<FileStatus> newFiles,
List<WriteNotificationLogRequest> requestList)
throws IOException, HiveException, TException {
FileSystem fileSystem = tbl.getDataLocation().getFileSystem(conf);
@@ -3625,7 +3616,7 @@ private void constructOneLBLocationMap(FileStatus fSta,
}
}
- private void fireInsertEvent(Table tbl, Map<String, String> partitionSpec, boolean replace, List<Path> newFiles)
+ private void fireInsertEvent(Table tbl, Map<String, String> partitionSpec, boolean replace, List<FileStatus> newFiles)
throws HiveException {
if (conf.getBoolVar(ConfVars.FIRE_EVENTS_FOR_DML)) {
LOG.debug("Firing dml insert event");
@@ -3662,19 +3653,18 @@ private void constructOneLBLocationMap(FileStatus fSta,
}
- private static void addInsertFileInformation(List<Path> newFiles, FileSystem fileSystem,
+ private static void addInsertFileInformation(List<FileStatus> newFiles, FileSystem fileSystem,
InsertEventRequestData insertData) throws IOException {
LinkedList<Path> directories = null;
- for (Path p : newFiles) {
- if (!AcidUtils.bucketFileFilter.accept(p) && !AcidUtils.originalBucketFilter.accept(p)
- && fileSystem.isDirectory(p)) { // Avoid the fs call if it is possible
+ for (FileStatus status : newFiles) {
+ if (status.isDirectory()) {
if (directories == null) {
directories = new LinkedList<>();
}
- directories.add(p);
+ directories.add(status.getPath());
continue;
}
- addInsertNonDirectoryInformation(p, fileSystem, insertData);
+ addInsertNonDirectoryInformation(status.getPath(), fileSystem, insertData);
}
if (directories == null) {
return;
@@ -4929,7 +4919,7 @@ private void constructOneLBLocationMap(FileStatus fSta,
* @throws HiveException
*/
static protected void copyFiles(HiveConf conf, Path srcf, Path destf, FileSystem fs,
- boolean isSrcLocal, boolean isAcidIUD, boolean isOverwrite, List<Path> newFiles, boolean isBucketed,
+ boolean isSrcLocal, boolean isAcidIUD, boolean isOverwrite, List<FileStatus> newFilesStatus, boolean isBucketed,
boolean isFullAcidTable, boolean isManaged, boolean isCompactionTable) throws HiveException {
try {
// create the destination if it does not exist
@@ -4957,6 +4947,11 @@ private void constructOneLBLocationMap(FileStatus fSta,
// srcs = new FileStatus[0]; Why is this needed?
}
+ List<Path> newFiles = null;
+ if (newFilesStatus != null) {
+ newFiles = new ArrayList<>();
+ }
+
// If we're moving files around for an ACID write then the rules and paths are all different.
// You can blame this on Owen.
if (isAcidIUD) {
@@ -4968,6 +4963,17 @@ private void constructOneLBLocationMap(FileStatus fSta,
copyFiles(conf, fs, srcs, srcFs, destf, isSrcLocal, isOverwrite,
newFiles, isFullAcidTable && !isBucketed, isManaged, isCompactionTable);
}
+
+ if (newFilesStatus != null) {
+ for (Path filePath : newFiles) {
+ try {
+ newFilesStatus.add(fs.getFileStatus(filePath));
+ } catch (Exception e) {
+ LOG.error("Failed to get getFileStatus", e);
+ throw new HiveException(e.getMessage());
+ }
+ }
+ }
}
public static void moveAcidFiles(FileSystem fs, FileStatus[] stats, Path dst,
@@ -5130,7 +5136,7 @@ private void constructOneLBLocationMap(FileStatus fSta,
* If the table is managed.
*/
private void replaceFiles(Path tablePath, Path srcf, Path destf, Path oldPath, HiveConf conf,
- boolean isSrcLocal, boolean purge, List<Path> newFiles, PathFilter deletePathFilter,
+ boolean isSrcLocal, boolean purge, List<FileStatus> newFiles, PathFilter deletePathFilter,
boolean isNeedRecycle, boolean isManaged, boolean isInsertOverwrite) throws HiveException {
try {
@@ -5176,7 +5182,7 @@ private void constructOneLBLocationMap(FileStatus fSta,
// Add file paths of the files that will be moved to the destination if the caller needs it
if (newFiles != null) {
- newFiles.addAll(HdfsUtils.listPath(destFs, destf, null, true));
+ newFiles.addAll(HdfsUtils.listLocatedFileStatus(destFs, destf, null, true));
}
} else {
final Map<Future<Boolean>, Path> moveFutures = Maps.newLinkedHashMapWithExpectedSize(srcs.length);
@@ -5224,7 +5230,7 @@ private void constructOneLBLocationMap(FileStatus fSta,
// Add file paths of the files that will be moved to the destination if the caller needs it
if (null != newFiles) {
- newFiles.add(moveFuture.getValue());
+ newFiles.add(destFs.getFileStatus(moveFuture.getValue()));
}
}
}
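The biggest per-file saving sits in addInsertFileInformation: FileSystem.isDirectory(Path) resolves a getFileStatus() RPC for each path, whereas FileStatus.isDirectory() reads a field the earlier listing already filled in. A simplified before/after sketch; the loop bodies are stand-ins for the Hive logic above, not the actual code:

    import java.io.IOException;
    import java.util.List;

    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public final class DirectoryCheckSketch {
      // Before: one NameNode round trip per new file just to learn whether
      // the entry is a directory.
      public static void visitByPath(FileSystem fs, List<Path> newFiles) throws IOException {
        for (Path p : newFiles) {
          if (fs.isDirectory(p)) {      // issues getFileStatus() under the hood
            // ... queue directory for recursive handling ...
          } else {
            // ... add file details to the insert event ...
          }
        }
      }

      // After: the directory flag is already in the FileStatus collected at
      // listing time, so the loop makes no filesystem calls at all.
      public static void visitByStatus(List<FileStatus> newFiles) {
        for (FileStatus status : newFiles) {
          if (status.isDirectory()) {   // plain field read, no RPC
            // ... queue status.getPath() for recursive handling ...
          } else {
            // ... add file details to the insert event ...
          }
        }
      }
    }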
diff --git a/streaming/src/java/org/apache/hive/streaming/HiveStreamingConnection.java b/streaming/src/java/org/apache/hive/streaming/HiveStreamingConnection.java
index 721ce73..a61beb5 100644
--- a/streaming/src/java/org/apache/hive/streaming/HiveStreamingConnection.java
+++ b/streaming/src/java/org/apache/hive/streaming/HiveStreamingConnection.java
@@ -33,6 +33,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.StreamCapabilities;
@@ -781,7 +782,7 @@ public class HiveStreamingConnection implements StreamingConnection {
// List the new files added inside the write path (delta directory).
FileSystem fs = tableObject.getDataLocation().getFileSystem(conf);
- List<Path> newFiles = HdfsUtils.listPath(fs, writeInfo.getWriteDir(), null, true);
+ List<FileStatus> newFiles = HdfsUtils.listLocatedFileStatus(fs, writeInfo.getWriteDir(), null, true);
// If no files are added by this streaming writes, then no need to log write notification event.
if (newFiles.isEmpty()) {