Posted to commits@hudi.apache.org by fo...@apache.org on 2023/01/04 03:32:29 UTC

[hudi] 19/45: add log to print scanInternal's logFilePath

This is an automated email from the ASF dual-hosted git repository.

forwardxu pushed a commit to branch release-0.12.1
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit ecd39e3ad76b92a4f1dd0e18beed146736dc0592
Author: XuQianJin-Stars <fo...@apache.com>
AuthorDate: Wed Nov 2 12:26:49 2022 +0800

    add log to print scanInternal's logFilePath
---
 .../table/log/AbstractHoodieLogRecordReader.java   | 41 ++++++++++++----------
 1 file changed, 22 insertions(+), 19 deletions(-)

diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java
index 4566b1f5cd..eaca33ddcf 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java
@@ -72,7 +72,7 @@ import static org.apache.hudi.common.table.log.block.HoodieLogBlock.HoodieLogBlo
 /**
  * Implements logic to scan log blocks and expose valid and deleted log records to subclass implementation. Subclass is
  * free to either apply merging or expose raw data back to the caller.
- *
+ * <p>
  * NOTE: If readBlockLazily is turned on, does not merge, instead keeps reading log blocks and merges everything at once
  * This is an optimization to avoid seek() back and forth to read new block (forward seek()) and lazily read content of
  * seen block (reverse and forward seek()) during merge | | Read Block 1 Metadata | | Read Block 1 Data | | | Read Block
@@ -208,6 +208,8 @@ public abstract class AbstractHoodieLogRecordReader {
     HoodieTimeline commitsTimeline = this.hoodieTableMetaClient.getCommitsTimeline();
     HoodieTimeline completedInstantsTimeline = commitsTimeline.filterCompletedInstants();
     HoodieTimeline inflightInstantsTimeline = commitsTimeline.filterInflights();
+    HoodieLogFile logFile;
+    Path logFilePath = null;
     try {
       // Get the key field based on populate meta fields config
       // and the table type
@@ -216,12 +218,13 @@ public abstract class AbstractHoodieLogRecordReader {
       // Iterate over the paths
       boolean enableRecordLookups = !forceFullScan;
       logFormatReaderWrapper = new HoodieLogFormatReader(fs,
-          logFilePaths.stream().map(logFile -> new HoodieLogFile(new Path(logFile))).collect(Collectors.toList()),
+          logFilePaths.stream().map(log -> new HoodieLogFile(new Path(log))).collect(Collectors.toList()),
           readerSchema, readBlocksLazily, reverseReader, bufferSize, enableRecordLookups, keyField, internalSchema);
 
       Set<HoodieLogFile> scannedLogFiles = new HashSet<>();
       while (logFormatReaderWrapper.hasNext()) {
-        HoodieLogFile logFile = logFormatReaderWrapper.getLogFile();
+        logFile = logFormatReaderWrapper.getLogFile();
+        logFilePath = logFile.getPath();
         LOG.info("Scanning log file " + logFile);
         scannedLogFiles.add(logFile);
         totalLogFiles.set(scannedLogFiles.size());
@@ -250,7 +253,7 @@ public abstract class AbstractHoodieLogRecordReader {
           case HFILE_DATA_BLOCK:
           case AVRO_DATA_BLOCK:
           case PARQUET_DATA_BLOCK:
-            LOG.info("Reading a data block from file " + logFile.getPath() + " at instant "
+            LOG.info("Reading a data block from file " + logFilePath + " at instant "
                 + logBlock.getLogBlockHeader().get(INSTANT_TIME));
             if (isNewInstantBlock(logBlock) && !readBlocksLazily) {
               // If this is an avro data block belonging to a different commit/instant,
@@ -261,7 +264,7 @@ public abstract class AbstractHoodieLogRecordReader {
             currentInstantLogBlocks.push(logBlock);
             break;
           case DELETE_BLOCK:
-            LOG.info("Reading a delete block from file " + logFile.getPath());
+            LOG.info("Reading a delete block from file " + logFilePath);
             if (isNewInstantBlock(logBlock) && !readBlocksLazily) {
               // If this is a delete data block belonging to a different commit/instant,
               // then merge the last blocks and records into the main result
@@ -283,7 +286,7 @@ public abstract class AbstractHoodieLogRecordReader {
             // written per ingestion batch for a file but in reality we need to rollback (B1 & B2)
             // The following code ensures the same rollback block (R1) is used to rollback
             // both B1 & B2
-            LOG.info("Reading a command block from file " + logFile.getPath());
+            LOG.info("Reading a command block from file " + logFilePath);
             // This is a command block - take appropriate action based on the command
             HoodieCommandBlock commandBlock = (HoodieCommandBlock) logBlock;
             String targetInstantForCommandBlock =
@@ -302,23 +305,23 @@ public abstract class AbstractHoodieLogRecordReader {
                   HoodieLogBlock lastBlock = currentInstantLogBlocks.peek();
                   // handle corrupt blocks separately since they may not have metadata
                   if (lastBlock.getBlockType() == CORRUPT_BLOCK) {
-                    LOG.info("Rolling back the last corrupted log block read in " + logFile.getPath());
+                    LOG.info("Rolling back the last corrupted log block read in " + logFilePath);
                     currentInstantLogBlocks.pop();
                     numBlocksRolledBack++;
                   } else if (targetInstantForCommandBlock.contentEquals(lastBlock.getLogBlockHeader().get(INSTANT_TIME))) {
                     // rollback last data block or delete block
-                    LOG.info("Rolling back the last log block read in " + logFile.getPath());
+                    LOG.info("Rolling back the last log block read in " + logFilePath);
                     currentInstantLogBlocks.pop();
                     numBlocksRolledBack++;
                   } else if (!targetInstantForCommandBlock
                       .contentEquals(currentInstantLogBlocks.peek().getLogBlockHeader().get(INSTANT_TIME))) {
                     // invalid or extra rollback block
                     LOG.warn("TargetInstantTime " + targetInstantForCommandBlock
-                        + " invalid or extra rollback command block in " + logFile.getPath());
+                        + " invalid or extra rollback command block in " + logFilePath);
                     break;
                   } else {
                     // this should not happen ideally
-                    LOG.warn("Unable to apply rollback command block in " + logFile.getPath());
+                    LOG.warn("Unable to apply rollback command block in " + logFilePath);
                   }
                 }
                 LOG.info("Number of applied rollback blocks " + numBlocksRolledBack);
@@ -328,7 +331,7 @@ public abstract class AbstractHoodieLogRecordReader {
             }
             break;
           case CORRUPT_BLOCK:
-            LOG.info("Found a corrupt block in " + logFile.getPath());
+            LOG.info("Found a corrupt block in " + logFilePath);
             totalCorruptBlocks.incrementAndGet();
             // If there is a corrupt block - we will assume that this was the next data block
             currentInstantLogBlocks.push(logBlock);
@@ -345,11 +348,11 @@ public abstract class AbstractHoodieLogRecordReader {
       // Done
       progress = 1.0f;
     } catch (IOException e) {
-      LOG.error("Got IOException when reading log file", e);
-      throw new HoodieIOException("IOException when reading log file ", e);
+      LOG.error("Got IOException when reading log file: " + logFilePath, e);
+      throw new HoodieIOException("IOException when reading log file: " + logFilePath, e);
     } catch (Exception e) {
-      LOG.error("Got exception when reading log file", e);
-      throw new HoodieException("Exception when reading log file ", e);
+      LOG.error("Got exception when reading log file: " + logFilePath, e);
+      throw new HoodieException("Exception when reading log file: " + logFilePath, e);
     } finally {
       try {
         if (null != logFormatReaderWrapper) {
@@ -423,10 +426,10 @@ public abstract class AbstractHoodieLogRecordReader {
    * @return HoodieRecord created from the IndexedRecord
    */
   protected HoodieAvroRecord<?> createHoodieRecord(final IndexedRecord rec, final HoodieTableConfig hoodieTableConfig,
-                                               final String payloadClassFQN, final String preCombineField,
-                                               final boolean withOperationField,
-                                               final Option<Pair<String, String>> simpleKeyGenFields,
-                                               final Option<String> partitionName) {
+                                                   final String payloadClassFQN, final String preCombineField,
+                                                   final boolean withOperationField,
+                                                   final Option<Pair<String, String>> simpleKeyGenFields,
+                                                   final Option<String> partitionName) {
     if (this.populateMetaFields) {
       return SpillableMapUtils.convertToHoodieRecordPayload((GenericRecord) rec, payloadClassFQN,
           preCombineField, withOperationField);
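
For context, the change above follows a common error-reporting pattern: hoist the current log file path into a variable declared before the try block, so the catch clauses can name the file that was being read when the failure occurred. Below is a minimal, self-contained sketch of that pattern; the class and method names are illustrative stand-ins, not Hudi's actual API.

import java.io.IOException;
import java.util.List;

public class LogScanSketch {

  // Tracks the current file in a variable that outlives the loop, so the
  // catch block can include it in the error message -- the same idea as
  // hoisting logFilePath out of the while loop in scanInternal above.
  static void scan(List<String> logFilePaths) {
    String currentPath = null; // null until the first file is reached
    try {
      for (String path : logFilePaths) {
        currentPath = path;  // record before any I/O can throw
        readOneFile(path);   // hypothetical reader, stands in for HoodieLogFormatReader
      }
    } catch (IOException e) {
      // The failing file is now part of the wrapped exception's message.
      throw new RuntimeException("IOException when reading log file: " + currentPath, e);
    }
  }

  // Placeholder for the real block-by-block log reader.
  static void readOneFile(String path) throws IOException {
    // open, scan blocks, close ...
  }

  public static void main(String[] args) {
    scan(List.of("/tmp/.f1_20221102.log.1", "/tmp/.f1_20221102.log.2"));
  }
}

Because currentPath starts as null, a failure before the first file is reached degrades to "null" in the message, which matches the patched method, where logFilePath is likewise initialized to null before the try block.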