You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by da...@apache.org on 2023/03/14 01:09:25 UTC
[hudi] branch master updated: [MINOR] Use ExecutorFactory in BootstrapHandler (#7808)
This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 07c6d8b1845 [MINOR] Use ExecutorFactory in BootstrapHandler (#7808)
07c6d8b1845 is described below
commit 07c6d8b18459f4793835ca65e77f657bdfadd6c6
Author: KnightChess <98...@qq.com>
AuthorDate: Tue Mar 14 09:09:17 2023 +0800
[MINOR] Use ExecutorFactory in BootstrapHandler (#7808)
---
.../bootstrap/OrcBootstrapMetadataHandler.java | 23 +++++++++++-----------
.../bootstrap/ParquetBootstrapMetadataHandler.java | 11 +++++++----
2 files changed, 19 insertions(+), 15 deletions(-)
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/OrcBootstrapMetadataHandler.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/OrcBootstrapMetadataHandler.java
index 14a442c93b1..aaeff426c7a 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/OrcBootstrapMetadataHandler.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/OrcBootstrapMetadataHandler.java
@@ -26,12 +26,13 @@ import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
import org.apache.hudi.common.util.AvroOrcUtils;
import org.apache.hudi.common.util.OrcReaderIterator;
-import org.apache.hudi.common.util.queue.BoundedInMemoryExecutor;
+import org.apache.hudi.common.util.queue.HoodieExecutor;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.io.HoodieBootstrapHandle;
import org.apache.hudi.keygen.KeyGeneratorInterface;
import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.util.ExecutorFactory;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
@@ -69,19 +70,19 @@ class OrcBootstrapMetadataHandler extends BaseBootstrapMetadataHandler {
if (config.getRecordMerger().getRecordType() == HoodieRecordType.SPARK) {
throw new UnsupportedOperationException();
}
- BoundedInMemoryExecutor<GenericRecord, HoodieRecord, Void> wrapper = null;
+ HoodieExecutor<Void> wrapper = null;
Reader orcReader = OrcFile.createReader(sourceFilePath, OrcFile.readerOptions(table.getHadoopConf()));
TypeDescription orcSchema = orcReader.getSchema();
try (RecordReader reader = orcReader.rows(new Reader.Options(table.getHadoopConf()).schema(orcSchema))) {
- wrapper = new BoundedInMemoryExecutor<GenericRecord, HoodieRecord, Void>(config.getWriteBufferLimitBytes(),
- new OrcReaderIterator(reader, avroSchema, orcSchema), new BootstrapRecordConsumer(bootstrapHandle), inp -> {
- String recKey = keyGenerator.getKey(inp).getRecordKey();
- GenericRecord gr = new GenericData.Record(METADATA_BOOTSTRAP_RECORD_SCHEMA);
- gr.put(HoodieRecord.RECORD_KEY_METADATA_FIELD, recKey);
- BootstrapRecordPayload payload = new BootstrapRecordPayload(gr);
- HoodieRecord rec = new HoodieAvroRecord(new HoodieKey(recKey, partitionPath), payload);
- return rec;
- }, table.getPreExecuteRunnable());
+ wrapper = ExecutorFactory.create(config, new OrcReaderIterator<GenericRecord>(reader, avroSchema, orcSchema),
+ new BootstrapRecordConsumer(bootstrapHandle), inp -> {
+ String recKey = keyGenerator.getKey(inp).getRecordKey();
+ GenericRecord gr = new GenericData.Record(METADATA_BOOTSTRAP_RECORD_SCHEMA);
+ gr.put(HoodieRecord.RECORD_KEY_METADATA_FIELD, recKey);
+ BootstrapRecordPayload payload = new BootstrapRecordPayload(gr);
+ HoodieRecord rec = new HoodieAvroRecord(new HoodieKey(recKey, partitionPath), payload);
+ return rec;
+ }, table.getPreExecuteRunnable());
wrapper.execute();
} catch (Exception e) {
throw new HoodieException(e);
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/ParquetBootstrapMetadataHandler.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/ParquetBootstrapMetadataHandler.java
index e5944225750..02cef7761c1 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/ParquetBootstrapMetadataHandler.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/ParquetBootstrapMetadataHandler.java
@@ -27,8 +27,9 @@ import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordMerger;
import org.apache.hudi.common.model.HoodieSparkRecord;
+import org.apache.hudi.common.util.ClosableIterator;
import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.queue.BoundedInMemoryExecutor;
+import org.apache.hudi.common.util.queue.HoodieExecutor;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.io.HoodieBootstrapHandle;
@@ -36,6 +37,7 @@ import org.apache.hudi.io.storage.HoodieFileReader;
import org.apache.hudi.io.storage.HoodieFileReaderFactory;
import org.apache.hudi.keygen.KeyGeneratorInterface;
import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.util.ExecutorFactory;
import org.apache.avro.Schema;
import org.apache.hadoop.fs.Path;
@@ -76,7 +78,7 @@ class ParquetBootstrapMetadataHandler extends BaseBootstrapMetadataHandler {
KeyGeneratorInterface keyGenerator,
String partitionPath,
Schema schema) throws Exception {
- BoundedInMemoryExecutor<HoodieRecord, HoodieRecord, Void> wrapper = null;
+ HoodieExecutor<Void> wrapper = null;
HoodieRecordMerger recordMerger = table.getConfig().getRecordMerger();
HoodieFileReader reader = HoodieFileReaderFactory.getReaderFactory(recordMerger.getRecordType())
@@ -91,8 +93,9 @@ class ParquetBootstrapMetadataHandler extends BaseBootstrapMetadataHandler {
.copy();
};
- wrapper = new BoundedInMemoryExecutor<HoodieRecord, HoodieRecord, Void>(config.getWriteBufferLimitBytes(),
- reader.getRecordIterator(schema), new BootstrapRecordConsumer(bootstrapHandle), transformer, table.getPreExecuteRunnable());
+ ClosableIterator<HoodieRecord> recordIterator = reader.getRecordIterator(schema);
+ wrapper = ExecutorFactory.create(config, recordIterator,
+ new BootstrapRecordConsumer(bootstrapHandle), transformer, table.getPreExecuteRunnable());
wrapper.execute();
} catch (Exception e) {