Posted to commits@hudi.apache.org by da...@apache.org on 2021/08/03 05:55:49 UTC
[hudi] branch master updated: [HUDI-2269] Release the disk map resource for flink streaming reader (#3384)
This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new bec23bd [HUDI-2269] Release the disk map resource for flink streaming reader (#3384)
bec23bd is described below
commit bec23bda50b5252013af54dcf39c3a645eea6c7e
Author: Danny Chan <yu...@gmail.com>
AuthorDate: Tue Aug 3 13:55:35 2021 +0800
[HUDI-2269] Release the disk map resource for flink streaming reader (#3384)
---
.../java/org/apache/hudi/io/HoodieMergeHandle.java | 1 -
.../util/collection/ExternalSpillableMap.java | 2 +
.../hudi/sink/bootstrap/BootstrapFunction.java | 27 ++---------
.../org/apache/hudi/table/format/FormatUtils.java | 24 ++++++++++
.../table/format/mor/MergeOnReadInputFormat.java | 56 ++++++++++++++--------
5 files changed, 67 insertions(+), 43 deletions(-)
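
In short, this change makes every Flink read path that builds a
HoodieMergedLogRecordScanner release it when done: the scanner is backed by an
ExternalSpillableMap whose disk-backed map spills records to local files, so
leaving it unclosed can leak disk space for long-running streaming readers.
Before the diff itself, here is a minimal sketch of the acquire/use/release
pattern the patch applies; FormatUtils.scanLog(...) and scanner.close() are the
real Hudi calls from the diff below, while the loop body is a placeholder:

    // Illustrative only: build the scanner through the shared helper, consume
    // the merged records, and always release the spillable map behind it.
    HoodieMergedLogRecordScanner scanner = FormatUtils.scanLog(
        logPaths, schema, latestInstantTime, writeConfig, hadoopConf);
    try {
      for (String recordKey : scanner.getRecords().keySet()) {
        // ... process each record key ...
      }
    } finally {
      // releases both the in-memory map and the spilled-to-disk map
      scanner.close();
    }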
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
index 778e825..347f8cc 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
@@ -357,7 +357,6 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload, I, K, O> extends H
}
}
- keyToNewRecords.clear();
((ExternalSpillableMap) keyToNewRecords).close();
writtenRecordKeys.clear();
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/collection/ExternalSpillableMap.java b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/ExternalSpillableMap.java
index 8044f84..d31b0aa 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/collection/ExternalSpillableMap.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/ExternalSpillableMap.java
@@ -256,7 +256,9 @@ public class ExternalSpillableMap<T extends Serializable, R extends Serializable
}
public void close() {
+ inMemoryMap.clear();
getDiskBasedMap().close();
+ currentInMemoryMapSize = 0L;
}
@Override
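
The strengthened close() above now resets all of the map's state, not just the
disk-backed part. Read as plain code (reconstructed from the hunk, so treat it
as a sketch of the resulting method):

    public void close() {
      inMemoryMap.clear();          // drop the in-memory portion
      getDiskBasedMap().close();    // close/clean up the spilled files
      currentInMemoryMapSize = 0L;  // reset the size accounting
    }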
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapFunction.java
index d24452b..0a4ef30 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapFunction.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapFunction.java
@@ -37,6 +37,7 @@ import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.sink.bootstrap.aggregate.BootstrapAggFunction;
import org.apache.hudi.table.HoodieFlinkTable;
import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.format.FormatUtils;
import org.apache.hudi.util.StreamerUtil;
import org.apache.avro.Schema;
@@ -201,7 +202,8 @@ public class BootstrapFunction<I, O extends HoodieRecord>
.filter(logFile -> logFile.getFileSize() > 0)
.map(logFile -> logFile.getPath().toString())
.collect(toList());
- HoodieMergedLogRecordScanner scanner = scanLog(logPaths, schema, latestCommitTime.get().getTimestamp());
+ HoodieMergedLogRecordScanner scanner = FormatUtils.scanLog(logPaths, schema, latestCommitTime.get().getTimestamp(),
+ writeConfig, hadoopConf);
try {
for (String recordKey : scanner.getRecords().keySet()) {
@@ -209,6 +211,8 @@ public class BootstrapFunction<I, O extends HoodieRecord>
}
} catch (Exception e) {
throw new HoodieException(String.format("Error when loading record keys from files: %s", logPaths), e);
+ } finally {
+ scanner.close();
}
}
}
@@ -218,27 +222,6 @@ public class BootstrapFunction<I, O extends HoodieRecord>
this.getClass().getSimpleName(), taskID, partitionPath, cost);
}
- private HoodieMergedLogRecordScanner scanLog(
- List<String> logPaths,
- Schema logSchema,
- String latestInstantTime) {
- String basePath = this.hoodieTable.getMetaClient().getBasePath();
- return HoodieMergedLogRecordScanner.newBuilder()
- .withFileSystem(FSUtils.getFs(basePath, this.hadoopConf))
- .withBasePath(basePath)
- .withLogFilePaths(logPaths)
- .withReaderSchema(logSchema)
- .withLatestInstantTime(latestInstantTime)
- .withReadBlocksLazily(this.writeConfig.getCompactionLazyBlockReadEnabled())
- .withReverseReader(false)
- .withBufferSize(this.writeConfig.getMaxDFSStreamBufferSize())
- .withMaxMemorySizeInBytes(this.writeConfig.getMaxMemoryPerPartitionMerge())
- .withSpillableMapBasePath(this.writeConfig.getSpillableMapBasePath())
- .withDiskMapType(this.writeConfig.getCommonConfig().getSpillableDiskMapType())
- .withBitCaskDiskMapCompressionEnabled(this.writeConfig.getCommonConfig().isBitCaskDiskMapCompressionEnabled())
- .build();
- }
-
@SuppressWarnings("unchecked")
public static HoodieRecord generateHoodieRecord(HoodieKey hoodieKey, FileSlice fileSlice) {
HoodieRecord hoodieRecord = new HoodieRecord(hoodieKey, null);
diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java b/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java
index c075b09..cbf1ea7 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java
@@ -20,6 +20,7 @@ package org.apache.hudi.table.format;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
+import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.hadoop.config.HoodieRealtimeConfig;
import org.apache.hudi.table.format.mor.MergeOnReadInputSplit;
@@ -83,6 +84,29 @@ public class FormatUtils {
.build();
}
+ public static HoodieMergedLogRecordScanner scanLog(
+ List<String> logPaths,
+ Schema logSchema,
+ String latestInstantTime,
+ HoodieWriteConfig writeConfig,
+ Configuration hadoopConf) {
+ String basePath = writeConfig.getBasePath();
+ return HoodieMergedLogRecordScanner.newBuilder()
+ .withFileSystem(FSUtils.getFs(basePath, hadoopConf))
+ .withBasePath(basePath)
+ .withLogFilePaths(logPaths)
+ .withReaderSchema(logSchema)
+ .withLatestInstantTime(latestInstantTime)
+ .withReadBlocksLazily(writeConfig.getCompactionLazyBlockReadEnabled())
+ .withReverseReader(false)
+ .withBufferSize(writeConfig.getMaxDFSStreamBufferSize())
+ .withMaxMemorySizeInBytes(writeConfig.getMaxMemoryPerPartitionMerge())
+ .withSpillableMapBasePath(writeConfig.getSpillableMapBasePath())
+ .withDiskMapType(writeConfig.getCommonConfig().getSpillableDiskMapType())
+ .withBitCaskDiskMapCompressionEnabled(writeConfig.getCommonConfig().isBitCaskDiskMapCompressionEnabled())
+ .build();
+ }
+
private static Boolean string2Boolean(String s) {
return "true".equals(s.toLowerCase(Locale.ROOT));
}
diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java b/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
index aa7453c..4d68242 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
@@ -19,7 +19,7 @@
package org.apache.hudi.table.format.mor;
import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
import org.apache.hudi.common.table.log.InstantRange;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.configuration.FlinkOptions;
@@ -59,7 +59,6 @@ import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
-import java.util.Map;
import java.util.Set;
import java.util.stream.IntStream;
@@ -293,15 +292,14 @@ public class MergeOnReadInputFormat
Long.MAX_VALUE); // read the whole file
}
- private Iterator<RowData> getLogFileIterator(MergeOnReadInputSplit split) {
+ private ClosableIterator<RowData> getLogFileIterator(MergeOnReadInputSplit split) {
final Schema tableSchema = new Schema.Parser().parse(tableState.getAvroSchema());
final Schema requiredSchema = new Schema.Parser().parse(tableState.getRequiredAvroSchema());
final GenericRecordBuilder recordBuilder = new GenericRecordBuilder(requiredSchema);
final AvroToRowDataConverters.AvroToRowDataConverter avroToRowDataConverter =
AvroToRowDataConverters.createRowConverter(tableState.getRequiredRowType());
- final Map<String, HoodieRecord<? extends HoodieRecordPayload>> logRecords =
- FormatUtils.scanLog(split, tableSchema, hadoopConf).getRecords();
- final Iterator<String> logRecordsKeyIterator = logRecords.keySet().iterator();
+ final HoodieMergedLogRecordScanner scanner = FormatUtils.scanLog(split, tableSchema, hadoopConf);
+ final Iterator<String> logRecordsKeyIterator = scanner.getRecords().keySet().iterator();
final int[] pkOffset = tableState.getPkOffsetsInRequired();
// flag saying whether the pk semantics has been dropped by user specified
// projections. For e.g, if the pk fields are [a, b] but user only select a,
@@ -310,7 +308,7 @@ public class MergeOnReadInputFormat
final LogicalType[] pkTypes = pkSemanticLost ? null : tableState.getPkTypes(pkOffset);
final StringToRowDataConverter converter = pkSemanticLost ? null : new StringToRowDataConverter(pkTypes);
- return new Iterator<RowData>() {
+ return new ClosableIterator<RowData>() {
private RowData currentRecord;
@Override
@@ -318,7 +316,7 @@ public class MergeOnReadInputFormat
while (logRecordsKeyIterator.hasNext()) {
String curAvroKey = logRecordsKeyIterator.next();
Option<IndexedRecord> curAvroRecord = null;
- final HoodieRecord<?> hoodieRecord = logRecords.get(curAvroKey);
+ final HoodieRecord<?> hoodieRecord = scanner.getRecords().get(curAvroKey);
try {
curAvroRecord = hoodieRecord.getData().getInsertValue(tableSchema);
} catch (IOException e) {
@@ -359,6 +357,11 @@ public class MergeOnReadInputFormat
public RowData next() {
return currentRecord;
}
+
+ @Override
+ public void close() {
+ scanner.close();
+ }
};
}
@@ -366,6 +369,11 @@ public class MergeOnReadInputFormat
// Inner Class
// -------------------------------------------------------------------------
+ private interface ClosableIterator<E> extends Iterator<E>, AutoCloseable {
+ @Override
+ void close(); // override to not throw exception
+ }
+
private interface RecordIterator {
boolean reachedEnd() throws IOException;
@@ -453,9 +461,9 @@ public class MergeOnReadInputFormat
static class LogFileOnlyIterator implements RecordIterator {
// iterator for log files
- private final Iterator<RowData> iterator;
+ private final ClosableIterator<RowData> iterator;
- LogFileOnlyIterator(Iterator<RowData> iterator) {
+ LogFileOnlyIterator(ClosableIterator<RowData> iterator) {
this.iterator = iterator;
}
@@ -471,7 +479,9 @@ public class MergeOnReadInputFormat
@Override
public void close() {
- // no operation
+ if (this.iterator != null) {
+ this.iterator.close();
+ }
}
}
@@ -479,7 +489,7 @@ public class MergeOnReadInputFormat
// base file reader
private final ParquetColumnarRowSplitReader reader;
// iterator for log files
- private final Iterator<RowData> iterator;
+ private final ClosableIterator<RowData> iterator;
// add the flag because the flink ParquetColumnarRowSplitReader is buggy:
// method #reachedEnd() returns false after it returns true.
@@ -488,7 +498,7 @@ public class MergeOnReadInputFormat
private RowData currentRecord;
- SkipMergeIterator(ParquetColumnarRowSplitReader reader, Iterator<RowData> iterator) {
+ SkipMergeIterator(ParquetColumnarRowSplitReader reader, ClosableIterator<RowData> iterator) {
this.reader = reader;
this.iterator = iterator;
}
@@ -517,6 +527,9 @@ public class MergeOnReadInputFormat
if (this.reader != null) {
this.reader.close();
}
+ if (this.iterator != null) {
+ this.iterator.close();
+ }
}
}
@@ -525,8 +538,8 @@ public class MergeOnReadInputFormat
private final ParquetColumnarRowSplitReader reader;
// log keys used for merging
private final Iterator<String> logKeysIterator;
- // log records
- private final Map<String, HoodieRecord<? extends HoodieRecordPayload>> logRecords;
+ // scanner
+ private final HoodieMergedLogRecordScanner scanner;
private final Schema tableSchema;
private final Schema requiredSchema;
@@ -559,8 +572,8 @@ public class MergeOnReadInputFormat
ParquetColumnarRowSplitReader reader) { // the reader should be with full schema
this.tableSchema = tableSchema;
this.reader = reader;
- this.logRecords = FormatUtils.scanLog(split, tableSchema, hadoopConf).getRecords();
- this.logKeysIterator = this.logRecords.keySet().iterator();
+ this.scanner = FormatUtils.scanLog(split, tableSchema, hadoopConf);
+ this.logKeysIterator = scanner.getRecords().keySet().iterator();
this.requiredSchema = requiredSchema;
this.requiredPos = requiredPos;
this.recordBuilder = new GenericRecordBuilder(requiredSchema);
@@ -582,7 +595,7 @@ public class MergeOnReadInputFormat
}
}
final String curKey = currentRecord.getString(HOODIE_RECORD_KEY_COL_POS).toString();
- if (logRecords.containsKey(curKey)) {
+ if (scanner.getRecords().containsKey(curKey)) {
keyToSkip.add(curKey);
Option<IndexedRecord> mergedAvroRecord = mergeRowWithLog(currentRecord, curKey);
if (!mergedAvroRecord.isPresent()) {
@@ -608,7 +621,7 @@ public class MergeOnReadInputFormat
final String curKey = logKeysIterator.next();
if (!keyToSkip.contains(curKey)) {
Option<IndexedRecord> insertAvroRecord =
- logRecords.get(curKey).getData().getInsertValue(tableSchema);
+ scanner.getRecords().get(curKey).getData().getInsertValue(tableSchema);
if (insertAvroRecord.isPresent()) {
// the record is a DELETE if insertAvroRecord not present, skipping
GenericRecord requiredAvroRecord = buildAvroRecordBySchema(
@@ -634,13 +647,16 @@ public class MergeOnReadInputFormat
if (this.reader != null) {
this.reader.close();
}
+ if (this.scanner != null) {
+ this.scanner.close();
+ }
}
private Option<IndexedRecord> mergeRowWithLog(
RowData curRow,
String curKey) throws IOException {
GenericRecord historyAvroRecord = (GenericRecord) rowDataToAvroConverter.convert(tableSchema, curRow);
- return logRecords.get(curKey).getData().combineAndGetUpdateValue(historyAvroRecord, tableSchema);
+ return scanner.getRecords().get(curKey).getData().combineAndGetUpdateValue(historyAvroRecord, tableSchema);
}
}
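
For the streaming read path, the commit threads resource ownership through the
private ClosableIterator above, so whichever RecordIterator wraps the log
iterator can release the scanner from its own close(). A minimal
self-contained sketch of that idiom follows, with placeholder element and
resource types; the real code binds it to RowData and
HoodieMergedLogRecordScanner:

    import java.util.Iterator;

    // Sketch of the idiom: an Iterator that is also AutoCloseable, with
    // close() narrowed to not throw, so call sites need no try/catch
    // around cleanup.
    interface ClosableIterator<E> extends Iterator<E>, AutoCloseable {
      @Override
      void close(); // narrowed: no checked exception
    }

    // Hypothetical wrapper for illustration: iterates keys while owning a
    // closeable resource, e.g. the merged log record scanner.
    final class ScannerBackedIterator implements ClosableIterator<String> {
      private final Iterator<String> keys;
      private final AutoCloseable resource;

      ScannerBackedIterator(Iterator<String> keys, AutoCloseable resource) {
        this.keys = keys;
        this.resource = resource;
      }

      @Override public boolean hasNext() { return keys.hasNext(); }
      @Override public String next()     { return keys.next(); }

      @Override
      public void close() {
        try {
          resource.close(); // release the spillable map backing the scanner
        } catch (Exception e) {
          throw new RuntimeException("Failed to release scanner resource", e);
        }
      }
    }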