You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by da...@apache.org on 2021/08/03 05:55:49 UTC

[hudi] branch master updated: [HUDI-2269] Release the disk map resource for flink streaming reader (#3384)

This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new bec23bd  [HUDI-2269] Release the disk map resource for flink streaming reader (#3384)
bec23bd is described below

commit bec23bda50b5252013af54dcf39c3a645eea6c7e
Author: Danny Chan <yu...@gmail.com>
AuthorDate: Tue Aug 3 13:55:35 2021 +0800

    [HUDI-2269] Release the disk map resource for flink streaming reader (#3384)
---
 .../java/org/apache/hudi/io/HoodieMergeHandle.java |  1 -
 .../util/collection/ExternalSpillableMap.java      |  2 +
 .../hudi/sink/bootstrap/BootstrapFunction.java     | 27 ++---------
 .../org/apache/hudi/table/format/FormatUtils.java  | 24 ++++++++++
 .../table/format/mor/MergeOnReadInputFormat.java   | 56 ++++++++++++++--------
 5 files changed, 67 insertions(+), 43 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
index 778e825..347f8cc 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
@@ -357,7 +357,6 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload, I, K, O> extends H
         }
       }
 
-      keyToNewRecords.clear();
       ((ExternalSpillableMap) keyToNewRecords).close();
       writtenRecordKeys.clear();
 
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/collection/ExternalSpillableMap.java b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/ExternalSpillableMap.java
index 8044f84..d31b0aa 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/collection/ExternalSpillableMap.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/ExternalSpillableMap.java
@@ -256,7 +256,9 @@ public class ExternalSpillableMap<T extends Serializable, R extends Serializable
   }
 
   public void close() {
+    inMemoryMap.clear();
     getDiskBasedMap().close();
+    currentInMemoryMapSize = 0L;
   }
 
   @Override
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapFunction.java
index d24452b..0a4ef30 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapFunction.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapFunction.java
@@ -37,6 +37,7 @@ import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.sink.bootstrap.aggregate.BootstrapAggFunction;
 import org.apache.hudi.table.HoodieFlinkTable;
 import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.format.FormatUtils;
 import org.apache.hudi.util.StreamerUtil;
 
 import org.apache.avro.Schema;
@@ -201,7 +202,8 @@ public class BootstrapFunction<I, O extends HoodieRecord>
                 .filter(logFile -> logFile.getFileSize() > 0)
                 .map(logFile -> logFile.getPath().toString())
                 .collect(toList());
-        HoodieMergedLogRecordScanner scanner = scanLog(logPaths, schema, latestCommitTime.get().getTimestamp());
+        HoodieMergedLogRecordScanner scanner = FormatUtils.scanLog(logPaths, schema, latestCommitTime.get().getTimestamp(),
+            writeConfig, hadoopConf);
 
         try {
           for (String recordKey : scanner.getRecords().keySet()) {
@@ -209,6 +211,8 @@ public class BootstrapFunction<I, O extends HoodieRecord>
           }
         } catch (Exception e) {
           throw new HoodieException(String.format("Error when loading record keys from files: %s", logPaths), e);
+        } finally {
+          scanner.close();
         }
       }
     }
@@ -218,27 +222,6 @@ public class BootstrapFunction<I, O extends HoodieRecord>
         this.getClass().getSimpleName(), taskID, partitionPath, cost);
   }
 
