Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2022/10/09 00:23:43 UTC

[GitHub] [hudi] suryaprasanna commented on a diff in pull request #5958: [HUDI-3900] [UBER] Support log compaction action for MOR tables

suryaprasanna commented on code in PR #5958:
URL: https://github.com/apache/hudi/pull/5958#discussion_r985294717


##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java:
##########
@@ -362,6 +381,228 @@ protected synchronized void scanInternal(Option<KeySpec> keySpecOpt) {
     }
   }
 
+  private void scanInternalV2(Option<KeySpec> keySpecOption, boolean skipProcessingBlocks) {
+    currentInstantLogBlocks = new ArrayDeque<>();
+    progress = 0.0f;
+    totalLogFiles = new AtomicLong(0);
+    totalRollbacks = new AtomicLong(0);
+    totalCorruptBlocks = new AtomicLong(0);
+    totalLogBlocks = new AtomicLong(0);
+    totalLogRecords = new AtomicLong(0);
+    HoodieLogFormatReader logFormatReaderWrapper = null;
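+    // The completed and inflight commit timelines fetched below are used to filter out
+    // blocks written by uncommitted (possibly failed) writes.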
+    HoodieTimeline commitsTimeline = this.hoodieTableMetaClient.getCommitsTimeline();
+    HoodieTimeline completedInstantsTimeline = commitsTimeline.filterCompletedInstants();
+    HoodieTimeline inflightInstantsTimeline = commitsTimeline.filterInflights();
+    try {
+
+      // Get the key field based on populate meta fields config
+      // and the table type
+      final String keyField = getKeyField();
+
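+      // Record-level lookups in the underlying readers are only possible when a full scan is not forced.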
+      boolean enableRecordLookups = !forceFullScan;
+      // Iterate over the paths
+      logFormatReaderWrapper = new HoodieLogFormatReader(fs,
+          logFilePaths.stream().map(logFile -> new HoodieLogFile(new Path(logFile))).collect(Collectors.toList()),
+          readerSchema, readBlocksLazily, reverseReader, bufferSize, enableRecordLookups, keyField, internalSchema);
+
+      /**
+       * Scanning the log blocks and placing the compacted blocks at the right position requires two traversals.
+       * The first traversal identifies the rollback blocks and the valid data and compacted blocks.
+       *
+       * Scanning blocks is easy in single-writer mode, where a rollback block sits right after the data blocks
+       * it affects. In multi-writer mode the blocks can be out of order. An example scenario:
+       * B1, B2, B3, B4, R1(B3), B5
+       * Here, rollback block R1 invalidates B3, which is not the immediately preceding block.
+       * This becomes more complicated with compacted blocks, which are data blocks created by log compaction.
+       *
+       * To solve this, run a single traversal and collect all the valid, non-corrupt blocks,
+       * along with the block instant times and the rollback blocks' target instant times.
+       *
+       * As part of the second traversal, iterate over the block instant times in reverse order.
+       * While iterating in reverse order, keep track of the final compacted instant time for each block.
+       * In doing so, when a data block is seen, include the final compacted block if it is not already added.
+       *
+       * The goal is to find the final compacted block, which contains the merged contents.
+       * For example, suppose B1 and B2 are merged into a compacted block M1, and then M1, B3 and B4 are merged
+       * into another compacted block M2. M2 is now the final block containing all the changes of B1, B2, B3 and B4,
+       * so blockTimeToCompactionBlockTimeMap will look like
+       * (B1 -> M2), (B2 -> M2), (B3 -> M2), (B4 -> M2), (M1 -> M2)
+       * This map is updated while iterating and is used to place the compacted blocks at the correct position.
+       * This way there can be multiple layers of merged blocks and we can still find their correct positions.
+       */
+
+      // Collect targetRollbackInstants, from which we can determine which blocks are invalid.
+      Set<String> targetRollbackInstants = new HashSet<>();
+
+      // This maps a block's instant time to the list of blocks for that instant. Note that the log blocks can be normal data blocks or compacted log blocks.
+      Map<String, List<HoodieLogBlock>> instantToBlocksMap = new HashMap<>();
+
+      // Instant times in the order of arrival.
+      List<String> orderedInstantsList = new ArrayList<>();
+
+      Set<HoodieLogFile> scannedLogFiles = new HashSet<>();
+
+      /*
+       * 1. First, traverse in the forward direction. While traversing the log blocks, collect the following:
+       *    a. instant times
+       *    b. instant-to-log-blocks map
+       *    c. targetRollbackInstants
+       */
+      while (logFormatReaderWrapper.hasNext()) {
+        HoodieLogFile logFile = logFormatReaderWrapper.getLogFile();
+        LOG.info("Scanning log file " + logFile);
+        scannedLogFiles.add(logFile);
+        totalLogFiles.set(scannedLogFiles.size());
+        // Use the HoodieLogFileReader to iterate through the blocks in the log file
+        HoodieLogBlock logBlock = logFormatReaderWrapper.next();
+        final String instantTime = logBlock.getLogBlockHeader().get(INSTANT_TIME);
+        totalLogBlocks.incrementAndGet();
+        // Ignore the corrupt blocks. No further handling is required for them.
+        if (logBlock.getBlockType() == CORRUPT_BLOCK) {
+          LOG.info("Found a corrupt block in " + logFile.getPath());
+          totalCorruptBlocks.incrementAndGet();
+          continue;
+        }
+        if (!HoodieTimeline.compareTimestamps(instantTime,
+            HoodieTimeline.LESSER_THAN_OR_EQUALS, this.latestInstantTime)) {
+          // hit a block with instant time greater than should be processed, stop processing further
+          break;
+        }
+        if (logBlock.getBlockType() != COMMAND_BLOCK) {
+          if (!completedInstantsTimeline.containsOrBeforeTimelineStarts(instantTime)
+              || inflightInstantsTimeline.containsInstant(instantTime)) {
+            // hit an uncommitted block possibly from a failed write, move to the next one and skip processing this one
+            continue;
+          }
+          if (instantRange.isPresent() && !instantRange.get().isInRange(instantTime)) {
+            // filter the log block by instant range
+            continue;
+          }
+        }
+
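+        // Route the block by its type: data and delete blocks are grouped by instant time;
+        // command (rollback) blocks are recorded for use in the second pass.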
+        switch (logBlock.getBlockType()) {
+          case HFILE_DATA_BLOCK:
+          case AVRO_DATA_BLOCK:
+          case DELETE_BLOCK:
+            List<HoodieLogBlock> logBlocksList = instantToBlocksMap.getOrDefault(instantTime, new ArrayList<>());
+            if (logBlocksList.isEmpty()) {
+              // Keep track of instant times in the order of arrival.
+              orderedInstantsList.add(instantTime);
+            }
+            logBlocksList.add(logBlock);
+            instantToBlocksMap.put(instantTime, logBlocksList);
+            break;
+          case COMMAND_BLOCK:
+            LOG.info("Reading a command block from file " + logFile.getPath());
+            // This is a command block - take appropriate action based on the command
+            HoodieCommandBlock commandBlock = (HoodieCommandBlock) logBlock;
+
+            // Rollback blocks carry the instant times of failed writes; collect them in a set.
+            if (commandBlock.getType().equals(ROLLBACK_BLOCK)) {
+              totalRollbacks.incrementAndGet();
+              String targetInstantForCommandBlock =
+                  logBlock.getLogBlockHeader().get(TARGET_INSTANT_TIME);
+              targetRollbackInstants.add(targetInstantForCommandBlock);
+            } else {
+              throw new UnsupportedOperationException("Command type not yet supported.");
+            }
+            break;
+          default:
+            throw new UnsupportedOperationException("Block type not yet supported.");
+        }
+      }
+
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Ordered instant times seen " + orderedInstantsList);
+      }
+
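+      // Number of blocks dropped because a rollback targeted their instant.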
+      int numBlocksRolledBack = 0;
+
+      // The instant times of all blocks added to the queue are collected in this set.
+      Set<String> instantTimesIncluded = new HashSet<>();
+
+      // The key is a block's instant time and the value is the instant time of the compacted block that
+      // subsumed it; there is no entry for an instant that has not been compacted.
+      // Ex: for B1(i1), B2(i2), CB(i3,[i1,i2]) the entries will be i1 -> i3, i2 -> i3.
+      Map<String, String> blockTimeToCompactionBlockTimeMap = new HashMap<>();
+
+      /*
+       * 2. Iterate over the instants list in reverse order, latest instants first.
+       *    While iterating, update the blockTimeToCompactionBlockTimeMap and include the compacted blocks at the right position.
+       */
+      for (int i = orderedInstantsList.size() - 1; i >= 0; i--) {
+        String instantTime = orderedInstantsList.get(i);
+
+        // Exclude the blocks whose instants are present in the targetRollbackInstants set.
+        // Here, a rollback can target instants belonging to delta commits or log compaction commits.
+        if (targetRollbackInstants.contains(instantTime)) {
+          numBlocksRolledBack += instantToBlocksMap.get(instantTime).size();
+          continue;
+        }
+        List<HoodieLogBlock> instantsBlocks = instantToBlocksMap.get(instantTime);
+        if (instantsBlocks.isEmpty()) {
+          throw new HoodieException("Data corrupted while writing. Found zero blocks for an instant " + instantTime);
+        }
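+        // Blocks written by a log compaction instant carry the COMPACTED_BLOCK_TIMES header,
+        // so checking the first block is enough to classify the whole instant.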
+        HoodieLogBlock firstBlock = instantsBlocks.get(0);
+
+        // For compacted blocks, a COMPACTED_BLOCK_TIMES entry is present in the headers.
+        if (firstBlock.getLogBlockHeader().containsKey(COMPACTED_BLOCK_TIMES)) {
+          // When a compacted block is seen, update the blockTimeToCompactionBlockTimeMap.
+          // Its replaced instants resolve to this instant, unless this block was itself compacted later.
+          Arrays.stream(firstBlock.getLogBlockHeader().get(COMPACTED_BLOCK_TIMES).split(","))
+              .forEach(originalInstant -> {
+                String finalInstant = blockTimeToCompactionBlockTimeMap.getOrDefault(instantTime, instantTime);
+                blockTimeToCompactionBlockTimeMap.put(originalInstant, finalInstant);
+              });
+        } else {
+          // When a data block is found, check whether its instant has already been compacted.
+          String compactedFinalInstantTime = blockTimeToCompactionBlockTimeMap.get(instantTime);
+          if (compactedFinalInstantTime == null) {
+            // If it is not compacted, add the blocks for this instant time at the end of the queue and continue.
+            instantToBlocksMap.get(instantTime).forEach(currentInstantLogBlocks::addLast);

Review Comment:
   Addressed this comment now.
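
As a companion to the Javadoc above, here is a minimal, self-contained sketch of the second
(reverse) traversal's placement logic. It is not the PR's code: the class name PlacementSketch,
the i1..i6 instant times, and the "M:" marker (standing in for the COMPACTED_BLOCK_TIMES header)
are all hypothetical, and plain strings replace HoodieLogBlock.

    import java.util.ArrayDeque;
    import java.util.ArrayList;
    import java.util.Deque;
    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.LinkedHashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.Set;

    public class PlacementSketch {
      public static void main(String[] args) {
        // Pass-1 output: instant time -> blocks, in arrival order.
        // B1(i1) and B2(i2) were log-compacted into M1(i3); then M1, B3(i4) and B4(i5) into M2(i6).
        Map<String, List<String>> instantToBlocks = new LinkedHashMap<>();
        instantToBlocks.put("i1", List.of("B1"));
        instantToBlocks.put("i2", List.of("B2"));
        instantToBlocks.put("i3", List.of("M1:i1,i2"));
        instantToBlocks.put("i4", List.of("B3"));
        instantToBlocks.put("i5", List.of("B4"));
        instantToBlocks.put("i6", List.of("M2:i3,i4,i5"));

        List<String> orderedInstants = new ArrayList<>(instantToBlocks.keySet());
        Map<String, String> blockTimeToCompactionTime = new HashMap<>();
        Set<String> included = new HashSet<>();
        Deque<String> queue = new ArrayDeque<>();

        // Pass 2: iterate instants newest-first.
        for (int i = orderedInstants.size() - 1; i >= 0; i--) {
          String instant = orderedInstants.get(i);
          String first = instantToBlocks.get(instant).get(0);
          if (first.startsWith("M")) {
            // Compacted block: its replaced instants resolve to the final compacted
            // instant, which is this instant unless it was itself compacted later.
            String finalInstant = blockTimeToCompactionTime.getOrDefault(instant, instant);
            for (String original : first.substring(first.indexOf(':') + 1).split(",")) {
              blockTimeToCompactionTime.put(original, finalInstant);
            }
          } else {
            String finalInstant = blockTimeToCompactionTime.get(instant);
            if (finalInstant == null) {
              // Never compacted: keep this instant's own data blocks.
              queue.addLast(instant);
              included.add(instant);
            } else if (included.add(finalInstant)) {
              // Compacted: keep the final compacted block instead, exactly once.
              queue.addLast(finalInstant);
            }
          }
        }

        // blockTimeToCompactionTime now maps each of i1..i5 to i6, matching the Javadoc's
        // (B1 -> M2), (B2 -> M2), (B3 -> M2), (B4 -> M2), (M1 -> M2) example.
        System.out.println(blockTimeToCompactionTime);
        System.out.println(queue); // [i6]: only M2's blocks survive.
      }
    }

Run against the Javadoc's example, the sketch keeps only instant i6 (the final compacted block
M2) and maps every earlier instant to it, which is exactly the placement the reverse traversal
above is building toward.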


