Posted to commits@hudi.apache.org by da...@apache.org on 2021/09/02 08:32:55 UTC
[hudi] branch master updated: [HUDI-2376] Add pipeline for Append mode (#3573)
This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 7a1bd22 [HUDI-2376] Add pipeline for Append mode (#3573)
7a1bd22 is described below
commit 7a1bd225cab6b1737d689d3659351a8c950a4d78
Author: yuzhaojing <32...@users.noreply.github.com>
AuthorDate: Thu Sep 2 16:32:40 2021 +0800
[HUDI-2376] Add pipeline for Append mode (#3573)
Co-authored-by: 喻兆靖 <yu...@bilibili.com>
---
.../apache/hudi/client/HoodieFlinkWriteClient.java | 5 -
.../apache/hudi/configuration/FlinkOptions.java | 29 ++-
.../org/apache/hudi/sink/StreamWriteFunction.java | 194 +---------------
.../org/apache/hudi/sink/StreamWriteOperator.java | 28 +--
.../hudi/sink/append/AppendWriteFunction.java | 137 +++++++++++
.../AppendWriteOperator.java} | 30 ++-
.../hudi/sink/bulk/BulkInsertWriteFunction.java | 36 +--
.../hudi/sink/bulk/BulkInsertWriteOperator.java | 77 +-----
.../hudi/sink/bulk/BulkInsertWriterHelper.java | 27 ++-
.../apache/hudi/sink/bulk/sort/SortOperator.java | 1 -
.../sink/common/AbstractStreamWriteFunction.java | 258 +++++++++++++++++++++
.../AbstractWriteFunction.java} | 32 ++-
.../AbstractWriteOperator.java} | 36 ++-
.../WriteOperatorFactory.java} | 19 +-
.../org/apache/hudi/sink/event/CommitAckEvent.java | 3 +-
.../partitioner/profile/DeltaWriteProfile.java | 2 +-
.../sink/partitioner/profile/WriteProfiles.java | 6 +-
.../sink/transform/RowDataToHoodieFunctions.java | 3 +-
.../apache/hudi/sink/transform/Transformer.java | 1 +
.../java/org/apache/hudi/sink/utils/Pipelines.java | 31 ++-
.../hudi/source/StreamReadMonitoringFunction.java | 20 +-
.../apache/hudi/streamer/FlinkStreamerConfig.java | 38 +--
.../org/apache/hudi/table/HoodieTableFactory.java | 4 +-
.../org/apache/hudi/table/HoodieTableSink.java | 9 +-
.../org/apache/hudi/table/HoodieTableSource.java | 3 +-
.../apache/hudi/table/format/FilePathUtils.java | 26 +--
.../table/format/cow/AbstractColumnReader.java | 3 +-
.../table/format/mor/MergeOnReadInputSplit.java | 20 +-
.../table/format/mor/MergeOnReadTableState.java | 1 -
.../org/apache/hudi/util/AvroSchemaConverter.java | 8 +-
.../java/org/apache/hudi/util/StreamerUtil.java | 3 +
.../org/apache/hudi/sink/StreamWriteITCase.java | 4 -
.../org/apache/hudi/sink/TestWriteCopyOnWrite.java | 24 +-
.../apache/hudi/sink/bulk/TestRowDataKeyGen.java | 2 +-
.../hudi/sink/utils/CompactFunctionWrapper.java | 14 +-
.../hudi/sink/utils/InsertFunctionWrapper.java | 141 +++++++++++
.../sink/utils/StreamWriteFunctionWrapper.java | 2 +-
.../apache/hudi/source/TestStreamReadOperator.java | 1 +
.../apache/hudi/table/HoodieDataSourceITCase.java | 35 ++-
.../org/apache/hudi/utils/TestConfigurations.java | 12 +-
.../test/java/org/apache/hudi/utils/TestSQL.java | 3 +-
.../org/apache/hudi/utils/TestStreamerUtil.java | 16 +-
42 files changed, 854 insertions(+), 490 deletions(-)
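For orientation before the per-file diffs: a minimal sketch of how the new append write operator could be wired into a Flink job, assuming the AppendWriteOperator#getFactory and WriteOperatorFactory APIs introduced by this patch. The helper and operator names below are illustrative only, not the actual Pipelines code added further down.

    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.table.data.RowData;
    import org.apache.flink.table.types.logical.RowType;

    import org.apache.hudi.configuration.FlinkOptions;
    import org.apache.hudi.sink.append.AppendWriteOperator;
    import org.apache.hudi.sink.common.WriteOperatorFactory;

    public final class AppendPipelineSketch {
      // Illustrative helper: transforms a RowData stream with the coordinated append write operator.
      public static DataStream<Object> appendWrite(Configuration conf, RowType rowType, DataStream<RowData> dataStream) {
        WriteOperatorFactory<RowData> operatorFactory = AppendWriteOperator.getFactory(conf, rowType);
        return dataStream
            .transform("append_write", TypeInformation.of(Object.class), operatorFactory)
            // follow the parallelism of the configured write tasks
            .setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS));
      }
    }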
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
index 7b553bc..fdefd90 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
@@ -448,11 +448,6 @@ public class HoodieFlinkWriteClient<T extends HoodieRecordPayload> extends
final HoodieRecordLocation loc = record.getCurrentLocation();
final String fileID = loc.getFileId();
final String partitionPath = record.getPartitionPath();
- // Always use FlinkCreateHandle when insert duplication turns on
- if (config.allowDuplicateInserts()) {
- return new FlinkCreateHandle<>(config, instantTime, table, partitionPath,
- fileID, table.getTaskContextSupplier());
- }
if (bucketToHandles.containsKey(fileID)) {
MiniBatchHandle lastHandle = (MiniBatchHandle) bucketToHandles.get(fileID);
diff --git a/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java b/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
index 0e2b0b3..8504f6a 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
@@ -222,10 +222,10 @@ public class FlinkOptions extends HoodieConfig {
.withDescription("Type of table to write. COPY_ON_WRITE (or) MERGE_ON_READ");
public static final ConfigOption<Boolean> INSERT_DEDUP = ConfigOptions
- .key("write.insert.deduplicate")
- .booleanType()
- .defaultValue(true)
- .withDescription("Whether to deduplicate for INSERT operation, if disabled, writes the base files directly, default true");
+ .key("write.insert.deduplicate")
+ .booleanType()
+ .defaultValue(true)
+ .withDescription("Whether to deduplicate for INSERT operation, if disabled, writes the base files directly, default true");
public static final ConfigOption<String> OPERATION = ConfigOptions
.key("write.operation")
@@ -370,6 +370,27 @@ public class FlinkOptions extends HoodieConfig {
.defaultValue(1024)
.withDescription("Maximum size allowed in MB for a log file before it is rolled over to the next version, default 1GB");
+ public static final ConfigOption<Integer> WRITE_PARQUET_BLOCK_SIZE = ConfigOptions
+ .key("write.parquet.block.size")
+ .intType()
+ .defaultValue(120)
+ .withDescription("Parquet RowGroup size. It's recommended to make this large enough that scan costs can be"
+ + " amortized by packing enough column values into a single row group.");
+
+ public static final ConfigOption<Integer> WRITE_PARQUET_MAX_FILE_SIZE = ConfigOptions
+ .key("write.parquet.max.file.size")
+ .intType()
+ .defaultValue(120)
+ .withDescription("Target size for parquet files produced by Hudi write phases. "
+ + "For DFS, this needs to be aligned with the underlying filesystem block size for optimal performance.");
+
+ public static final ConfigOption<Integer> WRITE_PARQUET_PAGE_SIZE = ConfigOptions
+ .key("hoodie.parquet.page.size")
+ .intType()
+ .defaultValue(1)
+ .withDescription("Parquet page size. Page is the unit of read within a parquet file. "
+ + "Within a block, pages are compressed separately.");
+
public static final ConfigOption<Integer> WRITE_MERGE_MAX_MEMORY = ConfigOptions
.key("write.merge.max_memory")
.intType()
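The three parquet options added above are plain integer options; a short sketch of setting them programmatically (the values are just the declared defaults, and per the option descriptions the block and max file sizes are in MB):

    import org.apache.flink.configuration.Configuration;
    import org.apache.hudi.configuration.FlinkOptions;

    public final class ParquetOptionsSketch {
      public static Configuration defaults() {
        Configuration conf = new Configuration();
        conf.setInteger(FlinkOptions.WRITE_PARQUET_BLOCK_SIZE, 120);    // parquet row group size (MB)
        conf.setInteger(FlinkOptions.WRITE_PARQUET_MAX_FILE_SIZE, 120); // target base file size (MB)
        conf.setInteger(FlinkOptions.WRITE_PARQUET_PAGE_SIZE, 1);       // parquet page size
        return conf;
      }
    }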
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
index 71b20ba..a155fb5 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
@@ -18,38 +18,26 @@
package org.apache.hudi.sink;
-import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieOperation;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.model.HoodieRecordPayload;
-import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.WriteOperationType;
-import org.apache.hudi.common.util.CommitUtils;
import org.apache.hudi.common.util.ObjectSizeCalculator;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.index.HoodieIndex;
-import org.apache.hudi.sink.event.CommitAckEvent;
+import org.apache.hudi.sink.common.AbstractStreamWriteFunction;
import org.apache.hudi.sink.event.WriteMetadataEvent;
-import org.apache.hudi.sink.utils.TimeWait;
import org.apache.hudi.table.action.commit.FlinkWriteHelper;
import org.apache.hudi.util.StreamerUtil;
import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.api.common.state.ListState;
-import org.apache.flink.api.common.state.ListStateDescriptor;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.operators.coordination.OperatorEvent;
-import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
-import org.apache.flink.runtime.state.FunctionInitializationContext;
-import org.apache.flink.runtime.state.FunctionSnapshotContext;
-import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
-import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
+import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -61,7 +49,6 @@ import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
-import java.util.Objects;
import java.util.Random;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
@@ -101,9 +88,7 @@ import java.util.stream.Collectors;
* @param <I> Type of the input record
* @see StreamWriteOperatorCoordinator
*/
-public class StreamWriteFunction<K, I, O>
- extends KeyedProcessFunction<K, I, O>
- implements CheckpointedFunction {
+public class StreamWriteFunction<I> extends AbstractStreamWriteFunction<I> {
private static final long serialVersionUID = 1L;
@@ -114,76 +99,20 @@ public class StreamWriteFunction<K, I, O>
*/
private transient Map<String, DataBucket> buckets;
- /**
- * Config options.
- */
- private final Configuration config;
-
- /**
- * Id of current subtask.
- */
- private int taskID;
-
- /**
- * Write Client.
- */
- private transient HoodieFlinkWriteClient writeClient;
-
private transient BiFunction<List<HoodieRecord>, String, List<WriteStatus>> writeFunction;
/**
- * The REQUESTED instant we write the data.
- */
- private volatile String currentInstant;
-
- /**
- * Gateway to send operator events to the operator coordinator.
- */
- private transient OperatorEventGateway eventGateway;
-
- /**
- * Commit action type.
- */
- private transient String actionType;
-
- /**
* Total size tracer.
*/
private transient TotalSizeTracer tracer;
/**
- * Flag saying whether the write task is waiting for the checkpoint success notification
- * after it finished a checkpoint.
- *
- * <p>The flag is needed because the write task does not block during the waiting time interval,
- * some data buckets still flush out with old instant time. There are two cases that the flush may produce
- * corrupted files if the old instant is committed successfully:
- * 1) the write handle was writing data but interrupted, left a corrupted parquet file;
- * 2) the write handle finished the write but was not closed, left an empty parquet file.
- *
- * <p>To solve, when this flag was set to true, we block the data flushing thus the #processElement method,
- * the flag was reset to false if the task receives the checkpoint success event or the latest inflight instant
- * time changed(the last instant committed successfully).
- */
- private volatile boolean confirming = false;
-
- /**
- * List state of the write metadata events.
- */
- private transient ListState<WriteMetadataEvent> writeMetadataState;
-
- /**
- * Write status list for the current checkpoint.
- */
- private List<WriteStatus> writeStatuses;
-
- /**
* Constructs a StreamingSinkFunction.
*
* @param config The config options
*/
public StreamWriteFunction(Configuration config) {
- this.config = config;
+ super(config);
}
@Override
@@ -194,42 +123,15 @@ public class StreamWriteFunction<K, I, O>
}
@Override
- public void initializeState(FunctionInitializationContext context) throws Exception {
- this.taskID = getRuntimeContext().getIndexOfThisSubtask();
- this.writeClient = StreamerUtil.createWriteClient(this.config, getRuntimeContext());
- this.actionType = CommitUtils.getCommitActionType(
- WriteOperationType.fromValue(config.getString(FlinkOptions.OPERATION)),
- HoodieTableType.valueOf(config.getString(FlinkOptions.TABLE_TYPE)));
-
- this.writeStatuses = new ArrayList<>();
- this.writeMetadataState = context.getOperatorStateStore().getListState(
- new ListStateDescriptor<>(
- "write-metadata-state",
- TypeInformation.of(WriteMetadataEvent.class)
- ));
-
- this.currentInstant = this.writeClient.getLastPendingInstant(this.actionType);
- if (context.isRestored()) {
- restoreWriteMetadata();
- } else {
- sendBootstrapEvent();
- }
- // blocks flushing until the coordinator starts a new instant
- this.confirming = true;
- }
-
- @Override
- public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
+ public void snapshotState() {
// Based on the fact that the coordinator starts the checkpoint first,
// it would check the validity.
// wait for the buffer data flush out and request a new instant
flushRemaining(false);
- // Reload the snapshot state as the current state.
- reloadWriteMetaState();
}
@Override
- public void processElement(I value, KeyedProcessFunction<K, I, O>.Context ctx, Collector<O> out) {
+ public void processElement(I value, ProcessFunction<I, Object>.Context ctx, Collector<Object> out) throws Exception {
bufferRecord((HoodieRecord<?>) value);
}
@@ -264,21 +166,6 @@ public class StreamWriteFunction<K, I, O>
return ret;
}
- @VisibleForTesting
- @SuppressWarnings("rawtypes")
- public HoodieFlinkWriteClient getWriteClient() {
- return writeClient;
- }
-
- @VisibleForTesting
- public boolean isConfirming() {
- return this.confirming;
- }
-
- public void setOperatorEventGateway(OperatorEventGateway operatorEventGateway) {
- this.eventGateway = operatorEventGateway;
- }
-
// -------------------------------------------------------------------------
// Utilities
// -------------------------------------------------------------------------
@@ -307,49 +194,6 @@ public class StreamWriteFunction<K, I, O>
}
}
- private void restoreWriteMetadata() throws Exception {
- String lastInflight = this.writeClient.getLastPendingInstant(this.actionType);
- boolean eventSent = false;
- for (WriteMetadataEvent event : this.writeMetadataState.get()) {
- if (Objects.equals(lastInflight, event.getInstantTime())) {
- // The checkpoint succeed but the meta does not commit,
- // re-commit the inflight instant
- this.eventGateway.sendEventToCoordinator(event);
- LOG.info("Send uncommitted write metadata event to coordinator, task[{}].", taskID);
- eventSent = true;
- }
- }
- if (!eventSent) {
- sendBootstrapEvent();
- }
- }
-
- private void sendBootstrapEvent() {
- this.eventGateway.sendEventToCoordinator(WriteMetadataEvent.emptyBootstrap(taskID));
- LOG.info("Send bootstrap write metadata event to coordinator, task[{}].", taskID);
- }
-
- /**
- * Reload the write metadata state as the current checkpoint.
- */
- private void reloadWriteMetaState() throws Exception {
- this.writeMetadataState.clear();
- WriteMetadataEvent event = WriteMetadataEvent.builder()
- .taskID(taskID)
- .instantTime(currentInstant)
- .writeStatus(new ArrayList<>(writeStatuses))
- .bootstrap(true)
- .build();
- this.writeMetadataState.add(event);
- writeStatuses.clear();
- }
-
- public void handleOperatorEvent(OperatorEvent event) {
- ValidationUtils.checkArgument(event instanceof CommitAckEvent,
- "The write function can only handle CommitAckEvent");
- this.confirming = false;
- }
-
/**
* Represents a data item in the buffer, this is needed to reduce the
* memory footprint.
@@ -562,32 +406,6 @@ public class StreamWriteFunction<K, I, O>
&& this.buckets.values().stream().anyMatch(bucket -> bucket.records.size() > 0);
}
- private String instantToWrite(boolean hasData) {
- String instant = this.writeClient.getLastPendingInstant(this.actionType);
- // if exactly-once semantics turns on,
- // waits for the checkpoint notification until the checkpoint timeout threshold hits.
- TimeWait timeWait = TimeWait.builder()
- .timeout(config.getLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT))
- .action("instant initialize")
- .build();
- while (confirming) {
- // wait condition:
- // 1. there is no inflight instant
- // 2. the inflight instant does not change and the checkpoint has buffering data
- if (instant == null || (instant.equals(this.currentInstant) && hasData)) {
- // sleep for a while
- timeWait.waitFor();
- // refresh the inflight instant
- instant = this.writeClient.getLastPendingInstant(this.actionType);
- } else {
- // the inflight instant changed, which means the last instant was committed
- // successfully.
- confirming = false;
- }
- }
- return instant;
- }
-
@SuppressWarnings("unchecked, rawtypes")
private boolean flushBucket(DataBucket bucket) {
String instant = instantToWrite(true);
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperator.java b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperator.java
index c16743e..9e39e3f 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperator.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperator.java
@@ -18,12 +18,10 @@
package org.apache.hudi.sink;
+import org.apache.hudi.sink.common.AbstractWriteOperator;
+import org.apache.hudi.sink.common.WriteOperatorFactory;
+
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.operators.coordination.OperatorEvent;
-import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
-import org.apache.flink.runtime.operators.coordination.OperatorEventHandler;
-import org.apache.flink.streaming.api.operators.BoundedOneInput;
-import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
import org.apache.flink.streaming.api.operators.StreamSink;
/**
@@ -31,27 +29,13 @@ import org.apache.flink.streaming.api.operators.StreamSink;
*
* @param <I> The input type
*/
-public class StreamWriteOperator<I>
- extends KeyedProcessOperator<Object, I, Object>
- implements OperatorEventHandler, BoundedOneInput {
- private final StreamWriteFunction<Object, I, Object> sinkFunction;
+public class StreamWriteOperator<I> extends AbstractWriteOperator<I> {
public StreamWriteOperator(Configuration conf) {
super(new StreamWriteFunction<>(conf));
- this.sinkFunction = (StreamWriteFunction<Object, I, Object>) getUserFunction();
- }
-
- @Override
- public void handleOperatorEvent(OperatorEvent event) {
- this.sinkFunction.handleOperatorEvent(event);
- }
-
- void setOperatorEventGateway(OperatorEventGateway operatorEventGateway) {
- sinkFunction.setOperatorEventGateway(operatorEventGateway);
}
- @Override
- public void endInput() {
- sinkFunction.endInput();
+ public static <I> WriteOperatorFactory<I> getFactory(Configuration conf) {
+ return WriteOperatorFactory.instance(conf, new StreamWriteOperator<>(conf));
}
}
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunction.java
new file mode 100644
index 0000000..128c030
--- /dev/null
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunction.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.append;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.sink.StreamWriteOperatorCoordinator;
+import org.apache.hudi.sink.bulk.BulkInsertWriterHelper;
+import org.apache.hudi.sink.common.AbstractStreamWriteFunction;
+import org.apache.hudi.sink.event.WriteMetadataEvent;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.Collector;
+
+import java.util.List;
+
+/**
+ * Sink function to write the data to the underneath filesystem.
+ *
+ * <p>The function writes base files directly for each checkpoint,
+ * the file may roll over when it’s size hits the configured threshold.
+ *
+ * @param <I> Type of the input record
+ * @see StreamWriteOperatorCoordinator
+ */
+public class AppendWriteFunction<I> extends AbstractStreamWriteFunction<I> {
+
+ private static final long serialVersionUID = 1L;
+
+ /**
+ * Helper class for log mode.
+ */
+ private transient BulkInsertWriterHelper writerHelper;
+
+ /**
+ * Table row type.
+ */
+ private final RowType rowType;
+
+ /**
+ * Constructs an AppendWriteFunction.
+ *
+ * @param config The config options
+ */
+ public AppendWriteFunction(Configuration config, RowType rowType) {
+ super(config);
+ this.rowType = rowType;
+ }
+
+ @Override
+ public void snapshotState() {
+ // Based on the fact that the coordinator starts the checkpoint first,
+ // it would check the validity.
+ // wait for the buffer data flush out and request a new instant
+ flushData(false);
+ // nullify the write helper for next ckp
+ this.writerHelper = null;
+ }
+
+ @Override
+ public void processElement(I value, Context ctx, Collector<Object> out) throws Exception {
+ if (this.writerHelper == null) {
+ initWriterHelper();
+ }
+ this.writerHelper.write((RowData) value);
+ }
+
+ @Override
+ public void close() {
+ if (this.writeClient != null) {
+ this.writeClient.cleanHandlesGracefully();
+ this.writeClient.close();
+ }
+ }
+
+ /**
+ * End input action for batch source.
+ */
+ public void endInput() {
+ flushData(true);
+ this.writeClient.cleanHandles();
+ this.writeStatuses.clear();
+ }
+
+ // -------------------------------------------------------------------------
+ // GetterSetter
+ // -------------------------------------------------------------------------
+ @VisibleForTesting
+ public BulkInsertWriterHelper getWriterHelper() {
+ return this.writerHelper;
+ }
+
+ // -------------------------------------------------------------------------
+ // Utilities
+ // -------------------------------------------------------------------------
+ private void initWriterHelper() {
+ this.currentInstant = instantToWrite(true);
+ if (this.currentInstant == null) {
+ // in case there are empty checkpoints that has no input data
+ throw new HoodieException("No inflight instant when flushing data!");
+ }
+ this.writerHelper = new BulkInsertWriterHelper(this.config, this.writeClient.getHoodieTable(), this.writeClient.getConfig(),
+ this.currentInstant, this.taskID, getRuntimeContext().getNumberOfParallelSubtasks(), getRuntimeContext().getAttemptNumber(),
+ this.rowType);
+ }
+
+ private void flushData(boolean endInput) {
+ final List<WriteStatus> writeStatus = this.writerHelper.getWriteStatuses(this.taskID);
+ final WriteMetadataEvent event = WriteMetadataEvent.builder()
+ .taskID(taskID)
+ .instantTime(this.writerHelper.getInstantTime())
+ .writeStatus(writeStatus)
+ .lastBatch(true)
+ .endInput(endInput)
+ .build();
+ this.eventGateway.sendEventToCoordinator(event);
+ }
+}
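A rough enablement sketch, inferred from the INSERT_DEDUP description earlier in this patch rather than stated by it: the append function is aimed at plain INSERT operations with deduplication switched off, with the actual dispatch presumably living in the HoodieTableSink changes listed in the diffstat.

    import org.apache.flink.configuration.Configuration;
    import org.apache.hudi.configuration.FlinkOptions;

    // Assumption: options expected to route writes through the append path.
    Configuration conf = new Configuration();
    conf.setString(FlinkOptions.OPERATION, "insert");   // write.operation
    conf.setBoolean(FlinkOptions.INSERT_DEDUP, false);  // write.insert.deduplicate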
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/transform/RowDataToHoodieFunctions.java b/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteOperator.java
similarity index 54%
copy from hudi-flink/src/main/java/org/apache/hudi/sink/transform/RowDataToHoodieFunctions.java
copy to hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteOperator.java
index 5b811a8..ad1a002 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/transform/RowDataToHoodieFunctions.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteOperator.java
@@ -16,30 +16,26 @@
* limitations under the License.
*/
-package org.apache.hudi.sink.transform;
+package org.apache.hudi.sink.append;
-import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.sink.common.AbstractWriteOperator;
+import org.apache.hudi.sink.common.WriteOperatorFactory;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;
/**
- * Utilities for {@link RowDataToHoodieFunction}.
+ * Operator for {@link AppendWriteFunction}.
+ *
+ * @param <I> The input type
*/
-public abstract class RowDataToHoodieFunctions {
- private RowDataToHoodieFunctions() {}
+public class AppendWriteOperator<I> extends AbstractWriteOperator<I> {
+
+ public AppendWriteOperator(Configuration conf, RowType rowType) {
+ super(new AppendWriteFunction<>(conf, rowType));
+ }
- /**
- * Creates a {@link RowDataToHoodieFunction} instance based on the given configuration.
- */
- @SuppressWarnings("rawtypes")
- public static RowDataToHoodieFunction<RowData, HoodieRecord> create(RowType rowType, Configuration conf) {
- if (conf.getLong(FlinkOptions.WRITE_RATE_LIMIT) > 0) {
- return new RowDataToHoodieFunctionWithRateLimit<>(rowType, conf);
- } else {
- return new RowDataToHoodieFunction<>(rowType, conf);
- }
+ public static <I> WriteOperatorFactory<I> getFactory(Configuration conf, RowType rowType) {
+ return WriteOperatorFactory.instance(conf, new AppendWriteOperator<>(conf, rowType));
}
}
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriteFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriteFunction.java
index dd0f7bc..7fce5c0 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriteFunction.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriteFunction.java
@@ -19,21 +19,20 @@
package org.apache.hudi.sink.bulk;
import org.apache.hudi.client.HoodieFlinkWriteClient;
-import org.apache.hudi.client.HoodieInternalWriteStatus;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.util.CommitUtils;
import org.apache.hudi.configuration.FlinkOptions;
-import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.sink.StreamWriteOperatorCoordinator;
+import org.apache.hudi.sink.common.AbstractWriteFunction;
import org.apache.hudi.sink.event.WriteMetadataEvent;
import org.apache.hudi.sink.utils.TimeWait;
import org.apache.hudi.util.StreamerUtil;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
-import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.Collector;
@@ -43,7 +42,6 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
-import java.util.stream.Collectors;
/**
* Sink function to write the data to the underneath filesystem.
@@ -55,8 +53,8 @@ import java.util.stream.Collectors;
* @param <I> Type of the input record
* @see StreamWriteOperatorCoordinator
*/
-public class BulkInsertWriteFunction<I, O>
- extends ProcessFunction<I, O> {
+public class BulkInsertWriteFunction<I>
+ extends AbstractWriteFunction<I> {
private static final long serialVersionUID = 1L;
@@ -126,7 +124,7 @@ public class BulkInsertWriteFunction<I, O>
}
@Override
- public void processElement(I value, Context ctx, Collector<O> out) throws IOException {
+ public void processElement(I value, Context ctx, Collector<Object> out) throws IOException {
this.writerHelper.write((RowData) value);
}
@@ -142,14 +140,8 @@ public class BulkInsertWriteFunction<I, O>
* End input action for batch source.
*/
public void endInput() {
- final List<WriteStatus> writeStatus;
- try {
- this.writerHelper.close();
- writeStatus = this.writerHelper.getWriteStatuses().stream()
- .map(BulkInsertWriteFunction::toWriteStatus).collect(Collectors.toList());
- } catch (IOException e) {
- throw new HoodieException("Error collect the write status for task [" + this.taskID + "]");
- }
+ final List<WriteStatus> writeStatus = this.writerHelper.getWriteStatuses(this.taskID);
+
final WriteMetadataEvent event = WriteMetadataEvent.builder()
.taskID(taskID)
.instantTime(this.writerHelper.getInstantTime())
@@ -160,17 +152,9 @@ public class BulkInsertWriteFunction<I, O>
this.eventGateway.sendEventToCoordinator(event);
}
- /**
- * Tool to convert {@link HoodieInternalWriteStatus} into {@link WriteStatus}.
- */
- private static WriteStatus toWriteStatus(HoodieInternalWriteStatus internalWriteStatus) {
- WriteStatus writeStatus = new WriteStatus(false, 0.1);
- writeStatus.setStat(internalWriteStatus.getStat());
- writeStatus.setFileId(internalWriteStatus.getFileId());
- writeStatus.setGlobalError(internalWriteStatus.getGlobalError());
- writeStatus.setTotalRecords(internalWriteStatus.getTotalRecords());
- writeStatus.setTotalErrorRecords(internalWriteStatus.getTotalErrorRecords());
- return writeStatus;
+ @Override
+ public void handleOperatorEvent(OperatorEvent event) {
+ // no operation
}
// -------------------------------------------------------------------------
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriteOperator.java b/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriteOperator.java
index bc01622..16fb87f 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriteOperator.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriteOperator.java
@@ -18,24 +18,12 @@
package org.apache.hudi.sink.bulk;
-import org.apache.hudi.sink.StreamWriteOperatorCoordinator;
+import org.apache.hudi.sink.common.AbstractWriteOperator;
+import org.apache.hudi.sink.common.WriteOperatorFactory;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.jobgraph.OperatorID;
-import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
-import org.apache.flink.runtime.operators.coordination.OperatorEventDispatcher;
-import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
-import org.apache.flink.runtime.operators.coordination.OperatorEventHandler;
import org.apache.flink.streaming.api.operators.BoundedOneInput;
-import org.apache.flink.streaming.api.operators.CoordinatedOperatorFactory;
-import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
-import org.apache.flink.streaming.api.operators.ProcessOperator;
-import org.apache.flink.streaming.api.operators.SimpleUdfStreamOperatorFactory;
-import org.apache.flink.streaming.api.operators.StreamOperator;
-import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
-import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
-import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;
/**
@@ -44,13 +32,11 @@ import org.apache.flink.table.types.logical.RowType;
* @param <I> The input type
*/
public class BulkInsertWriteOperator<I>
- extends ProcessOperator<I, Object>
- implements OperatorEventHandler, BoundedOneInput {
- private final BulkInsertWriteFunction<I, Object> sinkFunction;
+ extends AbstractWriteOperator<I>
+ implements BoundedOneInput {
public BulkInsertWriteOperator(Configuration conf, RowType rowType) {
super(new BulkInsertWriteFunction<>(conf, rowType));
- this.sinkFunction = (BulkInsertWriteFunction<I, Object>) getUserFunction();
}
@Override
@@ -58,58 +44,7 @@ public class BulkInsertWriteOperator<I>
// no operation
}
- void setOperatorEventGateway(OperatorEventGateway operatorEventGateway) {
- sinkFunction.setOperatorEventGateway(operatorEventGateway);
- }
-
- @Override
- public void endInput() {
- sinkFunction.endInput();
- }
-
- public static OperatorFactory<RowData> getFactory(Configuration conf, RowType rowType) {
- return new OperatorFactory<>(conf, rowType);
- }
-
- // -------------------------------------------------------------------------
- // Inner Class
- // -------------------------------------------------------------------------
-
- public static class OperatorFactory<I>
- extends SimpleUdfStreamOperatorFactory<Object>
- implements CoordinatedOperatorFactory<Object>, OneInputStreamOperatorFactory<I, Object> {
- private static final long serialVersionUID = 1L;
-
- private final BulkInsertWriteOperator<I> operator;
- private final Configuration conf;
-
- public OperatorFactory(Configuration conf, RowType rowType) {
- super(new BulkInsertWriteOperator<>(conf, rowType));
- this.operator = (BulkInsertWriteOperator<I>) getOperator();
- this.conf = conf;
- }
-
- @Override
- @SuppressWarnings("unchecked")
- public <T extends StreamOperator<Object>> T createStreamOperator(StreamOperatorParameters<Object> parameters) {
- final OperatorID operatorID = parameters.getStreamConfig().getOperatorID();
- final OperatorEventDispatcher eventDispatcher = parameters.getOperatorEventDispatcher();
-
- this.operator.setOperatorEventGateway(eventDispatcher.getOperatorEventGateway(operatorID));
- this.operator.setup(parameters.getContainingTask(), parameters.getStreamConfig(), parameters.getOutput());
- this.operator.setProcessingTimeService(this.processingTimeService);
- eventDispatcher.registerEventHandler(operatorID, operator);
- return (T) operator;
- }
-
- @Override
- public OperatorCoordinator.Provider getCoordinatorProvider(String s, OperatorID operatorID) {
- return new StreamWriteOperatorCoordinator.Provider(operatorID, this.conf);
- }
-
- @Override
- public void setProcessingTimeService(ProcessingTimeService processingTimeService) {
- super.setProcessingTimeService(processingTimeService);
- }
+ public static <I> WriteOperatorFactory<I> getFactory(Configuration conf, RowType rowType) {
+ return WriteOperatorFactory.instance(conf, new BulkInsertWriteOperator<>(conf, rowType));
}
}
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriterHelper.java b/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriterHelper.java
index fbe7678..e0cbab6 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriterHelper.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriterHelper.java
@@ -19,9 +19,11 @@
package org.apache.hudi.sink.bulk;
import org.apache.hudi.client.HoodieInternalWriteStatus;
+import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.io.storage.row.HoodieRowDataCreateHandle;
import org.apache.hudi.table.HoodieTable;
@@ -39,6 +41,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
+import java.util.stream.Collectors;
/**
* Helper class for bulk insert used by Flink.
@@ -101,7 +104,7 @@ public class BulkInsertWriterHelper {
}
}
- public List<HoodieInternalWriteStatus> getWriteStatuses() throws IOException {
+ public List<HoodieInternalWriteStatus> getHoodieWriteStatuses() throws IOException {
close();
return writeStatusList;
}
@@ -172,5 +175,27 @@ public class BulkInsertWriterHelper {
return new RowType(false, mergedFields);
}
+
+ public List<WriteStatus> getWriteStatuses(int taskID) {
+ try {
+ return getHoodieWriteStatuses().stream()
+ .map(BulkInsertWriterHelper::toWriteStatus).collect(Collectors.toList());
+ } catch (IOException e) {
+ throw new HoodieException("Error collect the write status for task [" + taskID + "]");
+ }
+ }
+
+ /**
+ * Tool to convert {@link HoodieInternalWriteStatus} into {@link WriteStatus}.
+ */
+ private static WriteStatus toWriteStatus(HoodieInternalWriteStatus internalWriteStatus) {
+ WriteStatus writeStatus = new WriteStatus(false, 0.1);
+ writeStatus.setStat(internalWriteStatus.getStat());
+ writeStatus.setFileId(internalWriteStatus.getFileId());
+ writeStatus.setGlobalError(internalWriteStatus.getGlobalError());
+ writeStatus.setTotalRecords(internalWriteStatus.getTotalRecords());
+ writeStatus.setTotalErrorRecords(internalWriteStatus.getTotalErrorRecords());
+ return writeStatus;
+ }
}
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/sort/SortOperator.java b/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/sort/SortOperator.java
index 7be2b41..aa62240 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/sort/SortOperator.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/sort/SortOperator.java
@@ -35,7 +35,6 @@ import org.apache.flink.table.runtime.typeutils.AbstractRowDataSerializer;
import org.apache.flink.table.runtime.typeutils.BinaryRowDataSerializer;
import org.apache.flink.table.runtime.util.StreamRecordCollector;
import org.apache.flink.util.MutableObjectIterator;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractStreamWriteFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractStreamWriteFunction.java
new file mode 100644
index 0000000..654f0b8
--- /dev/null
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractStreamWriteFunction.java
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.common;
+
+import org.apache.hudi.client.HoodieFlinkWriteClient;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.util.CommitUtils;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.sink.StreamWriteOperatorCoordinator;
+import org.apache.hudi.sink.event.CommitAckEvent;
+import org.apache.hudi.sink.event.WriteMetadataEvent;
+import org.apache.hudi.sink.utils.TimeWait;
+import org.apache.hudi.util.StreamerUtil;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Base infrastructures for streaming writer function.
+ *
+ * @param <I> Type of the input record
+ * @see StreamWriteOperatorCoordinator
+ */
+public abstract class AbstractStreamWriteFunction<I>
+ extends AbstractWriteFunction<I>
+ implements CheckpointedFunction {
+
+ private static final Logger LOG = LoggerFactory.getLogger(AbstractStreamWriteFunction.class);
+
+ /**
+ * Config options.
+ */
+ protected final Configuration config;
+
+ /**
+ * Id of current subtask.
+ */
+ protected int taskID;
+
+ /**
+ * Write Client.
+ */
+ protected transient HoodieFlinkWriteClient writeClient;
+
+ /**
+ * The REQUESTED instant we write the data.
+ */
+ protected volatile String currentInstant;
+
+ /**
+ * Gateway to send operator events to the operator coordinator.
+ */
+ protected transient OperatorEventGateway eventGateway;
+
+ /**
+ * Commit action type.
+ */
+ protected transient String actionType;
+
+ /**
+ * Flag saying whether the write task is waiting for the checkpoint success notification
+ * after it finished a checkpoint.
+ *
+ * <p>The flag is needed because the write task does not block during the waiting time interval,
+ * some data buckets still flush out with old instant time. There are two cases that the flush may produce
+ * corrupted files if the old instant is committed successfully:
+ * 1) the write handle was writing data but interrupted, left a corrupted parquet file;
+ * 2) the write handle finished the write but was not closed, left an empty parquet file.
+ *
+ * <p>To solve, when this flag was set to true, we block the data flushing thus the #processElement method,
+ * the flag was reset to false if the task receives the checkpoint success event or the latest inflight instant
+ * time changed(the last instant committed successfully).
+ */
+ protected volatile boolean confirming = false;
+
+ /**
+ * List state of the write metadata events.
+ */
+ private transient ListState<WriteMetadataEvent> writeMetadataState;
+
+ /**
+ * Write status list for the current checkpoint.
+ */
+ protected List<WriteStatus> writeStatuses;
+
+ /**
+ * Constructs a StreamWriteFunctionBase.
+ *
+ * @param config The config options
+ */
+ public AbstractStreamWriteFunction(Configuration config) {
+ this.config = config;
+ }
+
+ @Override
+ public void initializeState(FunctionInitializationContext context) throws Exception {
+ this.taskID = getRuntimeContext().getIndexOfThisSubtask();
+ this.writeClient = StreamerUtil.createWriteClient(this.config, getRuntimeContext());
+ this.actionType = CommitUtils.getCommitActionType(
+ WriteOperationType.fromValue(config.getString(FlinkOptions.OPERATION)),
+ HoodieTableType.valueOf(config.getString(FlinkOptions.TABLE_TYPE)));
+
+ this.writeStatuses = new ArrayList<>();
+ this.writeMetadataState = context.getOperatorStateStore().getListState(
+ new ListStateDescriptor<>(
+ "write-metadata-state",
+ TypeInformation.of(WriteMetadataEvent.class)
+ ));
+
+ this.currentInstant = this.writeClient.getLastPendingInstant(this.actionType);
+ if (context.isRestored()) {
+ restoreWriteMetadata();
+ } else {
+ sendBootstrapEvent();
+ }
+ // blocks flushing until the coordinator starts a new instant
+ this.confirming = true;
+ }
+
+ @Override
+ public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
+ snapshotState();
+ // Reload the snapshot state as the current state.
+ reloadWriteMetaState();
+ }
+
+ public abstract void snapshotState();
+
+ // -------------------------------------------------------------------------
+ // Getter/Setter
+ // -------------------------------------------------------------------------
+ @VisibleForTesting
+ @SuppressWarnings("rawtypes")
+ public HoodieFlinkWriteClient getWriteClient() {
+ return writeClient;
+ }
+
+ @VisibleForTesting
+ public boolean isConfirming() {
+ return this.confirming;
+ }
+
+ public void setOperatorEventGateway(OperatorEventGateway operatorEventGateway) {
+ this.eventGateway = operatorEventGateway;
+ }
+
+ // -------------------------------------------------------------------------
+ // Utilities
+ // -------------------------------------------------------------------------
+
+ private void restoreWriteMetadata() throws Exception {
+ String lastInflight = this.writeClient.getLastPendingInstant(this.actionType);
+ boolean eventSent = false;
+ for (WriteMetadataEvent event : this.writeMetadataState.get()) {
+ if (Objects.equals(lastInflight, event.getInstantTime())) {
+ // The checkpoint succeed but the meta does not commit,
+ // re-commit the inflight instant
+ this.eventGateway.sendEventToCoordinator(event);
+ LOG.info("Send uncommitted write metadata event to coordinator, task[{}].", taskID);
+ eventSent = true;
+ }
+ }
+ if (!eventSent) {
+ sendBootstrapEvent();
+ }
+ }
+
+ private void sendBootstrapEvent() {
+ this.eventGateway.sendEventToCoordinator(WriteMetadataEvent.emptyBootstrap(taskID));
+ LOG.info("Send bootstrap write metadata event to coordinator, task[{}].", taskID);
+ }
+
+ /**
+ * Reload the write metadata state as the current checkpoint.
+ */
+ private void reloadWriteMetaState() throws Exception {
+ this.writeMetadataState.clear();
+ WriteMetadataEvent event = WriteMetadataEvent.builder()
+ .taskID(taskID)
+ .instantTime(currentInstant)
+ .writeStatus(new ArrayList<>(writeStatuses))
+ .bootstrap(true)
+ .build();
+ this.writeMetadataState.add(event);
+ writeStatuses.clear();
+ }
+
+ public void handleOperatorEvent(OperatorEvent event) {
+ ValidationUtils.checkArgument(event instanceof CommitAckEvent,
+ "The write function can only handle CommitAckEvent");
+ this.confirming = false;
+ }
+
+ /**
+ * Prepares the instant time to write with for next checkpoint.
+ *
+ * @param hasData Whether the task has buffering data
+ * @return The instant time
+ */
+ protected String instantToWrite(boolean hasData) {
+ String instant = this.writeClient.getLastPendingInstant(this.actionType);
+ // if exactly-once semantics turns on,
+ // waits for the checkpoint notification until the checkpoint timeout threshold hits.
+ TimeWait timeWait = TimeWait.builder()
+ .timeout(config.getLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT))
+ .action("instant initialize")
+ .build();
+ while (confirming) {
+ // wait condition:
+ // 1. there is no inflight instant
+ // 2. the inflight instant does not change and the checkpoint has buffering data
+ if (instant == null || (instant.equals(this.currentInstant) && hasData)) {
+ // sleep for a while
+ timeWait.waitFor();
+ // refresh the inflight instant
+ instant = this.writeClient.getLastPendingInstant(this.actionType);
+ } else {
+ // the inflight instant changed, which means the last instant was committed
+ // successfully.
+ confirming = false;
+ }
+ }
+ return instant;
+ }
+}
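For readers extending the new base class: a minimal skeleton (illustrative only, not part of this patch) of what a concrete subclass still has to provide on top of AbstractStreamWriteFunction, which already handles state restore, event gateway wiring and the instant hand-shake.

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.util.Collector;
    import org.apache.hudi.sink.common.AbstractStreamWriteFunction;

    public class MyStreamWriteFunction<I> extends AbstractStreamWriteFunction<I> {
      public MyStreamWriteFunction(Configuration config) {
        super(config);
      }

      @Override
      public void processElement(I value, Context ctx, Collector<Object> out) {
        // buffer the incoming record; flushing happens on checkpoints
      }

      @Override
      public void snapshotState() {
        // flush buffered data under instantToWrite(true) and send a WriteMetadataEvent
      }

      @Override
      public void endInput() {
        // flush whatever remains when a bounded input finishes
      }
    }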
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/event/CommitAckEvent.java b/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractWriteFunction.java
similarity index 52%
copy from hudi-flink/src/main/java/org/apache/hudi/sink/event/CommitAckEvent.java
copy to hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractWriteFunction.java
index 93f74af..8e77600 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/event/CommitAckEvent.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractWriteFunction.java
@@ -16,22 +16,32 @@
* limitations under the License.
*/
-package org.apache.hudi.sink.event;
+package org.apache.hudi.sink.common;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
+import org.apache.flink.streaming.api.functions.ProcessFunction;
+import org.apache.flink.streaming.api.operators.BoundedOneInput;
/**
- * An operator event to mark successful instant commit.
+ * Base class for write function.
+ *
+ * @param <I> the input type
*/
-public class CommitAckEvent implements OperatorEvent {
- private static final long serialVersionUID = 1L;
-
- private static final CommitAckEvent INSTANCE = new CommitAckEvent();
+public abstract class AbstractWriteFunction<I> extends ProcessFunction<I, Object> implements BoundedOneInput {
+ /**
+ * Sets up the event gateway.
+ */
+ public abstract void setOperatorEventGateway(OperatorEventGateway operatorEventGateway);
- // default constructor for efficient serialization
- public CommitAckEvent() {}
+ /**
+ * Invoked when bounded source ends up.
+ */
+ public abstract void endInput();
- public static CommitAckEvent getInstance() {
- return INSTANCE;
- }
+ /**
+ * Handles the operator event sent by the coordinator.
+ * @param event The event
+ */
+ public abstract void handleOperatorEvent(OperatorEvent event);
}
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperator.java b/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractWriteOperator.java
similarity index 58%
copy from hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperator.java
copy to hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractWriteOperator.java
index c16743e..e339ccb 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperator.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractWriteOperator.java
@@ -16,42 +16,40 @@
* limitations under the License.
*/
-package org.apache.hudi.sink;
+package org.apache.hudi.sink.common;
-import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
import org.apache.flink.runtime.operators.coordination.OperatorEventHandler;
import org.apache.flink.streaming.api.operators.BoundedOneInput;
-import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
-import org.apache.flink.streaming.api.operators.StreamSink;
+import org.apache.flink.streaming.api.operators.ProcessOperator;
/**
- * Operator for {@link StreamSink}.
+ * Base class for write operator.
*
- * @param <I> The input type
+ * @param <I> the input type
*/
-public class StreamWriteOperator<I>
- extends KeyedProcessOperator<Object, I, Object>
+public abstract class AbstractWriteOperator<I>
+ extends ProcessOperator<I, Object>
implements OperatorEventHandler, BoundedOneInput {
- private final StreamWriteFunction<Object, I, Object> sinkFunction;
+ private final AbstractWriteFunction<I> function;
- public StreamWriteOperator(Configuration conf) {
- super(new StreamWriteFunction<>(conf));
- this.sinkFunction = (StreamWriteFunction<Object, I, Object>) getUserFunction();
+ public AbstractWriteOperator(AbstractWriteFunction<I> function) {
+ super(function);
+ this.function = function;
}
- @Override
- public void handleOperatorEvent(OperatorEvent event) {
- this.sinkFunction.handleOperatorEvent(event);
+ public void setOperatorEventGateway(OperatorEventGateway operatorEventGateway) {
+ this.function.setOperatorEventGateway(operatorEventGateway);
}
- void setOperatorEventGateway(OperatorEventGateway operatorEventGateway) {
- sinkFunction.setOperatorEventGateway(operatorEventGateway);
+ @Override
+ public void endInput() {
+ this.function.endInput();
}
@Override
- public void endInput() {
- sinkFunction.endInput();
+ public void handleOperatorEvent(OperatorEvent evt) {
+ this.function.handleOperatorEvent(evt);
}
}
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorFactory.java b/hudi-flink/src/main/java/org/apache/hudi/sink/common/WriteOperatorFactory.java
similarity index 83%
rename from hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorFactory.java
rename to hudi-flink/src/main/java/org/apache/hudi/sink/common/WriteOperatorFactory.java
index ce89886..01a28de 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorFactory.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/common/WriteOperatorFactory.java
@@ -16,7 +16,10 @@
* limitations under the License.
*/
-package org.apache.hudi.sink;
+package org.apache.hudi.sink.common;
+
+import org.apache.hudi.sink.StreamWriteOperator;
+import org.apache.hudi.sink.StreamWriteOperatorCoordinator;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.jobgraph.OperatorID;
@@ -31,20 +34,24 @@ import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
/**
* Factory class for {@link StreamWriteOperator}.
*/
-public class StreamWriteOperatorFactory<I>
+public class WriteOperatorFactory<I>
extends SimpleUdfStreamOperatorFactory<Object>
implements CoordinatedOperatorFactory<Object>, OneInputStreamOperatorFactory<I, Object> {
private static final long serialVersionUID = 1L;
- private final StreamWriteOperator<I> operator;
+ private final AbstractWriteOperator<I> operator;
private final Configuration conf;
- public StreamWriteOperatorFactory(Configuration conf) {
- super(new StreamWriteOperator<>(conf));
- this.operator = (StreamWriteOperator<I>) getOperator();
+ public WriteOperatorFactory(Configuration conf, AbstractWriteOperator<I> operator) {
+ super(operator);
+ this.operator = operator;
this.conf = conf;
}
+ public static <I> WriteOperatorFactory<I> instance(Configuration conf, AbstractWriteOperator<I> operator) {
+ return new WriteOperatorFactory<>(conf, operator);
+ }
+
@Override
@SuppressWarnings("unchecked")
public <T extends StreamOperator<Object>> T createStreamOperator(StreamOperatorParameters<Object> parameters) {
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/event/CommitAckEvent.java b/hudi-flink/src/main/java/org/apache/hudi/sink/event/CommitAckEvent.java
index 93f74af..541fd06 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/event/CommitAckEvent.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/event/CommitAckEvent.java
@@ -29,7 +29,8 @@ public class CommitAckEvent implements OperatorEvent {
private static final CommitAckEvent INSTANCE = new CommitAckEvent();
// default constructor for efficient serialization
- public CommitAckEvent() {}
+ public CommitAckEvent() {
+ }
public static CommitAckEvent getInstance() {
return INSTANCE;
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/DeltaWriteProfile.java b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/DeltaWriteProfile.java
index 9f56bdd..6b5e96e 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/DeltaWriteProfile.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/DeltaWriteProfile.java
@@ -61,7 +61,7 @@ public class DeltaWriteProfile extends WriteProfile {
// If we can index log files, we can add more inserts to log files for fileIds including those under
// pending compaction.
List<FileSlice> allFileSlices = fsView.getLatestFileSlicesBeforeOrOn(partitionPath, latestCommitTime.getTimestamp(), true)
- .collect(Collectors.toList());
+ .collect(Collectors.toList());
for (FileSlice fileSlice : allFileSlices) {
if (isSmallFile(fileSlice)) {
allSmallFileSlices.add(fileSlice);
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/WriteProfiles.java b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/WriteProfiles.java
index 1fcf3f1..0ab8f12 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/WriteProfiles.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/WriteProfiles.java
@@ -52,7 +52,8 @@ public class WriteProfiles {
private static final Map<String, WriteProfile> PROFILES = new HashMap<>();
- private WriteProfiles() {}
+ private WriteProfiles() {
+ }
public static synchronized WriteProfile singleton(
boolean ignoreSmallFiles,
@@ -104,7 +105,6 @@ public class WriteProfiles {
* @param basePath Table base path
* @param metadata The metadata
* @param fs The filesystem
- *
* @return the commit file status list
*/
private static List<FileStatus> getWritePathsOfInstant(Path basePath, HoodieCommitMetadata metadata, FileSystem fs) {
@@ -143,7 +143,6 @@ public class WriteProfiles {
* @param basePath The table base path
* @param instant The hoodie instant
* @param timeline The timeline
- *
* @return the commit metadata or empty if any error occurs
*/
public static Option<HoodieCommitMetadata> getCommitMetadataSafely(
@@ -172,7 +171,6 @@ public class WriteProfiles {
* @param basePath The table base path
* @param instant The hoodie instant
* @param timeline The timeline
- *
* @return the commit metadata
*/
public static HoodieCommitMetadata getCommitMetadata(
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/transform/RowDataToHoodieFunctions.java b/hudi-flink/src/main/java/org/apache/hudi/sink/transform/RowDataToHoodieFunctions.java
index 5b811a8..0007fd1 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/transform/RowDataToHoodieFunctions.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/transform/RowDataToHoodieFunctions.java
@@ -29,7 +29,8 @@ import org.apache.flink.table.types.logical.RowType;
* Utilities for {@link RowDataToHoodieFunction}.
*/
public abstract class RowDataToHoodieFunctions {
- private RowDataToHoodieFunctions() {}
+ private RowDataToHoodieFunctions() {
+ }
/**
* Creates a {@link RowDataToHoodieFunction} instance based on the given configuration.
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/transform/Transformer.java b/hudi-flink/src/main/java/org/apache/hudi/sink/transform/Transformer.java
index f40a838..282cca7 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/transform/Transformer.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/transform/Transformer.java
@@ -28,6 +28,7 @@ public interface Transformer {
/**
* Transform source DataStream to target DataStream.
+ *
* @param source
*/
DataStream<RowData> apply(DataStream<RowData> source);
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java b/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
index f31645c..1211188 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
@@ -21,12 +21,14 @@ package org.apache.hudi.sink.utils;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.sink.CleanFunction;
-import org.apache.hudi.sink.StreamWriteOperatorFactory;
+import org.apache.hudi.sink.StreamWriteOperator;
+import org.apache.hudi.sink.append.AppendWriteOperator;
import org.apache.hudi.sink.bootstrap.BootstrapOperator;
import org.apache.hudi.sink.bootstrap.batch.BatchBootstrapOperator;
import org.apache.hudi.sink.bulk.BulkInsertWriteOperator;
import org.apache.hudi.sink.bulk.RowDataKeyGen;
import org.apache.hudi.sink.bulk.sort.SortOperatorGen;
+import org.apache.hudi.sink.common.WriteOperatorFactory;
import org.apache.hudi.sink.compact.CompactFunction;
import org.apache.hudi.sink.compact.CompactionCommitEvent;
import org.apache.hudi.sink.compact.CompactionCommitSink;
@@ -41,6 +43,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.operators.ProcessOperator;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
@@ -52,7 +55,7 @@ import org.apache.flink.table.types.logical.RowType;
public class Pipelines {
public static DataStreamSink<Object> bulkInsert(Configuration conf, RowType rowType, DataStream<RowData> dataStream) {
- BulkInsertWriteOperator.OperatorFactory<RowData> operatorFactory = BulkInsertWriteOperator.getFactory(conf, rowType);
+ WriteOperatorFactory<RowData> operatorFactory = BulkInsertWriteOperator.getFactory(conf, rowType);
final String[] partitionFields = FilePathUtils.extractPartitionKeys(conf);
if (partitionFields.length > 0) {
@@ -80,9 +83,17 @@ public class Pipelines {
operatorFactory)
// follow the parallelism of upstream operators to avoid shuffle
.setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS))
- .addSink(new CleanFunction<>(conf))
- .setParallelism(1)
- .name("clean_commits");
+ .addSink(DummySink.INSTANCE);
+ }
+
+ public static DataStreamSink<Object> append(Configuration conf, RowType rowType, DataStream<RowData> dataStream) {
+ WriteOperatorFactory<RowData> operatorFactory = AppendWriteOperator.getFactory(conf, rowType);
+
+ return dataStream
+ .transform("hoodie_append_write", TypeInformation.of(Object.class), operatorFactory)
+ .uid("uid_hoodie_stream_write" + conf.getString(FlinkOptions.TABLE_NAME))
+ .setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS))
+ .addSink(DummySink.INSTANCE);
}
public static DataStream<HoodieRecord> bootstrap(
@@ -143,7 +154,7 @@ public class Pipelines {
}
public static DataStream<Object> hoodieStreamWrite(Configuration conf, int defaultParallelism, DataStream<HoodieRecord> dataStream) {
- StreamWriteOperatorFactory<HoodieRecord> operatorFactory = new StreamWriteOperatorFactory<>(conf);
+ WriteOperatorFactory<HoodieRecord> operatorFactory = StreamWriteOperator.getFactory(conf);
return dataStream
// Key-by record key, to avoid multiple subtasks write to a bucket at the same time
.keyBy(HoodieRecord::getRecordKey)
@@ -180,4 +191,12 @@ public class Pipelines {
.setParallelism(1)
.name("clean_commits");
}
+
+ /**
+ * Dummy sink that does nothing.
+ */
+ public static class DummySink implements SinkFunction<Object> {
+ private static final long serialVersionUID = 1L;
+ public static DummySink INSTANCE = new DummySink();
+ }
}
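
The new Pipelines#append entry point above simply transforms the RowData stream with an AppendWriteOperator and terminates it with the no-op DummySink, so it skips the keyed shuffle, index bootstrap and small-file lookup of the upsert path. Below is a minimal sketch, not part of this commit, of how a job could wire the pipeline; the table path and table name are placeholder assumptions.

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.sink.utils.Pipelines;

public class AppendPipelineSketch {
  public static DataStreamSink<Object> buildAppendSink(
      Configuration conf, RowType rowType, DataStream<RowData> rowDataStream) {
    // Placeholder table location and name; real jobs take these from user options.
    conf.setString(FlinkOptions.PATH, "/tmp/hoodie_sink");
    conf.setString(FlinkOptions.TABLE_NAME, "hoodie_sink");
    // Append mode targets plain inserts with deduplication disabled.
    conf.setString(FlinkOptions.OPERATION, "insert");
    conf.setBoolean(FlinkOptions.INSERT_DEDUP, false);
    // Follows the upstream parallelism (WRITE_TASKS) and ends in DummySink.
    return Pipelines.append(conf, rowType, rowDataStream);
  }
}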
diff --git a/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java b/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java
index ae6b3e1..112dfda 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java
@@ -270,16 +270,16 @@ public class StreamReadMonitoringFunction
final String mergeType = this.conf.getString(FlinkOptions.MERGE_TYPE);
List<MergeOnReadInputSplit> inputSplits = writePartitions.stream()
.map(relPartitionPath -> fsView.getLatestMergedFileSlicesBeforeOrOn(relPartitionPath, commitToIssue)
- .map(fileSlice -> {
- Option<List<String>> logPaths = Option.ofNullable(fileSlice.getLogFiles()
- .sorted(HoodieLogFile.getLogFileComparator())
- .map(logFile -> logFile.getPath().toString())
- .collect(Collectors.toList()));
- String basePath = fileSlice.getBaseFile().map(BaseFile::getPath).orElse(null);
- return new MergeOnReadInputSplit(cnt.getAndAdd(1),
- basePath, logPaths, commitToIssue,
- metaClient.getBasePath(), maxCompactionMemoryInBytes, mergeType, instantRange);
- }).collect(Collectors.toList()))
+ .map(fileSlice -> {
+ Option<List<String>> logPaths = Option.ofNullable(fileSlice.getLogFiles()
+ .sorted(HoodieLogFile.getLogFileComparator())
+ .map(logFile -> logFile.getPath().toString())
+ .collect(Collectors.toList()));
+ String basePath = fileSlice.getBaseFile().map(BaseFile::getPath).orElse(null);
+ return new MergeOnReadInputSplit(cnt.getAndAdd(1),
+ basePath, logPaths, commitToIssue,
+ metaClient.getBasePath(), maxCompactionMemoryInBytes, mergeType, instantRange);
+ }).collect(Collectors.toList()))
.flatMap(Collection::stream)
.collect(Collectors.toList());
diff --git a/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java b/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java
index 75272ab..3dc8cb2 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java
@@ -142,22 +142,22 @@ public class FlinkStreamerConfig extends Configuration {
public Integer writeTaskNum = 4;
@Parameter(names = {"--partition-default-name"},
- description = "The default partition name in case the dynamic partition column value is null/empty string")
+ description = "The default partition name in case the dynamic partition column value is null/empty string")
public String partitionDefaultName = "__DEFAULT_PARTITION__";
@Parameter(names = {"--index-bootstrap-enabled"},
- description = "Whether to bootstrap the index state from existing hoodie table, default false")
+ description = "Whether to bootstrap the index state from existing hoodie table, default false")
public Boolean indexBootstrapEnabled = false;
@Parameter(names = {"--index-state-ttl"}, description = "Index state ttl in days, default 1.5 day")
public Double indexStateTtl = 1.5D;
@Parameter(names = {"--index-global-enabled"}, description = "Whether to update index for the old partition path "
- + "if same key record with different partition path came in, default false")
+ + "if same key record with different partition path came in, default false")
public Boolean indexGlobalEnabled = false;
@Parameter(names = {"--index-partition-regex"},
- description = "Whether to load partitions in state if partition path matching, default *")
+ description = "Whether to load partitions in state if partition path matching, default *")
public String indexPartitionRegex = ".*";
@Parameter(names = {"--source-avro-schema-path"}, description = "Source avro schema file path, the parsed schema is used for deserialization")
@@ -167,8 +167,8 @@ public class FlinkStreamerConfig extends Configuration {
public String sourceAvroSchema = "";
@Parameter(names = {"--utc-timezone"}, description = "Use UTC timezone or local timezone to the conversion between epoch"
- + " time and LocalDateTime. Hive 0.x/1.x/2.x use local timezone. But Hive 3.x"
- + " use UTC timezone, by default true")
+ + " time and LocalDateTime. Hive 0.x/1.x/2.x use local timezone. But Hive 3.x"
+ + " use UTC timezone, by default true")
public Boolean utcTimezone = true;
@Parameter(names = {"--write-partition-url-encode"}, description = "Whether to encode the partition path url, default false")
@@ -180,18 +180,18 @@ public class FlinkStreamerConfig extends Configuration {
public Boolean hiveStylePartitioning = false;
@Parameter(names = {"--write-task-max-size"}, description = "Maximum memory in MB for a write task, when the threshold hits,\n"
- + "it flushes the max size data bucket to avoid OOM, default 1GB")
+ + "it flushes the max size data bucket to avoid OOM, default 1GB")
public Double writeTaskMaxSize = 1024D;
@Parameter(names = {"--write-batch-size"},
- description = "Batch buffer size in MB to flush data into the underneath filesystem, default 64MB")
+ description = "Batch buffer size in MB to flush data into the underneath filesystem, default 64MB")
public Double writeBatchSize = 64D;
@Parameter(names = {"--write-log-block-size"}, description = "Max log block size in MB for log file, default 128MB")
public Integer writeLogBlockSize = 128;
@Parameter(names = {"--write-log-max-size"},
- description = "Maximum size allowed in MB for a log file before it is rolled over to the next version, default 1GB")
+ description = "Maximum size allowed in MB for a log file before it is rolled over to the next version, default 1GB")
public Integer writeLogMaxSize = 1024;
@Parameter(names = {"--write-merge-max-memory"}, description = "Max memory in MB for merge, default 100MB")
@@ -204,11 +204,11 @@ public class FlinkStreamerConfig extends Configuration {
public Integer compactionTasks = 10;
@Parameter(names = {"--compaction-trigger-strategy"},
- description = "Strategy to trigger compaction, options are 'num_commits': trigger compaction when reach N delta commits;\n"
- + "'time_elapsed': trigger compaction when time elapsed > N seconds since last compaction;\n"
- + "'num_and_time': trigger compaction when both NUM_COMMITS and TIME_ELAPSED are satisfied;\n"
- + "'num_or_time': trigger compaction when NUM_COMMITS or TIME_ELAPSED is satisfied.\n"
- + "Default is 'num_commits'")
+ description = "Strategy to trigger compaction, options are 'num_commits': trigger compaction when reach N delta commits;\n"
+ + "'time_elapsed': trigger compaction when time elapsed > N seconds since last compaction;\n"
+ + "'num_and_time': trigger compaction when both NUM_COMMITS and TIME_ELAPSED are satisfied;\n"
+ + "'num_or_time': trigger compaction when NUM_COMMITS or TIME_ELAPSED is satisfied.\n"
+ + "Default is 'num_commits'")
public String compactionTriggerStrategy = FlinkOptions.NUM_COMMITS;
@Parameter(names = {"--compaction-delta-commits"}, description = "Max delta commits needed to trigger compaction, default 5 commits")
@@ -227,16 +227,16 @@ public class FlinkStreamerConfig extends Configuration {
public Boolean cleanAsyncEnabled = true;
@Parameter(names = {"--clean-retain-commits"},
- description = "Number of commits to retain. So data will be retained for num_of_commits * time_between_commits (scheduled).\n"
+ description = "Number of commits to retain. So data will be retained for num_of_commits * time_between_commits (scheduled).\n"
+ "This also directly translates into how much you can incrementally pull on this table, default 10")
public Integer cleanRetainCommits = 10;
@Parameter(names = {"--archive-max-commits"},
- description = "Max number of commits to keep before archiving older commits into a sequential log, default 30")
+ description = "Max number of commits to keep before archiving older commits into a sequential log, default 30")
public Integer archiveMaxCommits = 30;
@Parameter(names = {"--archive-min-commits"},
- description = "Min number of commits to keep before archiving older commits into a sequential log, default 20")
+ description = "Min number of commits to keep before archiving older commits into a sequential log, default 20")
public Integer archiveMinCommits = 20;
@Parameter(names = {"--hive-sync-enable"}, description = "Asynchronously sync Hive meta to HMS, default false")
@@ -270,7 +270,7 @@ public class FlinkStreamerConfig extends Configuration {
public String hiveSyncPartitionFields = "";
@Parameter(names = {"--hive-sync-partition-extractor-class"}, description = "Tool to extract the partition value from HDFS path, "
- + "default 'SlashEncodedDayPartitionValueExtractor'")
+ + "default 'SlashEncodedDayPartitionValueExtractor'")
public String hiveSyncPartitionExtractorClass = SlashEncodedDayPartitionValueExtractor.class.getCanonicalName();
@Parameter(names = {"--hive-sync-assume-date-partitioning"}, description = "Assume partitioning is yyyy/mm/dd, default false")
@@ -289,7 +289,7 @@ public class FlinkStreamerConfig extends Configuration {
public Boolean hiveSyncSkipRoSuffix = false;
@Parameter(names = {"--hive-sync-support-timestamp"}, description = "INT64 with original type TIMESTAMP_MICROS is converted to hive timestamp type.\n"
- + "Disabled by default for backward compatibility.")
+ + "Disabled by default for backward compatibility.")
public Boolean hiveSyncSupportTimestamp = false;
diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
index 89f7ed7..0344857 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
@@ -105,7 +105,7 @@ public class HoodieTableFactory implements DynamicTableSourceFactory, DynamicTab
/**
* The sanity check.
*
- * @param conf The table options
+ * @param conf The table options
* @param schema The table schema
*/
private void sanityCheck(Configuration conf, ResolvedSchema schema) {
@@ -217,7 +217,7 @@ public class HoodieTableFactory implements DynamicTableSourceFactory, DynamicTab
/**
* Sets up the hive options from the table definition.
- * */
+ */
private static void setupHiveOptions(Configuration conf) {
if (!conf.getBoolean(FlinkOptions.HIVE_STYLE_PARTITIONING)
&& FlinkOptions.isDefaultValueDefined(conf, FlinkOptions.HIVE_SYNC_PARTITION_EXTRACTOR_CLASS_NAME)) {
diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java
index 2ced22a..2fdd0fd 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java
@@ -77,10 +77,17 @@ public class HoodieTableSink implements DynamicTableSink, SupportsPartitioning,
// default parallelism
int parallelism = dataStream.getExecutionConfig().getParallelism();
+
+ DataStream<Object> pipeline;
+ // Append mode
+ if (StreamerUtil.allowDuplicateInserts(conf)) {
+ return Pipelines.append(conf, rowType, dataStream);
+ }
+
// bootstrap
final DataStream<HoodieRecord> hoodieRecordDataStream = Pipelines.bootstrap(conf, rowType, parallelism, dataStream, context.isBounded());
// write pipeline
- DataStream<Object> pipeline = Pipelines.hoodieStreamWrite(conf, parallelism, hoodieRecordDataStream);
+ pipeline = Pipelines.hoodieStreamWrite(conf, parallelism, hoodieRecordDataStream);
// compaction
if (StreamerUtil.needsAsyncCompaction(conf)) {
return Pipelines.compact(conf, pipeline);
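
With the change above, HoodieTableSink short-circuits into Pipelines.append whenever StreamerUtil.allowDuplicateInserts(conf) is true, and only builds the bootstrap plus keyed stream-write pipeline otherwise. A small sketch of the option combination expected to trigger the append path, based on the insert/dedup options used by the tests in this commit (the behavior of allowDuplicateInserts with these options is an assumption):

import org.apache.flink.configuration.Configuration;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.util.StreamerUtil;

public class AppendModeCheck {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    conf.setString(FlinkOptions.OPERATION, "insert");   // plain insert, no upsert
    conf.setBoolean(FlinkOptions.INSERT_DEDUP, false);  // keep duplicate keys
    // Expected to print true, which routes the sink through Pipelines.append.
    System.out.println("append mode: " + StreamerUtil.allowDuplicateInserts(conf));
  }
}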
diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
index 52cb765..78d1db6 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
@@ -303,7 +303,8 @@ public class HoodieTableSource implements
.collect(Collectors.toList()));
return new MergeOnReadInputSplit(cnt.getAndAdd(1), basePath, logPaths, latestCommit,
metaClient.getBasePath(), maxCompactionMemoryInBytes, mergeType, null);
- }).collect(Collectors.toList()); })
+ }).collect(Collectors.toList());
+ })
.flatMap(Collection::stream)
.collect(Collectors.toList());
}
diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/format/FilePathUtils.java b/hudi-flink/src/main/java/org/apache/hudi/table/format/FilePathUtils.java
index 8f1347f..1eb7e2d 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/table/format/FilePathUtils.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/table/format/FilePathUtils.java
@@ -283,11 +283,11 @@ public class FilePathUtils {
*
* <p>The return list should be [{key1:val1, key2:val2, key3:val3}, {key1:val4, key2:val5, key3:val6}].
*
- * @param path The base path
- * @param hadoopConf The hadoop configuration
- * @param partitionKeys The partition key list
+ * @param path The base path
+ * @param hadoopConf The hadoop configuration
+ * @param partitionKeys The partition key list
* @param defaultParName The default partition name for nulls
- * @param hivePartition Whether the partition path is in Hive style
+ * @param hivePartition Whether the partition path is in Hive style
*/
public static List<Map<String, String>> getPartitions(
Path path,
@@ -338,9 +338,9 @@ public class FilePathUtils {
/**
* Returns all the file paths that is the parents of the data files.
*
- * @param path The base path
- * @param conf The Flink configuration
- * @param hadoopConf The hadoop configuration
+ * @param path The base path
+ * @param conf The Flink configuration
+ * @param hadoopConf The hadoop configuration
* @param partitionKeys The partition key list
*/
public static Path[] getReadPaths(
@@ -362,11 +362,10 @@ public class FilePathUtils {
/**
* Transforms the given partition key value mapping to read paths.
*
- * @param path The base path
- * @param partitionKeys The partition key list
+ * @param path The base path
+ * @param partitionKeys The partition key list
* @param partitionPaths The partition key value mapping
- * @param hivePartition Whether the partition path is in Hive style
- *
+ * @param hivePartition Whether the partition path is in Hive style
* @see #getReadPaths
*/
public static Path[] partitionPath2ReadPath(
@@ -384,10 +383,9 @@ public class FilePathUtils {
/**
* Transforms the given partition key value mapping to relative partition paths.
*
- * @param partitionKeys The partition key list
+ * @param partitionKeys The partition key list
* @param partitionPaths The partition key value mapping
- * @param hivePartition Whether the partition path is in Hive style
- *
+ * @param hivePartition Whether the partition path is in Hive style
* @see #getReadPaths
*/
public static Set<String> toRelativePartitionPaths(
diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/AbstractColumnReader.java b/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/AbstractColumnReader.java
index e6f40a5..efbe914 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/AbstractColumnReader.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/AbstractColumnReader.java
@@ -296,7 +296,8 @@ public abstract class AbstractColumnReader<V extends WritableColumnVector>
/**
* After read a page, we may need some initialization.
*/
- protected void afterReadPage() {}
+ protected void afterReadPage() {
+ }
/**
* Support lazy dictionary ids decode. See more in {@link ParquetDictionary}.
diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputSplit.java b/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputSplit.java
index b929e7b..0c93eea 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputSplit.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputSplit.java
@@ -97,15 +97,15 @@ public class MergeOnReadInputSplit implements InputSplit {
@Override
public String toString() {
return "MergeOnReadInputSplit{"
- + "splitNum=" + splitNum
- + ", basePath=" + basePath
- + ", logPaths=" + logPaths
- + ", latestCommit='" + latestCommit + '\''
- + ", tablePath='" + tablePath + '\''
- + ", maxCompactionMemoryInBytes=" + maxCompactionMemoryInBytes
- + ", mergeType='" + mergeType + '\''
- + ", instantRange=" + instantRange
- + '}';
+ + "splitNum=" + splitNum
+ + ", basePath=" + basePath
+ + ", logPaths=" + logPaths
+ + ", latestCommit='" + latestCommit + '\''
+ + ", tablePath='" + tablePath + '\''
+ + ", maxCompactionMemoryInBytes=" + maxCompactionMemoryInBytes
+ + ", mergeType='" + mergeType + '\''
+ + ", instantRange=" + instantRange
+ + '}';
}
-
+
}
diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadTableState.java b/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadTableState.java
index 0a63c91..36dfecb 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadTableState.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadTableState.java
@@ -106,7 +106,6 @@ public class MergeOnReadTableState implements Serializable {
*
* @param pkOffsets the pk offsets in required row type
* @return pk field logical types
- *
* @see #getPkOffsetsInRequired()
*/
public LogicalType[] getPkTypes(int[] pkOffsets) {
diff --git a/hudi-flink/src/main/java/org/apache/hudi/util/AvroSchemaConverter.java b/hudi-flink/src/main/java/org/apache/hudi/util/AvroSchemaConverter.java
index db72cc1..df815a8 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/util/AvroSchemaConverter.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/util/AvroSchemaConverter.java
@@ -162,7 +162,7 @@ public class AvroSchemaConverter {
* <p>Use "record" as the type name.
*
* @param schema the schema type, usually it should be the top level record type, e.g. not a
- * nested type
+ * nested type
* @return Avro's {@link Schema} matching this logical type.
*/
public static Schema convertToSchema(LogicalType schema) {
@@ -176,7 +176,7 @@ public class AvroSchemaConverter {
* schema. Nested record type that only differs with type name is still compatible.
*
* @param logicalType logical type
- * @param rowName the record name
+ * @param rowName the record name
* @return Avro's {@link Schema} matching this logical type.
*/
public static Schema convertToSchema(LogicalType logicalType, String rowName) {
@@ -315,7 +315,9 @@ public class AvroSchemaConverter {
return valueType;
}
- /** Returns schema with nullable true. */
+ /**
+ * Returns schema with nullable true.
+ */
private static Schema nullableSchema(Schema schema) {
return schema.isNullable()
? schema
diff --git a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
index 9f625ba..b95c9e1 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
@@ -181,6 +181,9 @@ public class StreamerUtil {
.withStorageConfig(HoodieStorageConfig.newBuilder()
.logFileDataBlockMaxSize(conf.getInteger(FlinkOptions.WRITE_LOG_BLOCK_SIZE) * 1024 * 1024)
.logFileMaxSize(conf.getInteger(FlinkOptions.WRITE_LOG_MAX_SIZE) * 1024 * 1024)
+ .parquetBlockSize(conf.getInteger(FlinkOptions.WRITE_PARQUET_BLOCK_SIZE) * 1024 * 1024)
+ .parquetPageSize(conf.getInteger(FlinkOptions.WRITE_PARQUET_PAGE_SIZE) * 1024 * 1024)
+ .parquetMaxFileSize(conf.getInteger(FlinkOptions.WRITE_PARQUET_MAX_FILE_SIZE) * 1024 * 1024L)
.build())
.withMetadataConfig(HoodieMetadataConfig.newBuilder()
.enable(conf.getBoolean(FlinkOptions.METADATA_ENABLED))
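
The write config builder above now also forwards three parquet sizing options; like the neighbouring log options they are specified in MB and converted to bytes. A short sketch of setting them on the Flink configuration, with illustrative values only (the numbers are not taken from this commit):

import org.apache.flink.configuration.Configuration;
import org.apache.hudi.configuration.FlinkOptions;

public class ParquetSizingSketch {
  public static Configuration parquetSizedConf() {
    Configuration conf = new Configuration();
    conf.setInteger(FlinkOptions.WRITE_PARQUET_BLOCK_SIZE, 120);    // row group size in MB
    conf.setInteger(FlinkOptions.WRITE_PARQUET_PAGE_SIZE, 1);       // page size in MB
    conf.setInteger(FlinkOptions.WRITE_PARQUET_MAX_FILE_SIZE, 120); // max data file size in MB
    return conf;
  }
}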
diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/StreamWriteITCase.java b/hudi-flink/src/test/java/org/apache/hudi/sink/StreamWriteITCase.java
index 659e022..1890d07 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/sink/StreamWriteITCase.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/sink/StreamWriteITCase.java
@@ -245,8 +245,6 @@ public class StreamWriteITCase extends TestLogger {
RowType rowType =
(RowType) AvroSchemaConverter.convertToDataType(StreamerUtil.getSourceSchema(conf))
.getLogicalType();
- StreamWriteOperatorFactory<HoodieRecord> operatorFactory =
- new StreamWriteOperatorFactory<>(conf);
JsonRowDataDeserializationSchema deserializationSchema = new JsonRowDataDeserializationSchema(
rowType,
@@ -302,8 +300,6 @@ public class StreamWriteITCase extends TestLogger {
RowType rowType =
(RowType) AvroSchemaConverter.convertToDataType(StreamerUtil.getSourceSchema(conf))
.getLogicalType();
- StreamWriteOperatorFactory<HoodieRecord> operatorFactory =
- new StreamWriteOperatorFactory<>(conf);
JsonRowDataDeserializationSchema deserializationSchema = new JsonRowDataDeserializationSchema(
rowType,
diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java b/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
index 0c52241..b403f3c 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
@@ -23,13 +23,13 @@ import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
-import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
import org.apache.hudi.common.table.view.FileSystemViewStorageType;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.sink.event.WriteMetadataEvent;
+import org.apache.hudi.sink.utils.InsertFunctionWrapper;
import org.apache.hudi.sink.utils.StreamWriteFunctionWrapper;
import org.apache.hudi.util.StreamerUtil;
import org.apache.hudi.utils.TestConfigurations;
@@ -58,6 +58,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -532,11 +533,7 @@ public class TestWriteCopyOnWrite {
@Test
public void testInsertAllowsDuplication() throws Exception {
- // reset the config option
- conf.setDouble(FlinkOptions.WRITE_BATCH_SIZE, 0.0006); // 630 bytes batch size
- conf.setString(FlinkOptions.OPERATION, WriteOperationType.INSERT.value());
- conf.setBoolean(FlinkOptions.INSERT_DEDUP, false);
- funcWrapper = new StreamWriteFunctionWrapper<>(tempFile.getAbsolutePath(), conf);
+ InsertFunctionWrapper<RowData> funcWrapper = new InsertFunctionWrapper<>(tempFile.getAbsolutePath(), conf);
// open the function and ingest data
funcWrapper.openFunction();
@@ -547,19 +544,16 @@ public class TestWriteCopyOnWrite {
// this triggers the data write and event send
funcWrapper.checkpointFunction(1);
- Map<String, List<HoodieRecord>> dataBuffer = funcWrapper.getDataBuffer();
- assertThat("All data should be flushed out", dataBuffer.size(), is(0));
+ assertNull(funcWrapper.getWriterHelper());
final OperatorEvent event1 = funcWrapper.getNextEvent(); // remove the first event first
- final OperatorEvent event2 = funcWrapper.getNextEvent();
- assertThat("The operator expect to send an event", event2, instanceOf(WriteMetadataEvent.class));
+ assertThat("The operator expect to send an event", event1, instanceOf(WriteMetadataEvent.class));
funcWrapper.getCoordinator().handleEventFromOperator(0, event1);
- funcWrapper.getCoordinator().handleEventFromOperator(0, event2);
assertNotNull(funcWrapper.getEventBuffer()[0], "The coordinator missed the event");
String instant = funcWrapper.getWriteClient()
- .getLastPendingInstant(getTableType());
+ .getLastPendingInstant(getTableType());
funcWrapper.checkpointComplete(1);
@@ -585,10 +579,8 @@ public class TestWriteCopyOnWrite {
funcWrapper.checkpointFunction(2);
- final OperatorEvent event3 = funcWrapper.getNextEvent(); // remove the first event first
- final OperatorEvent event4 = funcWrapper.getNextEvent();
- funcWrapper.getCoordinator().handleEventFromOperator(0, event3);
- funcWrapper.getCoordinator().handleEventFromOperator(0, event4);
+ final OperatorEvent event2 = funcWrapper.getNextEvent(); // remove the first event first
+ funcWrapper.getCoordinator().handleEventFromOperator(0, event2);
funcWrapper.checkpointComplete(2);
// same with the original base file content.
diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/bulk/TestRowDataKeyGen.java b/hudi-flink/src/test/java/org/apache/hudi/sink/bulk/TestRowDataKeyGen.java
index 32ee725..7a1eaee 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/sink/bulk/TestRowDataKeyGen.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/sink/bulk/TestRowDataKeyGen.java
@@ -77,7 +77,7 @@ public class TestRowDataKeyGen {
assertThat(keyGen1.getPartitionPath(rowData1), is("par1/1970-01-01T00:00:00.001"));
// null record key and partition path
- final RowData rowData2 = insertRow(TestConfigurations.ROW_TYPE,null, null, 23, null, null);
+ final RowData rowData2 = insertRow(TestConfigurations.ROW_TYPE, null, null, 23, null, null);
assertThrows(HoodieKeyException.class, () -> keyGen1.getRecordKey(rowData2));
assertThat(keyGen1.getPartitionPath(rowData2), is("default/default"));
// empty record key and partition path
diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/utils/CompactFunctionWrapper.java b/hudi-flink/src/test/java/org/apache/hudi/sink/utils/CompactFunctionWrapper.java
index d53d58e..fe2ddad 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/sink/utils/CompactFunctionWrapper.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/sink/utils/CompactFunctionWrapper.java
@@ -53,11 +53,17 @@ public class CompactFunctionWrapper {
private final IOManager ioManager;
private final StreamingRuntimeContext runtimeContext;
- /** Function that generates the {@link HoodieCompactionPlan}. */
+ /**
+ * Function that generates the {@link HoodieCompactionPlan}.
+ */
private CompactionPlanOperator compactionPlanOperator;
- /** Function that executes the compaction task. */
+ /**
+ * Function that executes the compaction task.
+ */
private CompactFunction compactFunction;
- /** Stream sink to handle compaction commits. */
+ /**
+ * Stream sink to handle compaction commits.
+ */
private CompactionCommitSink commitSink;
public CompactFunctionWrapper(Configuration conf) throws Exception {
@@ -120,7 +126,7 @@ public class CompactFunctionWrapper {
compactionPlanOperator.notifyCheckpointComplete(checkpointID);
// collect the CompactCommitEvents
List<CompactionCommitEvent> compactCommitEvents = new ArrayList<>();
- for (CompactionPlanEvent event: events) {
+ for (CompactionPlanEvent event : events) {
compactFunction.processElement(event, null, new Collector<CompactionCommitEvent>() {
@Override
public void collect(CompactionCommitEvent event) {
diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/utils/InsertFunctionWrapper.java b/hudi-flink/src/test/java/org/apache/hudi/sink/utils/InsertFunctionWrapper.java
new file mode 100644
index 0000000..ed23754
--- /dev/null
+++ b/hudi-flink/src/test/java/org/apache/hudi/sink/utils/InsertFunctionWrapper.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.utils;
+
+import org.apache.hudi.client.HoodieFlinkWriteClient;
+import org.apache.hudi.sink.StreamWriteOperatorCoordinator;
+import org.apache.hudi.sink.append.AppendWriteFunction;
+import org.apache.hudi.sink.bulk.BulkInsertWriterHelper;
+import org.apache.hudi.sink.event.WriteMetadataEvent;
+import org.apache.hudi.util.AvroSchemaConverter;
+import org.apache.hudi.util.StreamerUtil;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.runtime.operators.coordination.MockOperatorCoordinatorContext;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.runtime.operators.testutils.MockEnvironment;
+import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.streaming.api.operators.collect.utils.MockOperatorEventGateway;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * A wrapper class to manipulate the {@link AppendWriteFunction} instance for testing.
+ *
+ * @param <I> Input type
+ */
+public class InsertFunctionWrapper<I> {
+ private final Configuration conf;
+ private final RowType rowType;
+
+ private final StreamingRuntimeContext runtimeContext;
+ private final MockOperatorEventGateway gateway;
+ private final MockOperatorCoordinatorContext coordinatorContext;
+ private final StreamWriteOperatorCoordinator coordinator;
+ private final MockStateInitializationContext stateInitializationContext;
+
+ /**
+ * Append write function.
+ */
+ private AppendWriteFunction<RowData> writeFunction;
+
+ public InsertFunctionWrapper(String tablePath, Configuration conf) {
+ IOManager ioManager = new IOManagerAsync();
+ MockEnvironment environment = new MockEnvironmentBuilder()
+ .setTaskName("mockTask")
+ .setManagedMemorySize(4 * MemoryManager.DEFAULT_PAGE_SIZE)
+ .setIOManager(ioManager)
+ .build();
+ this.runtimeContext = new MockStreamingRuntimeContext(false, 1, 0, environment);
+ this.gateway = new MockOperatorEventGateway();
+ this.conf = conf;
+ this.rowType = (RowType) AvroSchemaConverter.convertToDataType(StreamerUtil.getSourceSchema(conf)).getLogicalType();
+ // one function
+ this.coordinatorContext = new MockOperatorCoordinatorContext(new OperatorID(), 1);
+ this.coordinator = new StreamWriteOperatorCoordinator(conf, this.coordinatorContext);
+ this.stateInitializationContext = new MockStateInitializationContext();
+ }
+
+ public void openFunction() throws Exception {
+ this.coordinator.start();
+ this.coordinator.setExecutor(new MockCoordinatorExecutor(coordinatorContext));
+
+ setupWriteFunction();
+ }
+
+ public void invoke(I record) throws Exception {
+ writeFunction.processElement((RowData) record, null, null);
+ }
+
+ public WriteMetadataEvent[] getEventBuffer() {
+ return this.coordinator.getEventBuffer();
+ }
+
+ public OperatorEvent getNextEvent() {
+ return this.gateway.getNextEvent();
+ }
+
+ @SuppressWarnings("rawtypes")
+ public HoodieFlinkWriteClient getWriteClient() {
+ return this.writeFunction.getWriteClient();
+ }
+
+ public void checkpointFunction(long checkpointId) throws Exception {
+ // checkpoint the coordinator first
+ this.coordinator.checkpointCoordinator(checkpointId, new CompletableFuture<>());
+
+ writeFunction.snapshotState(null);
+ stateInitializationContext.getOperatorStateStore().checkpointBegin(checkpointId);
+ }
+
+ public void checkpointComplete(long checkpointId) {
+ stateInitializationContext.getOperatorStateStore().checkpointSuccess(checkpointId);
+ coordinator.notifyCheckpointComplete(checkpointId);
+ }
+
+ public StreamWriteOperatorCoordinator getCoordinator() {
+ return coordinator;
+ }
+
+ public BulkInsertWriterHelper getWriterHelper() {
+ return this.writeFunction.getWriterHelper();
+ }
+
+ // -------------------------------------------------------------------------
+ // Utilities
+ // -------------------------------------------------------------------------
+
+ private void setupWriteFunction() throws Exception {
+ writeFunction = new AppendWriteFunction<>(conf, rowType);
+ writeFunction.setRuntimeContext(runtimeContext);
+ writeFunction.setOperatorEventGateway(gateway);
+ writeFunction.initializeState(this.stateInitializationContext);
+ writeFunction.open(conf);
+
+ // handle the bootstrap event
+ coordinator.handleEventFromOperator(0, getNextEvent());
+ }
+}
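
A rough usage sketch of the new wrapper, mirroring the lifecycle exercised by testInsertAllowsDuplication in TestWriteCopyOnWrite above; the table path, configuration and input rows are assumed to come from the surrounding test fixture, and the wrapper itself lives in the test sources:

import java.io.File;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.table.data.RowData;
import org.apache.hudi.sink.utils.InsertFunctionWrapper;

public class InsertFunctionWrapperSketch {
  static void writeOneCheckpoint(File tempFile, Configuration conf, Iterable<RowData> rows) throws Exception {
    InsertFunctionWrapper<RowData> funcWrapper =
        new InsertFunctionWrapper<>(tempFile.getAbsolutePath(), conf);
    funcWrapper.openFunction();             // starts the coordinator and opens the append function
    for (RowData row : rows) {
      funcWrapper.invoke(row);              // buffers the row through the writer helper
    }
    funcWrapper.checkpointFunction(1);      // flushes data and emits a WriteMetadataEvent
    OperatorEvent event = funcWrapper.getNextEvent();
    funcWrapper.getCoordinator().handleEventFromOperator(0, event);
    funcWrapper.checkpointComplete(1);      // commits the instant on the timeline
  }
}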
diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java b/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
index 4ada517..6b6bede 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
@@ -95,7 +95,7 @@ public class StreamWriteFunctionWrapper<I> {
/**
* Stream write function.
*/
- private StreamWriteFunction<Object, HoodieRecord<?>, Object> writeFunction;
+ private StreamWriteFunction<HoodieRecord<?>> writeFunction;
private CompactFunctionWrapper compactFunctionWrapper;
diff --git a/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadOperator.java b/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadOperator.java
index 8096d5e..233e6fa 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadOperator.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadOperator.java
@@ -68,6 +68,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
*/
public class TestStreamReadOperator {
private static final Map<String, String> EXPECTED = new HashMap<>();
+
static {
EXPECTED.put("par1", "+I[id1, Danny, 23, 1970-01-01T00:00:00.001, par1], +I[id2, Stephen, 33, 1970-01-01T00:00:00.002, par1]");
EXPECTED.put("par2", "+I[id3, Julian, 53, 1970-01-01T00:00:00.003, par2], +I[id4, Fabian, 31, 1970-01-01T00:00:00.004, par2]");
diff --git a/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java b/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java
index f8fc42a..a04e7bb 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java
@@ -796,6 +796,35 @@ public class HoodieDataSourceITCase extends AbstractTestBase {
+ "+I[id1, Sophia, 18, 1970-01-01T00:00:05, par5]]", 3);
}
+ @Test
+ void testAppendWrite() {
+ TableEnvironment tableEnv = batchTableEnv;
+ // csv source
+ String csvSourceDDL = TestConfigurations.getCsvSourceDDL("csv_source", "test_source_5.data");
+ tableEnv.executeSql(csvSourceDDL);
+
+ String hoodieTableDDL = sql("hoodie_sink")
+ .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+ .option(FlinkOptions.OPERATION, "insert")
+ .option(FlinkOptions.INSERT_DEDUP, false)
+ .end();
+ tableEnv.executeSql(hoodieTableDDL);
+
+ String insertInto = "insert into hoodie_sink select * from csv_source";
+ execInsertSql(tableEnv, insertInto);
+
+ List<Row> result1 = CollectionUtil.iterableToList(
+ () -> tableEnv.sqlQuery("select * from hoodie_sink").execute().collect());
+ assertRowsEquals(result1, TestData.DATA_SET_SOURCE_INSERT);
+ // apply filters
+ List<Row> result2 = CollectionUtil.iterableToList(
+ () -> tableEnv.sqlQuery("select * from hoodie_sink where uuid > 'id5'").execute().collect());
+ assertRowsEquals(result2, "["
+ + "+I[id6, Emma, 20, 1970-01-01T00:00:06, par3], "
+ + "+I[id7, Bob, 44, 1970-01-01T00:00:07, par4], "
+ + "+I[id8, Han, 56, 1970-01-01T00:00:08, par4]]");
+ }
+
// -------------------------------------------------------------------------
// Utilities
// -------------------------------------------------------------------------
@@ -874,7 +903,7 @@ public class HoodieDataSourceITCase extends AbstractTestBase {
}
private List<Row> execSelectSql(TableEnvironment tEnv, String select, String sinkDDL, long timeout)
- throws InterruptedException {
+ throws InterruptedException {
tEnv.executeSql("DROP TABLE IF EXISTS sink");
tEnv.executeSql(sinkDDL);
TableResult tableResult = tEnv.executeSql("insert into sink " + select);
@@ -883,7 +912,7 @@ public class HoodieDataSourceITCase extends AbstractTestBase {
tableResult.getJobClient().ifPresent(JobClient::cancel);
tEnv.executeSql("DROP TABLE IF EXISTS sink");
return CollectSinkTableFactory.RESULT.values().stream()
- .flatMap(Collection::stream)
- .collect(Collectors.toList());
+ .flatMap(Collection::stream)
+ .collect(Collectors.toList());
}
}
diff --git a/hudi-flink/src/test/java/org/apache/hudi/utils/TestConfigurations.java b/hudi-flink/src/test/java/org/apache/hudi/utils/TestConfigurations.java
index 6824090..231d42c 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/utils/TestConfigurations.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/utils/TestConfigurations.java
@@ -128,18 +128,18 @@ public class TestConfigurations {
DataType[] fieldTypes = tableSchema.getFieldDataTypes();
for (int i = 0; i < fieldNames.length; i++) {
builder.append(" `")
- .append(fieldNames[i])
- .append("` ")
- .append(fieldTypes[i].toString());
+ .append(fieldNames[i])
+ .append("` ")
+ .append(fieldTypes[i].toString());
if (i != fieldNames.length - 1) {
builder.append(",");
}
builder.append("\n");
}
final String withProps = ""
- + ") with (\n"
- + " 'connector' = '" + CollectSinkTableFactory.FACTORY_ID + "'\n"
- + ")";
+ + ") with (\n"
+ + " 'connector' = '" + CollectSinkTableFactory.FACTORY_ID + "'\n"
+ + ")";
builder.append(withProps);
return builder.toString();
}
diff --git a/hudi-flink/src/test/java/org/apache/hudi/utils/TestSQL.java b/hudi-flink/src/test/java/org/apache/hudi/utils/TestSQL.java
index 6b0b71c..8822a6f 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/utils/TestSQL.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/utils/TestSQL.java
@@ -22,7 +22,8 @@ package org.apache.hudi.utils;
* Test sql statements.
*/
public class TestSQL {
- private TestSQL() {}
+ private TestSQL() {
+ }
public static final String INSERT_T1 = "insert into t1 values\n"
+ "('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par1'),\n"
diff --git a/hudi-flink/src/test/java/org/apache/hudi/utils/TestStreamerUtil.java b/hudi-flink/src/test/java/org/apache/hudi/utils/TestStreamerUtil.java
index 7459f3a..af40a8d 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/utils/TestStreamerUtil.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/utils/TestStreamerUtil.java
@@ -57,12 +57,12 @@ public class TestStreamerUtil {
// Validate the partition fields & preCombineField in hoodie.properties.
HoodieTableMetaClient metaClient1 = HoodieTableMetaClient.builder()
- .setBasePath(tempFile.getAbsolutePath())
- .setConf(new org.apache.hadoop.conf.Configuration())
- .build();
+ .setBasePath(tempFile.getAbsolutePath())
+ .setConf(new org.apache.hadoop.conf.Configuration())
+ .build();
assertTrue(metaClient1.getTableConfig().getPartitionFields().isPresent(),
- "Missing partition columns in the hoodie.properties.");
- assertArrayEquals(metaClient1.getTableConfig().getPartitionFields().get(), new String[] { "p0", "p1" });
+ "Missing partition columns in the hoodie.properties.");
+ assertArrayEquals(metaClient1.getTableConfig().getPartitionFields().get(), new String[] {"p0", "p1"});
assertEquals(metaClient1.getTableConfig().getPreCombineField(), "ts");
// Test for non-partitioned table.
@@ -70,9 +70,9 @@ public class TestStreamerUtil {
FileIOUtils.deleteDirectory(tempFile);
StreamerUtil.initTableIfNotExists(conf);
HoodieTableMetaClient metaClient2 = HoodieTableMetaClient.builder()
- .setBasePath(tempFile.getAbsolutePath())
- .setConf(new org.apache.hadoop.conf.Configuration())
- .build();
+ .setBasePath(tempFile.getAbsolutePath())
+ .setConf(new org.apache.hadoop.conf.Configuration())
+ .build();
assertFalse(metaClient2.getTableConfig().getPartitionFields().isPresent());
}