You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by fo...@apache.org on 2023/01/04 03:32:29 UTC
[hudi] 19/45: Add logging to print scanInternal's logFilePath
This is an automated email from the ASF dual-hosted git repository.
forwardxu pushed a commit to branch release-0.12.1
in repository https://gitbox.apache.org/repos/asf/hudi.git
commit ecd39e3ad76b92a4f1dd0e18beed146736dc0592
Author: XuQianJin-Stars <fo...@apache.com>
AuthorDate: Wed Nov 2 12:26:49 2022 +0800
Add logging to print scanInternal's logFilePath
---
.../table/log/AbstractHoodieLogRecordReader.java | 41 ++++++++++++----------
1 file changed, 22 insertions(+), 19 deletions(-)
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java
index 4566b1f5cd..eaca33ddcf 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java
@@ -72,7 +72,7 @@ import static org.apache.hudi.common.table.log.block.HoodieLogBlock.HoodieLogBlo
/**
* Implements logic to scan log blocks and expose valid and deleted log records to subclass implementation. Subclass is
* free to either apply merging or expose raw data back to the caller.
- *
+ * <p>
* NOTE: If readBlockLazily is turned on, does not merge, instead keeps reading log blocks and merges everything at once
* This is an optimization to avoid seek() back and forth to read new block (forward seek()) and lazily read content of
* seen block (reverse and forward seek()) during merge | | Read Block 1 Metadata | | Read Block 1 Data | | | Read Block
@@ -208,6 +208,8 @@ public abstract class AbstractHoodieLogRecordReader {
HoodieTimeline commitsTimeline = this.hoodieTableMetaClient.getCommitsTimeline();
HoodieTimeline completedInstantsTimeline = commitsTimeline.filterCompletedInstants();
HoodieTimeline inflightInstantsTimeline = commitsTimeline.filterInflights();
+ HoodieLogFile logFile;
+ Path logFilePath = null;
try {
// Get the key field based on populate meta fields config
// and the table type
@@ -216,12 +218,13 @@ public abstract class AbstractHoodieLogRecordReader {
// Iterate over the paths
boolean enableRecordLookups = !forceFullScan;
logFormatReaderWrapper = new HoodieLogFormatReader(fs,
- logFilePaths.stream().map(logFile -> new HoodieLogFile(new Path(logFile))).collect(Collectors.toList()),
+ logFilePaths.stream().map(log -> new HoodieLogFile(new Path(log))).collect(Collectors.toList()),
readerSchema, readBlocksLazily, reverseReader, bufferSize, enableRecordLookups, keyField, internalSchema);
Set<HoodieLogFile> scannedLogFiles = new HashSet<>();
while (logFormatReaderWrapper.hasNext()) {
- HoodieLogFile logFile = logFormatReaderWrapper.getLogFile();
+ logFile = logFormatReaderWrapper.getLogFile();
+ logFilePath = logFile.getPath();
LOG.info("Scanning log file " + logFile);
scannedLogFiles.add(logFile);
totalLogFiles.set(scannedLogFiles.size());
@@ -250,7 +253,7 @@ public abstract class AbstractHoodieLogRecordReader {
case HFILE_DATA_BLOCK:
case AVRO_DATA_BLOCK:
case PARQUET_DATA_BLOCK:
- LOG.info("Reading a data block from file " + logFile.getPath() + " at instant "
+ LOG.info("Reading a data block from file " + logFilePath + " at instant "
+ logBlock.getLogBlockHeader().get(INSTANT_TIME));
if (isNewInstantBlock(logBlock) && !readBlocksLazily) {
// If this is an avro data block belonging to a different commit/instant,
@@ -261,7 +264,7 @@ public abstract class AbstractHoodieLogRecordReader {
currentInstantLogBlocks.push(logBlock);
break;
case DELETE_BLOCK:
- LOG.info("Reading a delete block from file " + logFile.getPath());
+ LOG.info("Reading a delete block from file " + logFilePath);
if (isNewInstantBlock(logBlock) && !readBlocksLazily) {
// If this is a delete data block belonging to a different commit/instant,
// then merge the last blocks and records into the main result
@@ -283,7 +286,7 @@ public abstract class AbstractHoodieLogRecordReader {
// written per ingestion batch for a file but in reality we need to rollback (B1 & B2)
// The following code ensures the same rollback block (R1) is used to rollback
// both B1 & B2
- LOG.info("Reading a command block from file " + logFile.getPath());
+ LOG.info("Reading a command block from file " + logFilePath);
// This is a command block - take appropriate action based on the command
HoodieCommandBlock commandBlock = (HoodieCommandBlock) logBlock;
String targetInstantForCommandBlock =
@@ -302,23 +305,23 @@ public abstract class AbstractHoodieLogRecordReader {
HoodieLogBlock lastBlock = currentInstantLogBlocks.peek();
// handle corrupt blocks separately since they may not have metadata
if (lastBlock.getBlockType() == CORRUPT_BLOCK) {
- LOG.info("Rolling back the last corrupted log block read in " + logFile.getPath());
+ LOG.info("Rolling back the last corrupted log block read in " + logFilePath);
currentInstantLogBlocks.pop();
numBlocksRolledBack++;
} else if (targetInstantForCommandBlock.contentEquals(lastBlock.getLogBlockHeader().get(INSTANT_TIME))) {
// rollback last data block or delete block
- LOG.info("Rolling back the last log block read in " + logFile.getPath());
+ LOG.info("Rolling back the last log block read in " + logFilePath);
currentInstantLogBlocks.pop();
numBlocksRolledBack++;
} else if (!targetInstantForCommandBlock
.contentEquals(currentInstantLogBlocks.peek().getLogBlockHeader().get(INSTANT_TIME))) {
// invalid or extra rollback block
LOG.warn("TargetInstantTime " + targetInstantForCommandBlock
- + " invalid or extra rollback command block in " + logFile.getPath());
+ + " invalid or extra rollback command block in " + logFilePath);
break;
} else {
// this should not happen ideally
- LOG.warn("Unable to apply rollback command block in " + logFile.getPath());
+ LOG.warn("Unable to apply rollback command block in " + logFilePath);
}
}
LOG.info("Number of applied rollback blocks " + numBlocksRolledBack);
@@ -328,7 +331,7 @@ public abstract class AbstractHoodieLogRecordReader {
}
break;
case CORRUPT_BLOCK:
- LOG.info("Found a corrupt block in " + logFile.getPath());
+ LOG.info("Found a corrupt block in " + logFilePath);
totalCorruptBlocks.incrementAndGet();
// If there is a corrupt block - we will assume that this was the next data block
currentInstantLogBlocks.push(logBlock);
@@ -345,11 +348,11 @@ public abstract class AbstractHoodieLogRecordReader {
// Done
progress = 1.0f;
} catch (IOException e) {
- LOG.error("Got IOException when reading log file", e);
- throw new HoodieIOException("IOException when reading log file ", e);
+ LOG.error("Got IOException when reading log file: " + logFilePath, e);
+ throw new HoodieIOException("IOException when reading log file: " + logFilePath, e);
} catch (Exception e) {
- LOG.error("Got exception when reading log file", e);
- throw new HoodieException("Exception when reading log file ", e);
+ LOG.error("Got exception when reading log file: " + logFilePath, e);
+ throw new HoodieException("Exception when reading log file: " + logFilePath, e);
} finally {
try {
if (null != logFormatReaderWrapper) {
@@ -423,10 +426,10 @@ public abstract class AbstractHoodieLogRecordReader {
* @return HoodieRecord created from the IndexedRecord
*/
protected HoodieAvroRecord<?> createHoodieRecord(final IndexedRecord rec, final HoodieTableConfig hoodieTableConfig,
- final String payloadClassFQN, final String preCombineField,
- final boolean withOperationField,
- final Option<Pair<String, String>> simpleKeyGenFields,
- final Option<String> partitionName) {
+ final String payloadClassFQN, final String preCombineField,
+ final boolean withOperationField,
+ final Option<Pair<String, String>> simpleKeyGenFields,
+ final Option<String> partitionName) {
if (this.populateMetaFields) {
return SpillableMapUtils.convertToHoodieRecordPayload((GenericRecord) rec, payloadClassFQN,
preCombineField, withOperationField);