You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by da...@apache.org on 2023/03/14 01:09:25 UTC

[hudi] branch master updated: [MINOR] Use ExecutorFactory in BootstrapHandler (#7808)

This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 07c6d8b1845 [MINOR] Use ExecutorFactory in BootstrapHandler (#7808)
07c6d8b1845 is described below

commit 07c6d8b18459f4793835ca65e77f657bdfadd6c6
Author: KnightChess <98...@qq.com>
AuthorDate: Tue Mar 14 09:09:17 2023 +0800

    [MINOR] Use ExecutorFactory in BootstrapHandler (#7808)
---
 .../bootstrap/OrcBootstrapMetadataHandler.java     | 23 +++++++++++-----------
 .../bootstrap/ParquetBootstrapMetadataHandler.java | 11 +++++++----
 2 files changed, 19 insertions(+), 15 deletions(-)

diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/OrcBootstrapMetadataHandler.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/OrcBootstrapMetadataHandler.java
index 14a442c93b1..aaeff426c7a 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/OrcBootstrapMetadataHandler.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/OrcBootstrapMetadataHandler.java
@@ -26,12 +26,13 @@ import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
 import org.apache.hudi.common.util.AvroOrcUtils;
 import org.apache.hudi.common.util.OrcReaderIterator;
-import org.apache.hudi.common.util.queue.BoundedInMemoryExecutor;
+import org.apache.hudi.common.util.queue.HoodieExecutor;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.io.HoodieBootstrapHandle;
 import org.apache.hudi.keygen.KeyGeneratorInterface;
 import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.util.ExecutorFactory;
 
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericData;
@@ -69,19 +70,19 @@ class OrcBootstrapMetadataHandler extends BaseBootstrapMetadataHandler {
     if (config.getRecordMerger().getRecordType() == HoodieRecordType.SPARK) {
       throw new UnsupportedOperationException();
     }
-    BoundedInMemoryExecutor<GenericRecord, HoodieRecord, Void> wrapper = null;
+    HoodieExecutor<Void> wrapper = null;
     Reader orcReader = OrcFile.createReader(sourceFilePath, OrcFile.readerOptions(table.getHadoopConf()));
     TypeDescription orcSchema = orcReader.getSchema();
     try (RecordReader reader = orcReader.rows(new Reader.Options(table.getHadoopConf()).schema(orcSchema))) {
-      wrapper = new BoundedInMemoryExecutor<GenericRecord, HoodieRecord, Void>(config.getWriteBufferLimitBytes(),
-          new OrcReaderIterator(reader, avroSchema, orcSchema), new BootstrapRecordConsumer(bootstrapHandle), inp -> {
-        String recKey = keyGenerator.getKey(inp).getRecordKey();
-        GenericRecord gr = new GenericData.Record(METADATA_BOOTSTRAP_RECORD_SCHEMA);
-        gr.put(HoodieRecord.RECORD_KEY_METADATA_FIELD, recKey);
-        BootstrapRecordPayload payload = new BootstrapRecordPayload(gr);
-        HoodieRecord rec = new HoodieAvroRecord(new HoodieKey(recKey, partitionPath), payload);
-        return rec;
-      }, table.getPreExecuteRunnable());
+      wrapper = ExecutorFactory.create(config, new OrcReaderIterator<GenericRecord>(reader, avroSchema, orcSchema),
+          new BootstrapRecordConsumer(bootstrapHandle), inp -> {
+            String recKey = keyGenerator.getKey(inp).getRecordKey();
+            GenericRecord gr = new GenericData.Record(METADATA_BOOTSTRAP_RECORD_SCHEMA);
+            gr.put(HoodieRecord.RECORD_KEY_METADATA_FIELD, recKey);
+            BootstrapRecordPayload payload = new BootstrapRecordPayload(gr);
+            HoodieRecord rec = new HoodieAvroRecord(new HoodieKey(recKey, partitionPath), payload);
+            return rec;
+        }, table.getPreExecuteRunnable());
       wrapper.execute();
     } catch (Exception e) {
       throw new HoodieException(e);
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/ParquetBootstrapMetadataHandler.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/ParquetBootstrapMetadataHandler.java
index e5944225750..02cef7761c1 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/ParquetBootstrapMetadataHandler.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/ParquetBootstrapMetadataHandler.java
@@ -27,8 +27,9 @@ import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordMerger;
 import org.apache.hudi.common.model.HoodieSparkRecord;
+import org.apache.hudi.common.util.ClosableIterator;
 import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.queue.BoundedInMemoryExecutor;
+import org.apache.hudi.common.util.queue.HoodieExecutor;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.io.HoodieBootstrapHandle;
@@ -36,6 +37,7 @@ import org.apache.hudi.io.storage.HoodieFileReader;
 import org.apache.hudi.io.storage.HoodieFileReaderFactory;
 import org.apache.hudi.keygen.KeyGeneratorInterface;
 import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.util.ExecutorFactory;
 
 import org.apache.avro.Schema;
 import org.apache.hadoop.fs.Path;
@@ -76,7 +78,7 @@ class ParquetBootstrapMetadataHandler extends BaseBootstrapMetadataHandler {
                                   KeyGeneratorInterface keyGenerator,
                                   String partitionPath,
                                   Schema schema) throws Exception {
-    BoundedInMemoryExecutor<HoodieRecord, HoodieRecord, Void> wrapper = null;
+    HoodieExecutor<Void> wrapper = null;
     HoodieRecordMerger recordMerger = table.getConfig().getRecordMerger();
 
     HoodieFileReader reader = HoodieFileReaderFactory.getReaderFactory(recordMerger.getRecordType())
@@ -91,8 +93,9 @@ class ParquetBootstrapMetadataHandler extends BaseBootstrapMetadataHandler {
             .copy();
       };
 
-      wrapper = new BoundedInMemoryExecutor<HoodieRecord, HoodieRecord, Void>(config.getWriteBufferLimitBytes(),
-          reader.getRecordIterator(schema), new BootstrapRecordConsumer(bootstrapHandle), transformer, table.getPreExecuteRunnable());
+      ClosableIterator<HoodieRecord> recordIterator = reader.getRecordIterator(schema);
+      wrapper = ExecutorFactory.create(config, recordIterator,
+          new BootstrapRecordConsumer(bootstrapHandle), transformer, table.getPreExecuteRunnable());
 
       wrapper.execute();
     } catch (Exception e) {