-  private HoodieMergedLogRecordScanner scanLog(
-          List<String> logPaths,
-          Schema logSchema,
-          String latestInstantTime) {
-    String basePath = this.hoodieTable.getMetaClient().getBasePath();
-    return HoodieMergedLogRecordScanner.newBuilder()
-        .withFileSystem(FSUtils.getFs(basePath, this.hadoopConf))
-        .withBasePath(basePath)
-        .withLogFilePaths(logPaths)
-        .withReaderSchema(logSchema)
-        .withLatestInstantTime(latestInstantTime)
-        .withReadBlocksLazily(this.writeConfig.getCompactionLazyBlockReadEnabled())
-        .withReverseReader(false)
-        .withBufferSize(this.writeConfig.getMaxDFSStreamBufferSize())
-        .withMaxMemorySizeInBytes(this.writeConfig.getMaxMemoryPerPartitionMerge())
-        .withSpillableMapBasePath(this.writeConfig.getSpillableMapBasePath())
-        .withDiskMapType(this.writeConfig.getCommonConfig().getSpillableDiskMapType())
-        .withBitCaskDiskMapCompressionEnabled(this.writeConfig.getCommonConfig().isBitCaskDiskMapCompressionEnabled())
-        .build();
-  }
-
   @SuppressWarnings("unchecked")
   public static HoodieRecord generateHoodieRecord(HoodieKey hoodieKey, FileSlice fileSlice) {
     HoodieRecord hoodieRecord = new HoodieRecord(hoodieKey, null);
diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java b/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java
index c075b09..cbf1ea7 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java
@@ -20,6 +20,7 @@ package org.apache.hudi.table.format;
 
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
+import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.hadoop.config.HoodieRealtimeConfig;
 import org.apache.hudi.table.format.mor.MergeOnReadInputSplit;
@@ -83,6 +84,29 @@ public class FormatUtils {
         .build();
   }
 
+  public static HoodieMergedLogRecordScanner scanLog(
+      List<String> logPaths,
+      Schema logSchema,
+      String latestInstantTime,
+      HoodieWriteConfig writeConfig,
+      Configuration hadoopConf) {
+    String basePath = writeConfig.getBasePath();
+    return HoodieMergedLogRecordScanner.newBuilder()
+        .withFileSystem(FSUtils.getFs(basePath, hadoopConf))
+        .withBasePath(basePath)
+        .withLogFilePaths(logPaths)
+        .withReaderSchema(logSchema)
+        .withLatestInstantTime(latestInstantTime)
+        .withReadBlocksLazily(writeConfig.getCompactionLazyBlockReadEnabled())
+        .withReverseReader(false)
+        .withBufferSize(writeConfig.getMaxDFSStreamBufferSize())
+        .withMaxMemorySizeInBytes(writeConfig.getMaxMemoryPerPartitionMerge())
+        .withSpillableMapBasePath(writeConfig.getSpillableMapBasePath())
+        .withDiskMapType(writeConfig.getCommonConfig().getSpillableDiskMapType())
+        .withBitCaskDiskMapCompressionEnabled(writeConfig.getCommonConfig().isBitCaskDiskMapCompressionEnabled())
+        .build();
+  }
+
   private static Boolean string2Boolean(String s) {
     return "true".equals(s.toLowerCase(Locale.ROOT));
   }
diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java b/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
index aa7453c..4d68242 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
@@ -19,7 +19,7 @@
 package org.apache.hudi.table.format.mor;
 
 import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
 import org.apache.hudi.common.table.log.InstantRange;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.configuration.FlinkOptions;
@@ -59,7 +59,6 @@ import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
-import java.util.Map;
 import java.util.Set;
 import java.util.stream.IntStream;
 
@@ -293,15 +292,14 @@ public class MergeOnReadInputFormat
         Long.MAX_VALUE); // read the whole file
   }
 
