Posted to commits@hudi.apache.org by si...@apache.org on 2022/01/13 12:03:39 UTC

[hudi] 02/03: [HUDI-3024] Add explicit write handler for flink (#4329)

This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch release-0.10.1-rc1
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit c3fca6d6b5dea09b9581438c89fa5188ecd8f2d2
Author: WangMinChao <33...@users.noreply.github.com>
AuthorDate: Wed Dec 15 20:16:48 2021 +0800

    [HUDI-3024] Add explicit write handler for flink (#4329)
    
    Co-authored-by: wangminchao <wa...@asinking.com>
---
 .../hudi/execution/ExplicitWriteHandler.java       | 65 ++++++++++++++++++++++
 .../hudi/execution/FlinkLazyInsertIterable.java    | 24 ++++----
 .../apache/hudi/io/ExplicitWriteHandleFactory.java |  6 +-
 .../java/org/apache/hudi/io/FlinkAppendHandle.java |  5 ++
 4 files changed, 86 insertions(+), 14 deletions(-)
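
In short, this commit routes the Flink insert path through a write handle that the caller creates up front: the new ExplicitWriteHandler drains the in-memory record queue into that single handle, FlinkLazyInsertIterable now requires an ExplicitWriteHandleFactory, the factory exposes the wrapped handle via getWriteHandle(), and FlinkAppendHandle reports that it can always accept more records. A minimal wiring sketch follows (illustrative only, not code from this commit: the helper method and its parameter names are assumptions, imports are omitted, and the types are the ones appearing in the diff below):

    // Sketch of how a caller might drive the explicit-handle insert path.
    @SuppressWarnings({"rawtypes", "unchecked"})
    static List<WriteStatus> writeWithExplicitHandle(
        Iterator<HoodieRecord<HoodieRecordPayload>> records,
        HoodieWriteConfig config,
        String instantTime,
        HoodieTable table,
        String idPrefix,
        TaskContextSupplier contextSupplier,
        HoodieWriteHandle handle) {
      // The factory never creates a handle of its own; it always hands back the one it wraps.
      ExplicitWriteHandleFactory factory = new ExplicitWriteHandleFactory(handle);
      FlinkLazyInsertIterable<HoodieRecordPayload> iterable = new FlinkLazyInsertIterable<>(
          records, false, config, instantTime, table, idPrefix, contextSupplier, factory);
      List<WriteStatus> statuses = new ArrayList<>();
      while (iterable.hasNext()) {
        statuses.addAll(iterable.next()); // each batch is written through the explicit handle
      }
      return statuses;
    }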

diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/execution/ExplicitWriteHandler.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/execution/ExplicitWriteHandler.java
new file mode 100644
index 0000000..46eff58
--- /dev/null
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/execution/ExplicitWriteHandler.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.execution;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.util.queue.BoundedInMemoryQueueConsumer;
+import org.apache.hudi.io.HoodieWriteHandle;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Consumes a stream of hoodie records from the in-memory queue and writes them to a single, explicitly provided write handle.
+ */
+public class ExplicitWriteHandler<T extends HoodieRecordPayload>
+    extends BoundedInMemoryQueueConsumer<HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>, List<WriteStatus>> {
+
+  private final List<WriteStatus> statuses = new ArrayList<>();
+
+  private HoodieWriteHandle handle;
+
+  public ExplicitWriteHandler(HoodieWriteHandle handle) {
+    this.handle = handle;
+  }
+
+  @Override
+  public void consumeOneRecord(HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord> payload) {
+    final HoodieRecord insertPayload = payload.record;
+    handle.write(insertPayload, payload.insertValue, payload.exception);
+  }
+
+  @Override
+  public void finish() {
+    closeOpenHandle();
+    assert statuses.size() > 0;
+  }
+
+  @Override
+  public List<WriteStatus> getResult() {
+    return statuses;
+  }
+
+  private void closeOpenHandle() {
+    statuses.addAll(handle.close());
+  }
+}
+
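The consumer contract is worth spelling out: consumeOneRecord() is invoked once per queued record, finish() closes the single handle and collects its WriteStatus objects, and only then does getResult() return anything useful. A rough, illustrative driver (the real driver is the bounded in-memory queue executor shown in the next file; the helper name and its iterator parameter are assumptions):

    // Sketch of the call order the queue executor follows for this consumer.
    @SuppressWarnings("rawtypes")
    static List<WriteStatus> drainIntoHandle(
        Iterator<HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>> queuedRecords,
        HoodieWriteHandle handle) {
      ExplicitWriteHandler<HoodieRecordPayload> handler = new ExplicitWriteHandler<>(handle);
      while (queuedRecords.hasNext()) {
        handler.consumeOneRecord(queuedRecords.next()); // every record lands in the same handle
      }
      handler.finish();            // closes the handle and collects its WriteStatus list
      return handler.getResult();
    }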
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/execution/FlinkLazyInsertIterable.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/execution/FlinkLazyInsertIterable.java
index b0674b2..78b3cb1 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/execution/FlinkLazyInsertIterable.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/execution/FlinkLazyInsertIterable.java
@@ -27,7 +27,8 @@ import org.apache.hudi.common.util.queue.BoundedInMemoryExecutor;
 import org.apache.hudi.common.util.queue.IteratorBasedQueueProducer;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieException;
-import org.apache.hudi.io.WriteHandleFactory;
+import org.apache.hudi.io.ExplicitWriteHandleFactory;
+import org.apache.hudi.io.HoodieWriteHandle;
 import org.apache.hudi.table.HoodieTable;
 
 import org.apache.avro.Schema;
@@ -36,15 +37,6 @@ import java.util.Iterator;
 import java.util.List;
 
 public class FlinkLazyInsertIterable<T extends HoodieRecordPayload> extends HoodieLazyInsertIterable<T> {
-  public FlinkLazyInsertIterable(Iterator<HoodieRecord<T>> recordItr,
-                                 boolean areRecordsSorted,
-                                 HoodieWriteConfig config,
-                                 String instantTime,
-                                 HoodieTable hoodieTable,
-                                 String idPrefix,
-                                 TaskContextSupplier taskContextSupplier) {
-    super(recordItr, areRecordsSorted, config, instantTime, hoodieTable, idPrefix, taskContextSupplier);
-  }
 
   public FlinkLazyInsertIterable(Iterator<HoodieRecord<T>> recordItr,
                                  boolean areRecordsSorted,
@@ -53,7 +45,7 @@ public class FlinkLazyInsertIterable<T extends HoodieRecordPayload> extends Hood
                                  HoodieTable hoodieTable,
                                  String idPrefix,
                                  TaskContextSupplier taskContextSupplier,
-                                 WriteHandleFactory writeHandleFactory) {
+                                 ExplicitWriteHandleFactory writeHandleFactory) {
     super(recordItr, areRecordsSorted, config, instantTime, hoodieTable, idPrefix, taskContextSupplier, writeHandleFactory);
   }
 
@@ -64,8 +56,8 @@ public class FlinkLazyInsertIterable<T extends HoodieRecordPayload> extends Hood
         null;
     try {
       final Schema schema = new Schema.Parser().parse(hoodieConfig.getSchema());
-      bufferedIteratorExecutor =
-          new BoundedInMemoryExecutor<>(hoodieConfig.getWriteBufferLimitBytes(), new IteratorBasedQueueProducer<>(inputItr), Option.of(getInsertHandler()), getTransformFunction(schema, hoodieConfig));
+      bufferedIteratorExecutor = new BoundedInMemoryExecutor<>(hoodieConfig.getWriteBufferLimitBytes(), new IteratorBasedQueueProducer<>(inputItr),
+          Option.of(getExplicitInsertHandler()), getTransformFunction(schema, hoodieConfig));
       final List<WriteStatus> result = bufferedIteratorExecutor.execute();
       assert result != null && !result.isEmpty() && !bufferedIteratorExecutor.isRemaining();
       return result;
@@ -77,4 +69,10 @@ public class FlinkLazyInsertIterable<T extends HoodieRecordPayload> extends Hood
       }
     }
   }
+
+  @SuppressWarnings("rawtypes")
+  private ExplicitWriteHandler getExplicitInsertHandler() {
+    HoodieWriteHandle handle = ((ExplicitWriteHandleFactory) writeHandleFactory).getWriteHandle();
+    return new ExplicitWriteHandler(handle);
+  }
 }
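
Two things to note in this file: the constructor that did not take a write-handle factory has been removed, so every caller must now supply an ExplicitWriteHandleFactory; and getExplicitInsertHandler() simply unwraps that factory to bind the queue consumer to the caller's handle, while the queue producer and the record transform function passed to the executor are unchanged.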
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/ExplicitWriteHandleFactory.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/ExplicitWriteHandleFactory.java
index 092e945..e598a03 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/ExplicitWriteHandleFactory.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/ExplicitWriteHandleFactory.java
@@ -28,7 +28,7 @@ import org.apache.hudi.table.HoodieTable;
  */
 public class ExplicitWriteHandleFactory<T extends HoodieRecordPayload, I, K, O>
     extends WriteHandleFactory<T, I, K, O> {
-  private HoodieWriteHandle<T, I, K, O> writeHandle;
+  private final HoodieWriteHandle<T, I, K, O> writeHandle;
 
   public ExplicitWriteHandleFactory(HoodieWriteHandle<T, I, K, O> writeHandle) {
     this.writeHandle = writeHandle;
@@ -41,4 +41,8 @@ public class ExplicitWriteHandleFactory<T extends HoodieRecordPayload, I, K, O>
       String fileIdPrefix, TaskContextSupplier taskContextSupplier) {
     return writeHandle;
   }
+
+  public HoodieWriteHandle<T, I, K, O> getWriteHandle() {
+    return writeHandle;
+  }
 }
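
Design-wise the factory is now just a holder: it is constructed with a handle, the overridden factory method in the hunk above ignores its arguments and returns that same instance, and the new getWriteHandle() accessor exposes it so FlinkLazyInsertIterable can pass it straight to ExplicitWriteHandler without going through the factory method at all.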
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkAppendHandle.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkAppendHandle.java
index 1872637..b514896 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkAppendHandle.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkAppendHandle.java
@@ -75,6 +75,11 @@ public class FlinkAppendHandle<T extends HoodieRecordPayload, I, K, O>
   }
 
   @Override
+  public boolean canWrite(HoodieRecord record) {
+    return true;
+  }
+
+  @Override
   protected boolean needsUpdateLocation() {
     return false;
   }
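
The FlinkAppendHandle change fits the same theme: since the handle is created and rolled over explicitly by the Flink writer rather than by size checks inside the write path, canWrite() now reports unconditionally that the handle can accept more records. That reading is an inference from this diff, not something stated in the commit message.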