-  private Iterator<RowData> getLogFileIterator(MergeOnReadInputSplit split) {
+  private ClosableIterator<RowData> getLogFileIterator(MergeOnReadInputSplit split) {
     final Schema tableSchema = new Schema.Parser().parse(tableState.getAvroSchema());
     final Schema requiredSchema = new Schema.Parser().parse(tableState.getRequiredAvroSchema());
     final GenericRecordBuilder recordBuilder = new GenericRecordBuilder(requiredSchema);
     final AvroToRowDataConverters.AvroToRowDataConverter avroToRowDataConverter =
         AvroToRowDataConverters.createRowConverter(tableState.getRequiredRowType());
-    final Map<String, HoodieRecord<? extends HoodieRecordPayload>> logRecords =
-        FormatUtils.scanLog(split, tableSchema, hadoopConf).getRecords();
-    final Iterator<String> logRecordsKeyIterator = logRecords.keySet().iterator();
+    final HoodieMergedLogRecordScanner scanner = FormatUtils.scanLog(split, tableSchema, hadoopConf);
+    final Iterator<String> logRecordsKeyIterator = scanner.getRecords().keySet().iterator();
     final int[] pkOffset = tableState.getPkOffsetsInRequired();
     // flag saying whether the pk semantics has been dropped by user specified
     // projections. For e.g, if the pk fields are [a, b] but user only select a,
@@ -310,7 +308,7 @@ public class MergeOnReadInputFormat
     final LogicalType[] pkTypes = pkSemanticLost ? null : tableState.getPkTypes(pkOffset);
     final StringToRowDataConverter converter = pkSemanticLost ? null : new StringToRowDataConverter(pkTypes);
 
-    return new Iterator<RowData>() {
+    return new ClosableIterator<RowData>() {
       private RowData currentRecord;
 
       @Override
@@ -318,7 +316,7 @@ public class MergeOnReadInputFormat
         while (logRecordsKeyIterator.hasNext()) {
           String curAvroKey = logRecordsKeyIterator.next();
           Option<IndexedRecord> curAvroRecord = null;
-          final HoodieRecord<?> hoodieRecord = logRecords.get(curAvroKey);
+          final HoodieRecord<?> hoodieRecord = scanner.getRecords().get(curAvroKey);
           try {
             curAvroRecord = hoodieRecord.getData().getInsertValue(tableSchema);
           } catch (IOException e) {
@@ -359,6 +357,11 @@ public class MergeOnReadInputFormat
       public RowData next() {
         return currentRecord;
       }
+
+      @Override
+      public void close() {
+        scanner.close();
+      }
     };
   }
 
@@ -366,6 +369,11 @@ public class MergeOnReadInputFormat
   //  Inner Class
   // -------------------------------------------------------------------------
 
+  private interface ClosableIterator<E> extends Iterator<E>, AutoCloseable {
+    @Override
+    void close(); // override to not throw exception
+  }
+
   private interface RecordIterator {
     boolean reachedEnd() throws IOException;
 
@@ -453,9 +461,9 @@ public class MergeOnReadInputFormat
 
   static class LogFileOnlyIterator implements RecordIterator {
     // iterator for log files
-    private final Iterator<RowData> iterator;
+    private final ClosableIterator<RowData> iterator;
 
-    LogFileOnlyIterator(Iterator<RowData> iterator) {
+    LogFileOnlyIterator(ClosableIterator<RowData> iterator) {
       this.iterator = iterator;
     }
 
@@ -471,7 +479,9 @@ public class MergeOnReadInputFormat
 
     @Override
     public void close() {
-      // no operation
+      if (this.iterator != null) {
+        this.iterator.close();
+      }
     }
   }
 
@@ -479,7 +489,7 @@ public class MergeOnReadInputFormat
     // base file reader
     private final ParquetColumnarRowSplitReader reader;
     // iterator for log files
-    private final Iterator<RowData> iterator;
+    private final ClosableIterator<RowData> iterator;
 
     // add the flag because the flink ParquetColumnarRowSplitReader is buggy:
     // method #reachedEnd() returns false after it returns true.
@@ -488,7 +498,7 @@ public class MergeOnReadInputFormat
 
     private RowData currentRecord;
 
-    SkipMergeIterator(ParquetColumnarRowSplitReader reader, Iterator<RowData> iterator) {
+    SkipMergeIterator(ParquetColumnarRowSplitReader reader, ClosableIterator<RowData> iterator) {
       this.reader = reader;
       this.iterator = iterator;
     }
@@ -517,6 +527,9 @@ public class MergeOnReadInputFormat
       if (this.reader != null) {
         this.reader.close();
       }
+      if (this.iterator != null) {
+        this.iterator.close();
+      }
     }
   }
 
@@ -525,8 +538,8 @@ public class MergeOnReadInputFormat
     private final ParquetColumnarRowSplitReader reader;
     // log keys used for merging
     private final Iterator<String> logKeysIterator;
-    // log records
-    private final Map<String, HoodieRecord<? extends HoodieRecordPayload>> logRecords;
+    // scanner
+    private final HoodieMergedLogRecordScanner scanner;
 
     private final Schema tableSchema;
     private final Schema requiredSchema;
@@ -559,8 +572,8 @@ public class MergeOnReadInputFormat
         ParquetColumnarRowSplitReader reader) { // the reader should be with full schema
       this.tableSchema = tableSchema;
       this.reader = reader;
-      this.logRecords = FormatUtils.scanLog(split, tableSchema, hadoopConf).getRecords();
-      this.logKeysIterator = this.logRecords.keySet().iterator();
+      this.scanner = FormatUtils.scanLog(split, tableSchema, hadoopConf);
+      this.logKeysIterator = scanner.getRecords().keySet().iterator();
       this.requiredSchema = requiredSchema;
       this.requiredPos = requiredPos;
       this.recordBuilder = new GenericRecordBuilder(requiredSchema);
@@ -582,7 +595,7 @@ public class MergeOnReadInputFormat
           }
         }
         final String curKey = currentRecord.getString(HOODIE_RECORD_KEY_COL_POS).toString();
-        if (logRecords.containsKey(curKey)) {
+        if (scanner.getRecords().containsKey(curKey)) {
           keyToSkip.add(curKey);
           Option<IndexedRecord> mergedAvroRecord = mergeRowWithLog(currentRecord, curKey);
           if (!mergedAvroRecord.isPresent()) {
@@ -608,7 +621,7 @@ public class MergeOnReadInputFormat
         final String curKey = logKeysIterator.next();
         if (!keyToSkip.contains(curKey)) {
           Option<IndexedRecord> insertAvroRecord =
-              logRecords.get(curKey).getData().getInsertValue(tableSchema);
+              scanner.getRecords().get(curKey).getData().getInsertValue(tableSchema);
           if (insertAvroRecord.isPresent()) {
             // the record is a DELETE if insertAvroRecord not present, skipping
             GenericRecord requiredAvroRecord = buildAvroRecordBySchema(
@@ -634,13 +647,16 @@ public class MergeOnReadInputFormat
       if (this.reader != null) {
         this.reader.close();
       }
+      if (this.scanner != null) {
+        this.scanner.close();
+      }
     }
 
     private Option<IndexedRecord> mergeRowWithLog(
         RowData curRow,
         String curKey) throws IOException {
       GenericRecord historyAvroRecord = (GenericRecord) rowDataToAvroConverter.convert(tableSchema, curRow);
-      return logRecords.get(curKey).getData().combineAndGetUpdateValue(historyAvroRecord, tableSchema);
+      return scanner.getRecords().get(curKey).getData().combineAndGetUpdateValue(historyAvroRecord, tableSchema);
     }
   }