Posted to commits@hudi.apache.org by vi...@apache.org on 2021/01/28 00:53:25 UTC
[hudi] branch master updated: [HUDI-1522] Add a new pipeline for Flink writer (#2430)
This is an automated email from the ASF dual-hosted git repository.
vinoyang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new bc0325f [HUDI-1522] Add a new pipeline for Flink writer (#2430)
bc0325f is described below
commit bc0325f6ea0a734f106f21a2fcd4ead413a6cf7b
Author: Danny Chan <yu...@gmail.com>
AuthorDate: Thu Jan 28 08:53:13 2021 +0800
[HUDI-1522] Add a new pipeline for Flink writer (#2430)
* [HUDI-1522] Add a new pipeline for Flink writer
---
.../apache/hudi/client/HoodieFlinkWriteClient.java | 34 ++
.../hudi/index/state/FlinkInMemoryStateIndex.java | 4 +-
.../src/main/avro/HoodieRollbackMetadata.avsc | 4 +-
.../java/org/apache/hudi/avro/HoodieAvroUtils.java | 11 +-
.../model/OverwriteWithLatestAvroPayload.java | 9 +-
.../common/testutils/HoodieTestDataGenerator.java | 4 +-
hudi-flink/pom.xml | 103 +++--
.../java/org/apache/hudi/HoodieFlinkStreamer.java | 198 ----------
.../org/apache/hudi/operator/FlinkOptions.java | 249 ++++++++++++
.../hudi/operator/InstantGenerateOperator.java | 10 +-
.../hudi/operator/KeyedWriteProcessFunction.java | 6 +-
.../apache/hudi/operator/StreamWriteFunction.java | 313 +++++++++++++++
.../apache/hudi/operator/StreamWriteOperator.java | 52 +++
.../operator/StreamWriteOperatorCoordinator.java | 419 +++++++++++++++++++++
.../hudi/operator/StreamWriteOperatorFactory.java | 77 ++++
.../operator/event/BatchWriteSuccessEvent.java | 57 +++
.../hudi/schema/FilebasedSchemaProvider.java | 17 +-
.../org/apache/hudi/schema/SchemaProvider.java | 8 -
.../main/java/org/apache/hudi/sink/CommitSink.java | 6 +-
.../JsonStringToHoodieRecordMapFunction.java | 4 +-
.../apache/hudi/streamer/FlinkStreamerConfig.java | 124 ++++++
.../apache/hudi/streamer/HoodieFlinkStreamer.java | 105 ++++++
.../hudi/streamer/HoodieFlinkStreamerV2.java | 102 +++++
.../OperationConverter.java} | 30 +-
.../org/apache/hudi/util/AvroSchemaConverter.java | 147 ++++++++
.../apache/hudi/util/RowDataToAvroConverters.java | 309 +++++++++++++++
.../java/org/apache/hudi/util/StreamerUtil.java | 167 ++++++--
.../hudi/operator/StreamWriteFunctionTest.java | 303 +++++++++++++++
.../apache/hudi/operator/StreamWriteITCase.java | 129 +++++++
.../StreamWriteOperatorCoordinatorTest.java | 101 +++++
.../utils/MockFunctionInitializationContext.java | 48 +++
.../apache/hudi/operator/utils/MockMapState.java | 90 +++++
.../operator/utils/MockOperatorStateStore.java | 141 +++++++
.../utils/MockStreamingRuntimeContext.java | 124 ++++++
.../operator/utils/StreamWriteFunctionWrapper.java | 122 ++++++
.../hudi/operator/utils/TestConfigurations.java | 59 +++
.../org/apache/hudi/operator/utils/TestData.java | 164 ++++++++
.../TestJsonStringToHoodieRecordMapFunction.java | 2 +-
.../resources/test_read_schema.avsc} | 55 +--
hudi-flink/src/test/resources/test_source.data | 8 +
40 files changed, 3569 insertions(+), 346 deletions(-)
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
index d845b90..e3e0eb4 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
@@ -25,12 +25,14 @@ import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.HoodieTableVersion;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.CommitUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieNotSupportedException;
@@ -46,6 +48,7 @@ import com.codahale.metrics.Timer;
import org.apache.hadoop.conf.Configuration;
import java.io.IOException;
+import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
@@ -218,4 +221,35 @@ public class HoodieFlinkWriteClient<T extends HoodieRecordPayload> extends
return unCompletedTimeline.getInstants().filter(x -> x.getAction().equals(commitType)).map(HoodieInstant::getTimestamp)
.collect(Collectors.toList());
}
+
+ public String getInflightAndRequestedInstant(String tableType) {
+ final String commitType = CommitUtils.getCommitActionType(HoodieTableType.valueOf(tableType));
+ HoodieFlinkTable<T> table = HoodieFlinkTable.create(config, (HoodieFlinkEngineContext) context);
+ HoodieTimeline unCompletedTimeline = table.getMetaClient().getCommitsTimeline().filterInflightsAndRequested();
+ return unCompletedTimeline.getInstants()
+ .filter(x -> x.getAction().equals(commitType))
+ .map(HoodieInstant::getTimestamp)
+ .collect(Collectors.toList()).stream()
+ .max(Comparator.naturalOrder())
+ .orElse(null);
+ }
+
+ public String getLastCompletedInstant(String tableType) {
+ final String commitType = CommitUtils.getCommitActionType(HoodieTableType.valueOf(tableType));
+ HoodieFlinkTable<T> table = HoodieFlinkTable.create(config, (HoodieFlinkEngineContext) context);
+ HoodieTimeline completedTimeline = table.getMetaClient().getCommitsTimeline().filterCompletedInstants();
+ return completedTimeline.getInstants()
+ .filter(x -> x.getAction().equals(commitType))
+ .map(HoodieInstant::getTimestamp)
+ .collect(Collectors.toList()).stream()
+ .max(Comparator.naturalOrder())
+ .orElse(null);
+ }
+
+ public void deletePendingInstant(String tableType, String instant) {
+ HoodieFlinkTable<T> table = HoodieFlinkTable.create(config, (HoodieFlinkEngineContext) context);
+ String commitType = CommitUtils.getCommitActionType(HoodieTableType.valueOf(tableType));
+ table.getMetaClient().getActiveTimeline()
+ .deletePending(new HoodieInstant(HoodieInstant.State.REQUESTED, commitType, instant));
+ }
}
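For reference, a minimal sketch of how the new timeline helpers above might be driven, e.g. by a coordinator recovering after a failover. This is an illustrative fragment, not code from the patch: the `writeClient` instance and error handling are assumed to come from the surrounding job context.

    String tableType = HoodieTableType.COPY_ON_WRITE.name();
    // Look for a REQUESTED/INFLIGHT instant left over from a previous run.
    String pendingInstant = writeClient.getInflightAndRequestedInstant(tableType);
    if (pendingInstant != null) {
      // Discard the stale pending instant before starting a fresh one.
      writeClient.deletePendingInstant(tableType, pendingInstant);
    }
    // Latest completed commit on the timeline, or null if none exists yet.
    String lastCompleted = writeClient.getLastCompletedInstant(tableType);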
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/index/state/FlinkInMemoryStateIndex.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/index/state/FlinkInMemoryStateIndex.java
index 4354ea3..44eafd5 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/index/state/FlinkInMemoryStateIndex.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/index/state/FlinkInMemoryStateIndex.java
@@ -54,9 +54,7 @@ public class FlinkInMemoryStateIndex<T extends HoodieRecordPayload> extends Flin
if (context.getRuntimeContext() != null) {
MapStateDescriptor<HoodieKey, HoodieRecordLocation> indexStateDesc =
new MapStateDescriptor<>("indexState", TypeInformation.of(HoodieKey.class), TypeInformation.of(HoodieRecordLocation.class));
- if (context.getRuntimeContext() != null) {
- mapState = context.getRuntimeContext().getMapState(indexStateDesc);
- }
+ mapState = context.getRuntimeContext().getMapState(indexStateDesc);
}
}
diff --git a/hudi-common/src/main/avro/HoodieRollbackMetadata.avsc b/hudi-common/src/main/avro/HoodieRollbackMetadata.avsc
index a972bfd..f1c9fd5 100644
--- a/hudi-common/src/main/avro/HoodieRollbackMetadata.avsc
+++ b/hudi-common/src/main/avro/HoodieRollbackMetadata.avsc
@@ -57,10 +57,10 @@
/* overlaps with 'commitsRollback' field. Adding this to track action type for all the instants being rolled back. */
{
"name": "instantsRollback",
- "default": null,
+ "default": [],
"type": {
"type": "array",
- "default": null,
+ "default": [],
"items": "HoodieInstantInfo"
}
}
diff --git a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
index ec19fe3..3b989db 100644
--- a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
@@ -217,7 +217,7 @@ public class HoodieAvroUtils {
private static Schema initRecordKeySchema() {
Schema.Field recordKeyField =
- new Schema.Field(HoodieRecord.RECORD_KEY_METADATA_FIELD, METADATA_FIELD_SCHEMA, "", NullNode.getInstance());
+ new Schema.Field(HoodieRecord.RECORD_KEY_METADATA_FIELD, METADATA_FIELD_SCHEMA, "", JsonProperties.NULL_VALUE);
Schema recordKeySchema = Schema.createRecord("HoodieRecordKey", "", "", false);
recordKeySchema.setFields(Collections.singletonList(recordKeyField));
return recordKeySchema;
@@ -263,9 +263,9 @@ public class HoodieAvroUtils {
*/
public static Schema appendNullSchemaFields(Schema schema, List<String> newFieldNames) {
List<Field> newFields = schema.getFields().stream()
- .map(field -> new Field(field.name(), field.schema(), field.doc(), field.defaultValue())).collect(Collectors.toList());
+ .map(field -> new Field(field.name(), field.schema(), field.doc(), field.defaultVal())).collect(Collectors.toList());
for (String newField : newFieldNames) {
- newFields.add(new Schema.Field(newField, METADATA_FIELD_SCHEMA, "", NullNode.getInstance()));
+ newFields.add(new Schema.Field(newField, METADATA_FIELD_SCHEMA, "", JsonProperties.NULL_VALUE));
}
Schema newSchema = Schema.createRecord(schema.getName(), schema.getDoc(), schema.getNamespace(), schema.isError());
newSchema.setFields(newFields);
@@ -329,7 +329,8 @@ public class HoodieAvroUtils {
private static void copyOldValueOrSetDefault(GenericRecord oldRecord, GenericRecord newRecord, Schema.Field f) {
// cache the result of oldRecord.get() to save CPU expensive hash lookup
- Object fieldValue = oldRecord.get(f.name());
+ Schema oldSchema = oldRecord.getSchema();
+ Object fieldValue = oldSchema.getField(f.name()) == null ? null : oldRecord.get(f.name());
if (fieldValue == null) {
if (f.defaultVal() instanceof JsonProperties.Null) {
newRecord.put(f.name(), null);
@@ -381,7 +382,7 @@ public class HoodieAvroUtils {
throw new HoodieException("Field " + fn + " not found in log schema. Query cannot proceed! "
+ "Derived Schema Fields: " + new ArrayList<>(schemaFieldsMap.keySet()));
} else {
- projectedFields.add(new Schema.Field(field.name(), field.schema(), field.doc(), field.defaultValue()));
+ projectedFields.add(new Schema.Field(field.name(), field.schema(), field.doc(), field.defaultVal()));
}
}
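The switch from Jackson's NullNode to JsonProperties.NULL_VALUE above follows the newer Avro Schema.Field API, where the field default is a plain Object. A self-contained sketch of the new-style construction; the nullable string schema and class name below are stand-ins, not Hudi code:

    import org.apache.avro.JsonProperties;
    import org.apache.avro.Schema;

    public class NullDefaultSketch {
      public static void main(String[] args) {
        // A nullable union stands in for Hudi's metadata field schema.
        Schema nullableString = Schema.createUnion(
            Schema.create(Schema.Type.NULL), Schema.create(Schema.Type.STRING));
        // Avro 1.9+/1.10: JsonProperties.NULL_VALUE is the sentinel for a null default.
        Schema.Field keyField = new Schema.Field(
            "_hoodie_record_key", nullableString, "", JsonProperties.NULL_VALUE);
        System.out.println(keyField.hasDefaultValue()); // true
      }
    }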
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteWithLatestAvroPayload.java b/hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteWithLatestAvroPayload.java
index e1e6124..dd1853d 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteWithLatestAvroPayload.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteWithLatestAvroPayload.java
@@ -79,7 +79,14 @@ public class OverwriteWithLatestAvroPayload extends BaseAvroPayload
* @returns {@code true} if record represents a delete record. {@code false} otherwise.
*/
protected boolean isDeleteRecord(GenericRecord genericRecord) {
- Object deleteMarker = genericRecord.get("_hoodie_is_deleted");
+ final String isDeleteKey = "_hoodie_is_deleted";
+ // Modify to be compatible with new version Avro.
+ // The new version Avro throws for GenericRecord.get if the field name
+ // does not exist in the schema.
+ if (genericRecord.getSchema().getField(isDeleteKey) == null) {
+ return false;
+ }
+ Object deleteMarker = genericRecord.get(isDeleteKey);
return (deleteMarker instanceof Boolean && (boolean) deleteMarker);
}
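A self-contained sketch of the compatibility issue this guard addresses: newer Avro versions throw from GenericRecord#get when the field name is not in the schema, so the schema is probed first. The schema string and class name below are illustrative stand-ins:

    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericData;
    import org.apache.avro.generic.GenericRecord;

    public class DeleteMarkerSketch {
      public static void main(String[] args) {
        // Trimmed stand-in schema that lacks the "_hoodie_is_deleted" field.
        Schema schema = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"r\",\"fields\":"
                + "[{\"name\":\"id\",\"type\":\"string\"}]}");
        GenericRecord record = new GenericData.Record(schema);
        record.put("id", "1");

        // Probe the schema before calling get(), mirroring the patched isDeleteRecord().
        boolean isDelete = record.getSchema().getField("_hoodie_is_deleted") != null
            && Boolean.TRUE.equals(record.get("_hoodie_is_deleted"));
        System.out.println(isDelete); // false
      }
    }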
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
index 17e93fe..8017bc3 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
@@ -98,8 +98,8 @@ public class HoodieTestDataGenerator {
+ "{\"name\": \"amount\",\"type\": \"double\"},{\"name\": \"currency\", \"type\": \"string\"}]}},";
public static final String FARE_FLATTENED_SCHEMA = "{\"name\": \"fare\", \"type\": \"double\"},"
+ "{\"name\": \"currency\", \"type\": \"string\"},";
- public static final String TIP_NESTED_SCHEMA = "{\"name\": \"tip_history\", \"default\": null, \"type\": {\"type\": "
- + "\"array\", \"items\": {\"type\": \"record\", \"default\": null, \"name\": \"tip_history\", \"fields\": ["
+ public static final String TIP_NESTED_SCHEMA = "{\"name\": \"tip_history\", \"default\": [], \"type\": {\"type\": "
+ + "\"array\", \"default\": [], \"items\": {\"type\": \"record\", \"default\": null, \"name\": \"tip_history\", \"fields\": ["
+ "{\"name\": \"amount\", \"type\": \"double\"}, {\"name\": \"currency\", \"type\": \"string\"}]}}},";
public static final String MAP_TYPE_SCHEMA = "{\"name\": \"city_to_state\", \"type\": {\"type\": \"map\", \"values\": \"string\"}},";
public static final String EXTRA_TYPE_SCHEMA = "{\"name\": \"distance_in_meters\", \"type\": \"int\"},"
diff --git a/hudi-flink/pom.xml b/hudi-flink/pom.xml
index 3d100c7..2a0f395 100644
--- a/hudi-flink/pom.xml
+++ b/hudi-flink/pom.xml
@@ -123,28 +123,77 @@
<artifactId>kafka-clients</artifactId>
<version>${kafka.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-hadoop-compatibility_${scala.binary.version}</artifactId>
+ <version>${flink.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-avro</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-json</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-common</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-runtime-blink_${scala.binary.version}</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
+ </dependency>
<!-- Hadoop -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<scope>compile</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<scope>compile</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-auth</artifactId>
<scope>compile</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
<!-- Avro -->
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
+ <!-- Override the version to be same with Flink avro -->
+ <version>1.10.0</version>
<scope>compile</scope>
</dependency>
@@ -160,6 +209,12 @@
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
<scope>compile</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
<dependency>
@@ -173,7 +228,9 @@
<version>0.9.7</version>
</dependency>
- <!-- Junit Test Suite -->
+ <!-- test dependencies -->
+
+ <!-- Junit 5 dependencies -->
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-api</artifactId>
@@ -194,28 +251,7 @@
<artifactId>junit-jupiter-params</artifactId>
<scope>test</scope>
</dependency>
- <dependency>
- <groupId>org.mockito</groupId>
- <artifactId>mockito-junit-jupiter</artifactId>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.junit.platform</groupId>
- <artifactId>junit-platform-runner</artifactId>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.junit.platform</groupId>
- <artifactId>junit-platform-suite-api</artifactId>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.junit.platform</groupId>
- <artifactId>junit-platform-commons</artifactId>
- <scope>test</scope>
- </dependency>
-
- <!-- Hoodie - Test -->
+ <!-- Hoodie dependencies -->
<dependency>
<groupId>org.apache.hudi</groupId>
<artifactId>hudi-common</artifactId>
@@ -240,14 +276,7 @@
<type>test-jar</type>
<scope>test</scope>
</dependency>
- <dependency>
- <groupId>org.apache.hudi</groupId>
- <artifactId>hudi-hadoop-mr</artifactId>
- <version>${project.version}</version>
- <scope>test</scope>
- </dependency>
-
- <!-- Flink - Tests -->
+ <!-- Flink dependencies -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-test-utils_${scala.binary.version}</artifactId>
@@ -259,15 +288,21 @@
<artifactId>flink-runtime_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
- <classifier>tests</classifier>
+ <type>test-jar</type>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
- <classifier>tests</classifier>
+ <type>test-jar</type>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-runtime-blink_${scala.binary.version}</artifactId>
+ <version>${flink.version}</version>
+ <scope>test</scope>
+ <type>test-jar</type>
</dependency>
-
</dependencies>
</project>
diff --git a/hudi-flink/src/main/java/org/apache/hudi/HoodieFlinkStreamer.java b/hudi-flink/src/main/java/org/apache/hudi/HoodieFlinkStreamer.java
deleted file mode 100644
index b65b43c..0000000
--- a/hudi-flink/src/main/java/org/apache/hudi/HoodieFlinkStreamer.java
+++ /dev/null
@@ -1,198 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi;
-
-import org.apache.hudi.client.WriteStatus;
-import org.apache.hudi.common.config.TypedProperties;
-import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
-import org.apache.hudi.common.model.WriteOperationType;
-import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.operator.InstantGenerateOperator;
-import org.apache.hudi.operator.KeyedWriteProcessFunction;
-import org.apache.hudi.operator.KeyedWriteProcessOperator;
-import org.apache.hudi.sink.CommitSink;
-import org.apache.hudi.source.JsonStringToHoodieRecordMapFunction;
-import org.apache.hudi.util.StreamerUtil;
-
-import com.beust.jcommander.IStringConverter;
-import com.beust.jcommander.JCommander;
-import com.beust.jcommander.Parameter;
-import com.beust.jcommander.ParameterException;
-import org.apache.flink.api.common.serialization.SimpleStringSchema;
-import org.apache.flink.api.common.typeinfo.TypeHint;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.state.filesystem.FsStateBackend;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Objects;
-
-/**
- * An Utility which can incrementally consume data from Kafka and apply it to the target table.
- * currently, it only support COW table and insert, upsert operation.
- */
-public class HoodieFlinkStreamer {
- public static void main(String[] args) throws Exception {
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
- final Config cfg = new Config();
- JCommander cmd = new JCommander(cfg, null, args);
- if (cfg.help || args.length == 0) {
- cmd.usage();
- System.exit(1);
- }
- env.enableCheckpointing(cfg.checkpointInterval);
- env.getConfig().setGlobalJobParameters(cfg);
- // We use checkpoint to trigger write operation, including instant generating and committing,
- // There can only be one checkpoint at one time.
- env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
- env.disableOperatorChaining();
-
- if (cfg.flinkCheckPointPath != null) {
- env.setStateBackend(new FsStateBackend(cfg.flinkCheckPointPath));
- }
-
- TypedProperties props = StreamerUtil.getProps(cfg);
-
- // add kafka config
- props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, cfg.kafkaBootstrapServers);
- props.put(ConsumerConfig.GROUP_ID_CONFIG, cfg.kafkaGroupId);
-
- // add data source config
- props.put(HoodieWriteConfig.WRITE_PAYLOAD_CLASS, cfg.payloadClassName);
- props.put(HoodieWriteConfig.PRECOMBINE_FIELD_PROP, cfg.sourceOrderingField);
-
- // Read from kafka source
- DataStream<HoodieRecord> inputRecords =
- env.addSource(new FlinkKafkaConsumer<>(cfg.kafkaTopic, new SimpleStringSchema(), props))
- .filter(Objects::nonNull)
- .map(new JsonStringToHoodieRecordMapFunction(props))
- .name("kafka_to_hudi_record")
- .uid("kafka_to_hudi_record_uid");
-
- // InstantGenerateOperator helps to emit globally unique instantTime
- inputRecords.transform(InstantGenerateOperator.NAME, TypeInformation.of(HoodieRecord.class), new InstantGenerateOperator())
- .name("instant_generator")
- .uid("instant_generator_id")
-
- // Keyby partition path, to avoid multiple subtasks writing to a partition at the same time
- .keyBy(HoodieRecord::getPartitionPath)
-
- // write operator, where the write operation really happens
- .transform(KeyedWriteProcessOperator.NAME, TypeInformation.of(new TypeHint<Tuple3<String, List<WriteStatus>, Integer>>() {
- }), new KeyedWriteProcessOperator(new KeyedWriteProcessFunction()))
- .name("write_process")
- .uid("write_process_uid")
- .setParallelism(env.getParallelism())
-
- // Commit can only be executed once, so make it one parallelism
- .addSink(new CommitSink())
- .name("commit_sink")
- .uid("commit_sink_uid")
- .setParallelism(1);
-
- env.execute(cfg.targetTableName);
- }
-
- public static class Config extends Configuration {
- @Parameter(names = {"--kafka-topic"}, description = "kafka topic", required = true)
- public String kafkaTopic;
-
- @Parameter(names = {"--kafka-group-id"}, description = "kafka consumer group id", required = true)
- public String kafkaGroupId;
-
- @Parameter(names = {"--kafka-bootstrap-servers"}, description = "kafka bootstrap.servers", required = true)
- public String kafkaBootstrapServers;
-
- @Parameter(names = {"--flink-checkpoint-path"}, description = "flink checkpoint path")
- public String flinkCheckPointPath;
-
- @Parameter(names = {"--flink-block-retry-times"}, description = "Times to retry when latest instant has not completed")
- public String blockRetryTime = "10";
-
- @Parameter(names = {"--flink-block-retry-interval"}, description = "Seconds between two tries when latest instant has not completed")
- public String blockRetryInterval = "1";
-
- @Parameter(names = {"--target-base-path"},
- description = "base path for the target hoodie table. "
- + "(Will be created if did not exist first time around. If exists, expected to be a hoodie table)",
- required = true)
- public String targetBasePath;
-
- @Parameter(names = {"--target-table"}, description = "name of the target table in Hive", required = true)
- public String targetTableName;
-
- @Parameter(names = {"--table-type"}, description = "Type of table. COPY_ON_WRITE (or) MERGE_ON_READ", required = true)
- public String tableType;
-
- @Parameter(names = {"--props"}, description = "path to properties file on localfs or dfs, with configurations for "
- + "hoodie client, schema provider, key generator and data source. For hoodie client props, sane defaults are "
- + "used, but recommend use to provide basic things like metrics endpoints, hive configs etc. For sources, refer"
- + "to individual classes, for supported properties.")
- public String propsFilePath =
- "file://" + System.getProperty("user.dir") + "/src/test/resources/delta-streamer-config/dfs-source.properties";
-
- @Parameter(names = {"--hoodie-conf"}, description = "Any configuration that can be set in the properties file "
- + "(using the CLI parameter \"--props\") can also be passed command line using this parameter.")
- public List<String> configs = new ArrayList<>();
-
- @Parameter(names = {"--source-ordering-field"}, description = "Field within source record to decide how"
- + " to break ties between records with same key in input data. Default: 'ts' holding unix timestamp of record")
- public String sourceOrderingField = "ts";
-
- @Parameter(names = {"--payload-class"}, description = "subclass of HoodieRecordPayload, that works off "
- + "a GenericRecord. Implement your own, if you want to do something other than overwriting existing value")
- public String payloadClassName = OverwriteWithLatestAvroPayload.class.getName();
-
- @Parameter(names = {"--op"}, description = "Takes one of these values : UPSERT (default), INSERT (use when input "
- + "is purely new data/inserts to gain speed)", converter = OperationConverter.class)
- public WriteOperationType operation = WriteOperationType.UPSERT;
-
- @Parameter(names = {"--filter-dupes"},
- description = "Should duplicate records from source be dropped/filtered out before insert/bulk-insert")
- public Boolean filterDupes = false;
-
- @Parameter(names = {"--commit-on-errors"}, description = "Commit even when some records failed to be written")
- public Boolean commitOnErrors = false;
-
- /**
- * Flink checkpoint interval.
- */
- @Parameter(names = {"--checkpoint-interval"}, description = "Flink checkpoint interval.")
- public Long checkpointInterval = 1000 * 5L;
-
- @Parameter(names = {"--help", "-h"}, help = true)
- public Boolean help = false;
- }
-
- private static class OperationConverter implements IStringConverter<WriteOperationType> {
-
- @Override
- public WriteOperationType convert(String value) throws ParameterException {
- return WriteOperationType.valueOf(value);
- }
- }
-}
diff --git a/hudi-flink/src/main/java/org/apache/hudi/operator/FlinkOptions.java b/hudi-flink/src/main/java/org/apache/hudi/operator/FlinkOptions.java
new file mode 100644
index 0000000..655fd1a
--- /dev/null
+++ b/hudi-flink/src/main/java/org/apache/hudi/operator/FlinkOptions.java
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.operator;
+
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.streamer.FlinkStreamerConfig;
+import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
+import org.apache.hudi.keygen.SimpleAvroKeyGenerator;
+import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
+import org.apache.hudi.util.StreamerUtil;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.Configuration;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Hoodie Flink config options.
+ *
+ * <p>It has the options for Hoodie table read and write. It also defines some utilities.
+ */
+public class FlinkOptions {
+ private FlinkOptions() {
+ }
+
+ // ------------------------------------------------------------------------
+ // Base Options
+ // ------------------------------------------------------------------------
+ public static final ConfigOption<String> PATH = ConfigOptions
+ .key("path")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("Base path for the target hoodie table."
+ + "\nThe path would be created if it does not exist,\n"
+ + "otherwise a Hoodie table expects to be initialized successfully");
+
+ // ------------------------------------------------------------------------
+ // Read Options
+ // ------------------------------------------------------------------------
+ public static final ConfigOption<String> READ_SCHEMA_FILE_PATH = ConfigOptions
+ .key("read.schema.file.path")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("Avro schema file path, the parsed schema is used for deserializing");
+
+ // ------------------------------------------------------------------------
+ // Write Options
+ // ------------------------------------------------------------------------
+ public static final ConfigOption<String> TABLE_NAME = ConfigOptions
+ .key(HoodieWriteConfig.TABLE_NAME)
+ .stringType()
+ .noDefaultValue()
+ .withDescription("Table name to register to Hive metastore");
+
+ public static final ConfigOption<String> TABLE_TYPE = ConfigOptions
+ .key("write.table.type")
+ .stringType()
+ .defaultValue(HoodieTableType.COPY_ON_WRITE.name())
+ .withDescription("Type of table to write, COPY_ON_WRITE (or) MERGE_ON_READ");
+
+ public static final ConfigOption<String> OPERATION = ConfigOptions
+ .key("write.operation")
+ .stringType()
+ .defaultValue("upsert")
+ .withDescription("The write operation, that this write should do");
+
+ public static final ConfigOption<String> PRECOMBINE_FIELD = ConfigOptions
+ .key("write.precombine.field")
+ .stringType()
+ .defaultValue("ts")
+ .withDescription("Field used in preCombining before actual write. When two records have the same\n"
+ + "key value, we will pick the one with the largest value for the precombine field,\n"
+ + "determined by Object.compareTo(..)");
+
+ public static final ConfigOption<String> PAYLOAD_CLASS = ConfigOptions
+ .key("write.payload.class")
+ .stringType()
+ .defaultValue(OverwriteWithLatestAvroPayload.class.getName())
+ .withDescription("Payload class used. Override this, if you like to roll your own merge logic, when upserting/inserting.\n"
+ + "This will render any value set for the option in-effective");
+
+ /**
+ * Flag to indicate whether to drop duplicates upon insert.
+ * By default insert will accept duplicates, to gain extra performance.
+ */
+ public static final ConfigOption<Boolean> INSERT_DROP_DUPS = ConfigOptions
+ .key("write.insert.drop.duplicates")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription("Flag to indicate whether to drop duplicates upon insert.\n"
+ + "By default insert will accept duplicates, to gain extra performance");
+
+ public static final ConfigOption<Integer> RETRY_TIMES = ConfigOptions
+ .key("write.retry.times")
+ .intType()
+ .defaultValue(3)
+ .withDescription("Flag to indicate how many times streaming job should retry for a failed checkpoint batch.\n"
+ + "By default 3");
+
+ public static final ConfigOption<Long> RETRY_INTERVAL_MS = ConfigOptions
+ .key("write.retry.interval.ms")
+ .longType()
+ .defaultValue(2000L)
+ .withDescription("Flag to indicate how long (by millisecond) before a retry should issued for failed checkpoint batch.\n"
+ + "By default 2000 and it will be doubled by every retry");
+
+ public static final ConfigOption<Boolean> IGNORE_FAILED = ConfigOptions
+ .key("write.ignore.failed")
+ .booleanType()
+ .defaultValue(true)
+ .withDescription("Flag to indicate whether to ignore any non exception error (e.g. writestatus error). within a checkpoint batch.\n"
+ + "By default true (in favor of streaming progressing over data integrity)");
+
+ public static final ConfigOption<String> RECORD_KEY_FIELD = ConfigOptions
+ .key(KeyGeneratorOptions.RECORDKEY_FIELD_OPT_KEY)
+ .stringType()
+ .defaultValue("uuid")
+ .withDescription("Record key field. Value to be used as the `recordKey` component of `HoodieKey`.\n"
+ + "Actual value will be obtained by invoking .toString() on the field value. Nested fields can be specified using "
+ + "the dot notation eg: `a.b.c`");
+
+ public static final ConfigOption<String> PARTITION_PATH_FIELD = ConfigOptions
+ .key(KeyGeneratorOptions.PARTITIONPATH_FIELD_OPT_KEY)
+ .stringType()
+ .defaultValue("partition-path")
+ .withDescription("Partition path field. Value to be used at the `partitionPath` component of `HoodieKey`.\n"
+ + "Actual value obtained by invoking .toString()");
+
+ public static final ConfigOption<String> KEYGEN_CLASS = ConfigOptions
+ .key(HoodieWriteConfig.KEYGENERATOR_CLASS_PROP)
+ .stringType()
+ .defaultValue(SimpleAvroKeyGenerator.class.getName())
+ .withDescription("Key generator class, that implements will extract the key out of incoming record");
+
+ public static final ConfigOption<Integer> WRITE_TASK_PARALLELISM = ConfigOptions
+ .key("write.task.parallelism")
+ .intType()
+ .defaultValue(4)
+ .withDescription("Parallelism of tasks that do actual write, default is 4");
+
+ // -------------------------------------------------------------------------
+ // Utilities
+ // -------------------------------------------------------------------------
+
+ // Remember to update the set when adding new options.
+ public static final List<ConfigOption<?>> OPTIONAL_OPTIONS = Arrays.asList(
+ TABLE_TYPE, OPERATION, PRECOMBINE_FIELD, PAYLOAD_CLASS, INSERT_DROP_DUPS, RETRY_TIMES,
+ RETRY_INTERVAL_MS, IGNORE_FAILED, RECORD_KEY_FIELD, PARTITION_PATH_FIELD, KEYGEN_CLASS
+ );
+
+ // Prefix for Hoodie specific properties.
+ private static final String PROPERTIES_PREFIX = "properties.";
+
+ /**
+ * Transforms a {@code HoodieFlinkStreamer.Config} into {@code Configuration}.
+ * The latter is more suitable for the table APIs. It reads all the properties
+ * in the properties file (set by `--props` option) and cmd line options
+ * (set by `--hoodie-conf` option).
+ */
+ @SuppressWarnings("unchecked, rawtypes")
+ public static org.apache.flink.configuration.Configuration fromStreamerConfig(FlinkStreamerConfig config) {
+ Map<String, String> propsMap = new HashMap<String, String>((Map) StreamerUtil.getProps(config));
+ org.apache.flink.configuration.Configuration conf = fromMap(propsMap);
+
+ conf.setString(FlinkOptions.PATH, config.targetBasePath);
+ conf.setString(READ_SCHEMA_FILE_PATH, config.readSchemaFilePath);
+ conf.setString(FlinkOptions.TABLE_NAME, config.targetTableName);
+ // copy_on_write works the same as COPY_ON_WRITE
+ conf.setString(FlinkOptions.TABLE_TYPE, config.tableType.toUpperCase());
+ conf.setString(FlinkOptions.OPERATION, config.operation.value());
+ conf.setString(FlinkOptions.PRECOMBINE_FIELD, config.sourceOrderingField);
+ conf.setString(FlinkOptions.PAYLOAD_CLASS, config.payloadClassName);
+ conf.setBoolean(FlinkOptions.INSERT_DROP_DUPS, config.filterDupes);
+ conf.setInteger(FlinkOptions.RETRY_TIMES, Integer.parseInt(config.instantRetryTimes));
+ conf.setLong(FlinkOptions.RETRY_INTERVAL_MS, Long.parseLong(config.instantRetryInterval));
+ conf.setBoolean(FlinkOptions.IGNORE_FAILED, config.commitOnErrors);
+ conf.setString(FlinkOptions.RECORD_KEY_FIELD, config.recordKeyField);
+ conf.setString(FlinkOptions.PARTITION_PATH_FIELD, config.partitionPathField);
+ conf.setString(FlinkOptions.KEYGEN_CLASS, config.keygenClass);
+ conf.setInteger(FlinkOptions.WRITE_TASK_PARALLELISM, config.writeTaskNum);
+
+ return conf;
+ }
+
+ /**
+ * Collects the config options that start with 'properties.' into a 'key'='value' list.
+ */
+ public static Map<String, String> getHoodieProperties(Map<String, String> options) {
+ final Map<String, String> hoodieProperties = new HashMap<>();
+
+ if (hasPropertyOptions(options)) {
+ options.keySet().stream()
+ .filter(key -> key.startsWith(PROPERTIES_PREFIX))
+ .forEach(key -> {
+ final String value = options.get(key);
+ final String subKey = key.substring((PROPERTIES_PREFIX).length());
+ hoodieProperties.put(subKey, value);
+ });
+ }
+ return hoodieProperties;
+ }
+
+ /**
+ * Collects all the config options, the 'properties.' prefix would be removed if the option key starts with it.
+ */
+ public static Configuration flatOptions(Configuration conf) {
+ final Map<String, String> propsMap = new HashMap<>();
+
+ conf.toMap().forEach((key, value) -> {
+ final String subKey = key.startsWith(PROPERTIES_PREFIX)
+ ? key.substring((PROPERTIES_PREFIX).length())
+ : key;
+ propsMap.put(subKey, value);
+ });
+ return fromMap(propsMap);
+ }
+
+ private static boolean hasPropertyOptions(Map<String, String> options) {
+ return options.keySet().stream().anyMatch(k -> k.startsWith(PROPERTIES_PREFIX));
+ }
+
+ /** Creates a new configuration that is initialized with the options of the given map. */
+ private static Configuration fromMap(Map<String, String> map) {
+ final Configuration configuration = new Configuration();
+ map.forEach(configuration::setString);
+ return configuration;
+ }
+}
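A minimal usage sketch of the options defined above: values are set and read through the typed ConfigOption handles, and unset options fall back to their declared defaults. The path, table name, and class name below are placeholders:

    import org.apache.flink.configuration.Configuration;
    import org.apache.hudi.operator.FlinkOptions;

    public class FlinkOptionsSketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.setString(FlinkOptions.PATH, "/tmp/hoodie/t1");
        conf.setString(FlinkOptions.TABLE_NAME, "t1");
        conf.setString(FlinkOptions.OPERATION, "upsert");

        String tableType = conf.getString(FlinkOptions.TABLE_TYPE);        // COPY_ON_WRITE (default)
        boolean dropDups = conf.getBoolean(FlinkOptions.INSERT_DROP_DUPS); // false (default)
        System.out.println(tableType + ", dropDups=" + dropDups);
      }
    }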
diff --git a/hudi-flink/src/main/java/org/apache/hudi/operator/InstantGenerateOperator.java b/hudi-flink/src/main/java/org/apache/hudi/operator/InstantGenerateOperator.java
index 103aef5..5c9930d 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/operator/InstantGenerateOperator.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/operator/InstantGenerateOperator.java
@@ -18,7 +18,7 @@
package org.apache.hudi.operator;
-import org.apache.hudi.HoodieFlinkStreamer;
+import org.apache.hudi.streamer.FlinkStreamerConfig;
import org.apache.hudi.client.FlinkTaskContextSupplier;
import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.hudi.client.common.HoodieFlinkEngineContext;
@@ -66,7 +66,7 @@ public class InstantGenerateOperator extends AbstractStreamOperator<HoodieRecord
private static final Logger LOG = LoggerFactory.getLogger(InstantGenerateOperator.class);
public static final String NAME = "InstantGenerateOperator";
- private HoodieFlinkStreamer.Config cfg;
+ private FlinkStreamerConfig cfg;
private HoodieFlinkWriteClient writeClient;
private SerializableConfiguration serializableHadoopConf;
private transient FileSystem fs;
@@ -94,13 +94,13 @@ public class InstantGenerateOperator extends AbstractStreamOperator<HoodieRecord
public void open() throws Exception {
super.open();
// get configs from runtimeContext
- cfg = (HoodieFlinkStreamer.Config) runtimeContext.getExecutionConfig().getGlobalJobParameters();
+ cfg = (FlinkStreamerConfig) getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
// retry times
- retryTimes = Integer.valueOf(cfg.blockRetryTime);
+ retryTimes = Integer.valueOf(cfg.instantRetryTimes);
// retry interval
- retryInterval = Integer.valueOf(cfg.blockRetryInterval);
+ retryInterval = Integer.valueOf(cfg.instantRetryInterval);
// hadoopConf
serializableHadoopConf = new SerializableConfiguration(StreamerUtil.getHadoopConf());
diff --git a/hudi-flink/src/main/java/org/apache/hudi/operator/KeyedWriteProcessFunction.java b/hudi-flink/src/main/java/org/apache/hudi/operator/KeyedWriteProcessFunction.java
index d3ebddf..a59a995 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/operator/KeyedWriteProcessFunction.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/operator/KeyedWriteProcessFunction.java
@@ -18,7 +18,7 @@
package org.apache.hudi.operator;
-import org.apache.hudi.HoodieFlinkStreamer;
+import org.apache.hudi.streamer.FlinkStreamerConfig;
import org.apache.hudi.client.FlinkTaskContextSupplier;
import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.hudi.client.WriteStatus;
@@ -77,7 +77,7 @@ public class KeyedWriteProcessFunction extends KeyedProcessFunction<String, Hood
/**
* Job conf.
*/
- private HoodieFlinkStreamer.Config cfg;
+ private FlinkStreamerConfig cfg;
/**
* Write Client.
@@ -90,7 +90,7 @@ public class KeyedWriteProcessFunction extends KeyedProcessFunction<String, Hood
indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();
- cfg = (HoodieFlinkStreamer.Config) getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
+ cfg = (FlinkStreamerConfig) getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
HoodieFlinkEngineContext context =
new HoodieFlinkEngineContext(new SerializableConfiguration(new org.apache.hadoop.conf.Configuration()), new FlinkTaskContextSupplier(getRuntimeContext()));
diff --git a/hudi-flink/src/main/java/org/apache/hudi/operator/StreamWriteFunction.java b/hudi-flink/src/main/java/org/apache/hudi/operator/StreamWriteFunction.java
new file mode 100644
index 0000000..34a61d4
--- /dev/null
+++ b/hudi-flink/src/main/java/org/apache/hudi/operator/StreamWriteFunction.java
@@ -0,0 +1,313 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.operator;
+
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.client.FlinkTaskContextSupplier;
+import org.apache.hudi.client.HoodieFlinkWriteClient;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.client.common.HoodieFlinkEngineContext;
+import org.apache.hudi.common.config.SerializableConfiguration;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.keygen.KeyGenerator;
+import org.apache.hudi.operator.event.BatchWriteSuccessEvent;
+import org.apache.hudi.util.RowDataToAvroConverters;
+import org.apache.hudi.util.StreamerUtil;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.BiFunction;
+
+/**
+ * Sink function to write the data to the underlying filesystem.
+ *
+ * <p><h2>Work Flow</h2>
+ *
+ * <p>The function first buffers the data as a batch of {@link HoodieRecord}s.
+ * It flushes (writes) the buffered records when a Flink checkpoint starts. After a batch has been written successfully,
+ * the function notifies its operator coordinator {@link StreamWriteOperatorCoordinator} to mark the write as successful.
+ *
+ * <p><h2>Exactly-once Semantics</h2>
+ *
+ * <p>The task implements exactly-once semantics by buffering the data between checkpoints. The operator coordinator
+ * starts a new instant on the timeline when a checkpoint triggers; the coordinator checkpoint always
+ * starts before its operator's, so when this function starts a checkpoint, a REQUESTED instant already exists.
+ * The processing thread then blocks data buffering while the checkpoint thread flushes the existing data buffer.
+ * Once the buffer has been written successfully, the processing thread unblocks and starts buffering again for the next checkpoint round.
+ * Because any checkpoint failure triggers a rollback of the write, exactly-once semantics are achieved.
+ *
+ * <p><h2>Fault Tolerance</h2>
+ *
+ * <p>The operator coordinator checks the validity of the last instant when it starts a new one. The operator rolls back
+ * the written data and throws when any error occurs, so any checkpoint or task failure triggers a failover.
+ * The operator coordinator retries several times when committing the write status.
+ *
+ * <p>Note: The function task requires the input stream to be partitioned by the partition fields, so that different
+ * write tasks do not write to the same file group and conflict. The partition path is typically a datetime field,
+ * so the sink task is likely to hit an IO bottleneck; a more flexible solution is to shuffle the
+ * data by the file group IDs.
+ *
+ * @param <I> Type of the input record
+ * @see StreamWriteOperatorCoordinator
+ */
+public class StreamWriteFunction<K, I, O> extends KeyedProcessFunction<K, I, O> implements CheckpointedFunction {
+
+ private static final long serialVersionUID = 1L;
+
+ private static final Logger LOG = LoggerFactory.getLogger(StreamWriteFunction.class);
+
+ /**
+ * Write buffer for a checkpoint.
+ */
+ private transient List<HoodieRecord> buffer;
+
+ /**
+ * The buffer lock to control data buffering/flushing.
+ */
+ private transient ReentrantLock bufferLock;
+
+ /**
+ * The condition to decide whether to add new records into the buffer.
+ */
+ private transient Condition addToBufferCondition;
+
+ /**
+ * Flag saying whether there is an on-going checkpoint.
+ */
+ private volatile boolean onCheckpointing = false;
+
+ /**
+ * Config options.
+ */
+ private final Configuration config;
+
+ /**
+ * Id of current subtask.
+ */
+ private int taskID;
+
+ /**
+ * Write Client.
+ */
+ private transient HoodieFlinkWriteClient writeClient;
+
+ private transient BiFunction<List<HoodieRecord>, String, List<WriteStatus>> writeFunction;
+
+ /**
+ * HoodieKey generator.
+ */
+ private transient KeyGenerator keyGenerator;
+
+ /**
+ * Row type of the input.
+ */
+ private final RowType rowType;
+
+ /**
+ * Avro schema of the input.
+ */
+ private final Schema avroSchema;
+
+ private transient RowDataToAvroConverters.RowDataToAvroConverter converter;
+
+ /**
+ * The REQUESTED instant we write the data.
+ */
+ private volatile String currentInstant;
+
+ /**
+ * Gateway to send operator events to the operator coordinator.
+ */
+ private transient OperatorEventGateway eventGateway;
+
+ /**
+ * Constructs a StreamWriteFunction.
+ *
+ * @param rowType The input row type
+ * @param config The config options
+ */
+ public StreamWriteFunction(RowType rowType, Configuration config) {
+ this.rowType = rowType;
+ this.avroSchema = StreamerUtil.getSourceSchema(config);
+ this.config = config;
+ }
+
+ @Override
+ public void open(Configuration parameters) throws IOException {
+ this.taskID = getRuntimeContext().getIndexOfThisSubtask();
+ this.keyGenerator = StreamerUtil.createKeyGenerator(FlinkOptions.flatOptions(this.config));
+ this.converter = RowDataToAvroConverters.createConverter(this.rowType);
+ initBuffer();
+ initWriteClient();
+ initWriteFunction();
+ }
+
+ @Override
+ public void initializeState(FunctionInitializationContext context) {
+ // no operation
+ }
+
+ @Override
+ public void snapshotState(FunctionSnapshotContext functionSnapshotContext) {
+ bufferLock.lock();
+ try {
+ // Based on the fact that the coordinator starts the checkpoint first,
+ // it would check the validity.
+ this.onCheckpointing = true;
+ this.currentInstant = this.writeClient.getInflightAndRequestedInstant(this.config.get(FlinkOptions.TABLE_TYPE));
+ Preconditions.checkNotNull(this.currentInstant,
+ "No inflight instant when flushing data");
+ // wait for the buffer data flush out and request a new instant
+ flushBuffer();
+ // signal the task thread to start buffering
+ addToBufferCondition.signal();
+ } finally {
+ this.onCheckpointing = false;
+ bufferLock.unlock();
+ }
+ }
+
+ @Override
+ public void processElement(I value, KeyedProcessFunction<K, I, O>.Context ctx, Collector<O> out) throws Exception {
+ bufferLock.lock();
+ try {
+ if (onCheckpointing) {
+ addToBufferCondition.await();
+ }
+ this.buffer.add(toHoodieRecord(value));
+ } finally {
+ bufferLock.unlock();
+ }
+ }
+
+ @Override
+ public void close() {
+ if (this.writeClient != null) {
+ this.writeClient.close();
+ }
+ }
+
+ // -------------------------------------------------------------------------
+ // Getter/Setter
+ // -------------------------------------------------------------------------
+
+ @VisibleForTesting
+ @SuppressWarnings("rawtypes")
+ public List<HoodieRecord> getBuffer() {
+ return buffer;
+ }
+
+ @VisibleForTesting
+ @SuppressWarnings("rawtypes")
+ public HoodieFlinkWriteClient getWriteClient() {
+ return writeClient;
+ }
+
+ public void setOperatorEventGateway(OperatorEventGateway operatorEventGateway) {
+ this.eventGateway = operatorEventGateway;
+ }
+
+ // -------------------------------------------------------------------------
+ // Utilities
+ // -------------------------------------------------------------------------
+
+ private void initBuffer() {
+ this.buffer = new ArrayList<>();
+ this.bufferLock = new ReentrantLock();
+ this.addToBufferCondition = this.bufferLock.newCondition();
+ }
+
+ private void initWriteClient() {
+ HoodieFlinkEngineContext context =
+ new HoodieFlinkEngineContext(
+ new SerializableConfiguration(StreamerUtil.getHadoopConf()),
+ new FlinkTaskContextSupplier(getRuntimeContext()));
+
+ writeClient = new HoodieFlinkWriteClient<>(context, StreamerUtil.getHoodieClientConfig(this.config));
+ }
+
+ private void initWriteFunction() {
+ final String writeOperation = this.config.get(FlinkOptions.OPERATION);
+ switch (WriteOperationType.fromValue(writeOperation)) {
+ case INSERT:
+ this.writeFunction = (records, instantTime) -> this.writeClient.insert(records, instantTime);
+ break;
+ case UPSERT:
+ this.writeFunction = (records, instantTime) -> this.writeClient.upsert(records, instantTime);
+ break;
+ default:
+ throw new RuntimeException("Unsupported write operation : " + writeOperation);
+ }
+ }
+
+ /**
+ * Converts the given record to a {@link HoodieRecord}.
+ *
+ * @param record The input record
+ * @return HoodieRecord based on the configuration
+ * @throws IOException if error occurs
+ */
+ @SuppressWarnings("rawtypes")
+ private HoodieRecord toHoodieRecord(I record) throws IOException {
+ boolean shouldCombine = this.config.getBoolean(FlinkOptions.INSERT_DROP_DUPS)
+ || WriteOperationType.fromValue(this.config.getString(FlinkOptions.OPERATION)) == WriteOperationType.UPSERT;
+ GenericRecord gr = (GenericRecord) this.converter.convert(this.avroSchema, record);
+ final String payloadClazz = this.config.getString(FlinkOptions.PAYLOAD_CLASS);
+ Comparable orderingVal = (Comparable) HoodieAvroUtils.getNestedFieldVal(gr,
+ this.config.getString(FlinkOptions.PRECOMBINE_FIELD), false);
+ HoodieRecordPayload payload = shouldCombine
+ ? StreamerUtil.createPayload(payloadClazz, gr, orderingVal)
+ : StreamerUtil.createPayload(payloadClazz, gr);
+ return new HoodieRecord<>(keyGenerator.getKey(gr), payload);
+ }
+
+ private void flushBuffer() {
+ final List<WriteStatus> writeStatus;
+ if (buffer.size() > 0) {
+ writeStatus = writeFunction.apply(buffer, currentInstant);
+ buffer.clear();
+ } else {
+ LOG.info("No data to write in subtask [{}] for instant [{}]", taskID, currentInstant);
+ writeStatus = Collections.emptyList();
+ }
+ this.eventGateway.sendEventToCoordinator(new BatchWriteSuccessEvent(this.taskID, currentInstant, writeStatus));
+ this.currentInstant = "";
+ }
+}
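As a stand-alone illustration of the buffering protocol described in the class Javadoc above, the sketch below (simplified types and a hypothetical class name, not the operator's actual implementation) shows a processing thread blocking on a condition while a checkpoint flush drains the buffer:

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.ReentrantLock;

    class BufferingSketch {
      private final ReentrantLock lock = new ReentrantLock();
      private final Condition notFlushing = lock.newCondition();
      private final List<String> buffer = new ArrayList<>(); // String stands in for HoodieRecord
      private volatile boolean flushing = false;

      // Called by the processing thread for each incoming record.
      void add(String record) throws InterruptedException {
        lock.lock();
        try {
          while (flushing) {
            notFlushing.await(); // block buffering while a checkpoint flush is running
          }
          buffer.add(record);
        } finally {
          lock.unlock();
        }
      }

      // Called from the checkpoint thread when a checkpoint starts.
      void onCheckpoint() {
        lock.lock();
        try {
          flushing = true;
          // ... hand 'buffer' to the write client under the current REQUESTED instant ...
          buffer.clear();
          flushing = false;
          notFlushing.signalAll(); // let the processing thread resume buffering
        } finally {
          lock.unlock();
        }
      }
    }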
diff --git a/hudi-flink/src/main/java/org/apache/hudi/operator/StreamWriteOperator.java b/hudi-flink/src/main/java/org/apache/hudi/operator/StreamWriteOperator.java
new file mode 100644
index 0000000..3f4d940
--- /dev/null
+++ b/hudi-flink/src/main/java/org/apache/hudi/operator/StreamWriteOperator.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.operator;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
+import org.apache.flink.runtime.operators.coordination.OperatorEventHandler;
+import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
+import org.apache.flink.streaming.api.operators.StreamSink;
+import org.apache.flink.table.types.logical.RowType;
+
+/**
+ * Operator for {@link StreamSink}.
+ *
+ * @param <I> The input type
+ */
+public class StreamWriteOperator<I>
+ extends KeyedProcessOperator<Object, I, Object>
+ implements OperatorEventHandler {
+ private final StreamWriteFunction<Object, I, Object> sinkFunction;
+
+ public StreamWriteOperator(RowType rowType, Configuration conf) {
+ super(new StreamWriteFunction<>(rowType, conf));
+ this.sinkFunction = (StreamWriteFunction<Object, I, Object>) getUserFunction();
+ }
+
+ @Override
+ public void handleOperatorEvent(OperatorEvent operatorEvent) {
+ // do nothing
+ }
+
+ void setOperatorEventGateway(OperatorEventGateway operatorEventGateway) {
+ sinkFunction.setOperatorEventGateway(operatorEventGateway);
+ }
+}
diff --git a/hudi-flink/src/main/java/org/apache/hudi/operator/StreamWriteOperatorCoordinator.java b/hudi-flink/src/main/java/org/apache/hudi/operator/StreamWriteOperatorCoordinator.java
new file mode 100644
index 0000000..524c601
--- /dev/null
+++ b/hudi-flink/src/main/java/org/apache/hudi/operator/StreamWriteOperatorCoordinator.java
@@ -0,0 +1,419 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.operator;
+
+import org.apache.hudi.client.FlinkTaskContextSupplier;
+import org.apache.hudi.client.HoodieFlinkWriteClient;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.client.common.HoodieFlinkEngineContext;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.operator.event.BatchWriteSuccessEvent;
+import org.apache.hudi.util.StreamerUtil;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.util.Preconditions;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.jetbrains.annotations.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+/**
+ * {@link OperatorCoordinator} for {@link StreamWriteFunction}.
+ *
+ * <p>This coordinator starts a new instant when a new checkpoint starts. It commits the instant when all the
+ * operator tasks have written their buffers successfully for a round of checkpointing.
+ *
+ * <p>If there is no data for a round of checkpointing, it rolls back the metadata.
+ *
+ * @see StreamWriteFunction for the workflow and semantics
+ */
+public class StreamWriteOperatorCoordinator
+ implements OperatorCoordinator {
+ private static final Logger LOG = LoggerFactory.getLogger(StreamWriteOperatorCoordinator.class);
+
+ /**
+ * Config options.
+ */
+ private final Configuration conf;
+
+ /**
+ * Write client.
+ */
+ private transient HoodieFlinkWriteClient writeClient;
+
+ private long inFlightCheckpoint = -1;
+
+ /**
+ * Current REQUESTED instant, for validation.
+ */
+ private String inFlightInstant = "";
+
+ /**
+ * Event buffer for one round of checkpointing. When all the elements are non-null and have the same
+ * write instant, then the instant succeeds and we can commit it.
+ */
+ private transient BatchWriteSuccessEvent[] eventBuffer;
+
+ /**
+ * Task number of the operator.
+ */
+ private final int parallelism;
+
+ /**
+ * Constructs a StreamWriteOperatorCoordinator.
+ *
+ * @param conf The config options
+ * @param parallelism The operator task number
+ */
+ public StreamWriteOperatorCoordinator(
+ Configuration conf,
+ int parallelism) {
+ this.conf = conf;
+ this.parallelism = parallelism;
+ }
+
+ @Override
+ public void start() throws Exception {
+ // initialize event buffer
+ reset();
+ // writeClient
+ initWriteClient();
+ // init table, create it if not exists.
+ initTable();
+ }
+
+ @Override
+ public void close() {
+ if (writeClient != null) {
+ writeClient.close();
+ }
+ this.eventBuffer = null;
+ }
+
+ @Override
+ public void checkpointCoordinator(long checkpointId, CompletableFuture<byte[]> result) {
+ try {
+ final String errMsg = "A new checkpoint starts while the last checkpoint buffer"
+ + " data has not finish writing, roll back the last write and throw";
+ checkAndForceCommit(errMsg);
+ this.inFlightInstant = this.writeClient.startCommit();
+ this.inFlightCheckpoint = checkpointId;
+ LOG.info("Create instant [{}], at checkpoint [{}]", this.inFlightInstant, checkpointId);
+ result.complete(writeCheckpointBytes());
+ } catch (Throwable throwable) {
+ // when a checkpoint fails, throw directly.
+ result.completeExceptionally(
+ new CompletionException(
+ String.format("Failed to checkpoint Instant %s for source %s",
+ this.inFlightInstant, this.getClass().getSimpleName()), throwable));
+ }
+ }
+
+ @Override
+ public void checkpointComplete(long checkpointId) {
+ // start to commit the instant.
+ checkAndCommitWithRetry();
+ }
+
+ public void notifyCheckpointAborted(long checkpointId) {
+ Preconditions.checkState(inFlightCheckpoint == checkpointId,
+ "The aborted checkpoint should always be the last checkpoint");
+ checkAndForceCommit("The last checkpoint was aborted, roll back the last write and throw");
+ }
+
+ @Override
+ public void resetToCheckpoint(@Nullable byte[] checkpointData) throws Exception {
+ if (checkpointData != null) {
+ // restore when any checkpoint completed
+ deserializeCheckpointAndRestore(checkpointData);
+ }
+ }
+
+ @Override
+ public void handleEventFromOperator(int i, OperatorEvent operatorEvent) {
+ // validate the event type and its instant, then buffer it
+ Preconditions.checkState(operatorEvent instanceof BatchWriteSuccessEvent,
+ "The coordinator can only handle BatchWriteSuccessEvent");
+ BatchWriteSuccessEvent event = (BatchWriteSuccessEvent) operatorEvent;
+ Preconditions.checkState(event.getInstantTime().equals(this.inFlightInstant),
+ String.format("Receive an unexpected event for instant %s from task %d",
+ event.getInstantTime(), event.getTaskID()));
+ this.eventBuffer[event.getTaskID()] = event;
+ }
+
+ @Override
+ public void subtaskFailed(int i, @Nullable Throwable throwable) {
+ // no operation
+ }
+
+ // -------------------------------------------------------------------------
+ // Utilities
+ // -------------------------------------------------------------------------
+
+ @SuppressWarnings("rawtypes")
+ private void initWriteClient() {
+ writeClient = new HoodieFlinkWriteClient(
+ new HoodieFlinkEngineContext(new FlinkTaskContextSupplier(null)),
+ StreamerUtil.getHoodieClientConfig(this.conf),
+ true);
+ }
+
+ private void initTable() throws IOException {
+ final String basePath = this.conf.getString(FlinkOptions.PATH);
+ final org.apache.hadoop.conf.Configuration hadoopConf = StreamerUtil.getHadoopConf();
+ // Hadoop FileSystem
+ try (FileSystem fs = FSUtils.getFs(basePath, hadoopConf)) {
+ if (!fs.exists(new Path(basePath, HoodieTableMetaClient.METAFOLDER_NAME))) {
+ HoodieTableMetaClient.initTableType(
+ hadoopConf,
+ basePath,
+ HoodieTableType.valueOf(this.conf.getString(FlinkOptions.TABLE_TYPE)),
+ this.conf.getString(FlinkOptions.TABLE_NAME),
+ "archived",
+ this.conf.getString(FlinkOptions.PAYLOAD_CLASS),
+ 1);
+ LOG.info("Table initialized");
+ } else {
+ LOG.info("Table [{}/{}] already exists, no need to initialize the table",
+ basePath, this.conf.getString(FlinkOptions.TABLE_NAME));
+ }
+ }
+ }
+
+ static byte[] readBytes(DataInputStream in, int size) throws IOException {
+ byte[] bytes = new byte[size];
+ in.readFully(bytes);
+ return bytes;
+ }
+
+ /**
+ * Serialize the coordinator state. The current implementation may not be super efficient,
+ * but it should not matter that much because most of the state should be rather small.
+ * Large states themselves may already be a problem regardless of how the serialization
+ * is implemented.
+ *
+ * @return A byte array containing the serialized state of the operator coordinator.
+ * @throws IOException When something goes wrong in serialization.
+ */
+ private byte[] writeCheckpointBytes() throws IOException {
+ try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ DataOutputStream out = new DataOutputViewStreamWrapper(baos)) {
+
+ out.writeLong(this.inFlightCheckpoint);
+ byte[] serializedInstant = this.inFlightInstant.getBytes();
+ out.writeInt(serializedInstant.length);
+ out.write(serializedInstant);
+ out.flush();
+ return baos.toByteArray();
+ }
+ }
+
+ /**
+ * Restore the state of this coordinator from the state bytes.
+ *
+ * @param bytes The checkpoint bytes that were returned from {@link #writeCheckpointBytes()}
+ * @throws Exception When the deserialization failed.
+ */
+ private void deserializeCheckpointAndRestore(byte[] bytes) throws Exception {
+ try (ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
+ DataInputStream in = new DataInputViewStreamWrapper(bais)) {
+ long checkpointID = in.readLong();
+ int serializedInstantSize = in.readInt();
+ byte[] serializedInstant = readBytes(in, serializedInstantSize);
+ this.inFlightCheckpoint = checkpointID;
+ this.inFlightInstant = new String(serializedInstant);
+ }
+ }
+
+ private void reset() {
+ this.inFlightInstant = "";
+ this.eventBuffer = new BatchWriteSuccessEvent[this.parallelism];
+ }
+
+ private void checkAndForceCommit(String errMsg) {
+ if (!checkReady()) {
+ // forced but still has inflight instant
+ String inflightInstant = writeClient.getInflightAndRequestedInstant(this.conf.getString(FlinkOptions.TABLE_TYPE));
+ if (inflightInstant != null) {
+ assert inflightInstant.equals(this.inFlightInstant);
+ writeClient.rollback(this.inFlightInstant);
+ throw new HoodieException(errMsg);
+ }
+ if (Arrays.stream(eventBuffer).allMatch(Objects::isNull)) {
+ // The last checkpoint finished successfully.
+ return;
+ }
+ }
+ doCommit();
+ }
+
+ private void checkAndCommitWithRetry() {
+ int retryTimes = this.conf.getInteger(FlinkOptions.RETRY_TIMES);
+ if (retryTimes < 0) {
+ retryTimes = 1;
+ }
+ long retryIntervalMillis = this.conf.getLong(FlinkOptions.RETRY_INTERVAL_MS);
+ int tryTimes = 0;
+ while (tryTimes++ < retryTimes) {
+ try {
+ if (!checkReady()) {
+ // Do not throw if the retries are exhausted but the event buffer is still not ready,
+ // because we have a force check when next checkpoint starts.
+ sleepFor(retryIntervalMillis);
+ continue;
+ }
+ doCommit();
+ return;
+ } catch (Throwable throwable) {
+ String cause = throwable.getCause() == null ? "" : throwable.getCause().toString();
+ LOG.warn("Try to commit the instant {} failed, with times {} and cause {}", this.inFlightInstant, tryTimes, cause);
+ if (tryTimes == retryTimes) {
+ throw new HoodieException(throwable);
+ }
+ sleepFor(retryIntervalMillis);
+ }
+ }
+ }
+
+ /**
+ * Sleeps {@code intervalMillis} milliseconds in the current thread.
+ */
+ private void sleepFor(long intervalMillis) {
+ try {
+ TimeUnit.MILLISECONDS.sleep(intervalMillis);
+ } catch (InterruptedException e) {
+ LOG.error("Thread interrupted while waiting to retry the instant commits");
+ throw new HoodieException(e);
+ }
+ }
+
+ /** Checks whether the event buffer is ready to commit. */
+ private boolean checkReady() {
+ return Arrays.stream(eventBuffer).allMatch(event ->
+ event != null && event.getInstantTime().equals(this.inFlightInstant));
+ }
+
+ /** Performs the actual commit action. */
+ private void doCommit() {
+ List<WriteStatus> writeResults = Arrays.stream(eventBuffer)
+ .map(BatchWriteSuccessEvent::getWriteStatuses)
+ .flatMap(Collection::stream)
+ .collect(Collectors.toList());
+
+ if (writeResults.size() == 0) {
+ // No data has been written, delete the pending instant metadata
+ this.writeClient.deletePendingInstant(this.conf.getString(FlinkOptions.TABLE_TYPE), this.inFlightInstant);
+ reset();
+ return;
+ }
+
+ // commit or rollback
+ long totalErrorRecords = writeResults.stream().map(WriteStatus::getTotalErrorRecords).reduce(Long::sum).orElse(0L);
+ long totalRecords = writeResults.stream().map(WriteStatus::getTotalRecords).reduce(Long::sum).orElse(0L);
+ boolean hasErrors = totalErrorRecords > 0;
+
+ if (!hasErrors || this.conf.getBoolean(FlinkOptions.IGNORE_FAILED)) {
+ HashMap<String, String> checkpointCommitMetadata = new HashMap<>();
+ if (hasErrors) {
+ LOG.warn("Some records failed to merge but forcing commit since commitOnErrors set to true. Errors/Total="
+ + totalErrorRecords + "/" + totalRecords);
+ }
+
+ boolean success = writeClient.commit(this.inFlightInstant, writeResults, Option.of(checkpointCommitMetadata));
+ if (success) {
+ reset();
+ LOG.info("Commit instant [{}] success!", this.inFlightInstant);
+ } else {
+ throw new HoodieException(String.format("Commit instant [%s] failed!", this.inFlightInstant));
+ }
+ } else {
+ LOG.error("Error when writing. Errors/Total=" + totalErrorRecords + "/" + totalRecords);
+ LOG.error("The first 100 error messages");
+ writeResults.stream().filter(WriteStatus::hasErrors).limit(100).forEach(ws -> {
+ LOG.error("Global error for partition path {} and fileID {}: {}",
+ ws.getPartitionPath(), ws.getFileId(), ws.getGlobalError());
+ if (ws.getErrors().size() > 0) {
+ ws.getErrors().forEach((key, value) -> LOG.trace("Error for key: " + key + " and value: " + value));
+ }
+ });
+ // Rolls back instant
+ writeClient.rollback(this.inFlightInstant);
+ throw new HoodieException(String.format("Commit instant [%s] failed and rolled back !", this.inFlightInstant));
+ }
+ }
+
+ @VisibleForTesting
+ public BatchWriteSuccessEvent[] getEventBuffer() {
+ return eventBuffer;
+ }
+
+ @VisibleForTesting
+ public String getInFlightInstant() {
+ return inFlightInstant;
+ }
+
+ /**
+ * Provider for {@link StreamWriteOperatorCoordinator}.
+ */
+ public static class Provider implements OperatorCoordinator.Provider {
+ private final OperatorID operatorId;
+ private final Configuration conf;
+ private final int numTasks;
+
+ public Provider(OperatorID operatorId, Configuration conf, int numTasks) {
+ this.operatorId = operatorId;
+ this.conf = conf;
+ this.numTasks = numTasks;
+ }
+
+ public OperatorID getOperatorId() {
+ return this.operatorId;
+ }
+
+ public OperatorCoordinator create(Context context) {
+ return new StreamWriteOperatorCoordinator(this.conf, this.numTasks);
+ }
+ }
+}
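For orientation, here is a minimal, hypothetical sketch of one checkpoint round driven directly against the coordinator, outside a running Flink job. The base path, table name and single-task parallelism are illustrative assumptions only; in the real pipeline the wiring happens through the StreamWriteOperatorFactory below.

import org.apache.flink.configuration.Configuration;
import org.apache.hudi.operator.FlinkOptions;
import org.apache.hudi.operator.StreamWriteOperatorCoordinator;
import org.apache.hudi.operator.event.BatchWriteSuccessEvent;

import java.util.Collections;
import java.util.concurrent.CompletableFuture;

public class CoordinatorRoundTripSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    conf.setString(FlinkOptions.PATH.key(), "/tmp/hoodie_sink");       // hypothetical base path
    conf.setString(FlinkOptions.TABLE_NAME.key(), "t1");               // hypothetical table name
    conf.setString(FlinkOptions.TABLE_TYPE.key(), "COPY_ON_WRITE");

    // parallelism 1: a single event fills the buffer
    StreamWriteOperatorCoordinator coordinator = new StreamWriteOperatorCoordinator(conf, 1);
    coordinator.start();                                               // creates the write client and the table if absent

    CompletableFuture<byte[]> state = new CompletableFuture<>();
    coordinator.checkpointCoordinator(1L, state);                      // starts a new instant for checkpoint 1
    String instant = coordinator.getInFlightInstant();

    // subtask 0 reports its flushed write statuses for the in-flight instant;
    // with an empty list the coordinator deletes the pending instant instead of committing
    coordinator.handleEventFromOperator(0,
        new BatchWriteSuccessEvent(0, instant, Collections.emptyList()));

    coordinator.checkpointComplete(1L);                                // commits (or cleans up) the instant
    coordinator.close();
  }
}

With a non-empty list of WriteStatus the same sequence ends in writeClient.commit(...) instead of the pending-instant cleanup.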
diff --git a/hudi-flink/src/main/java/org/apache/hudi/operator/StreamWriteOperatorFactory.java b/hudi-flink/src/main/java/org/apache/hudi/operator/StreamWriteOperatorFactory.java
new file mode 100644
index 0000000..f5faa54
--- /dev/null
+++ b/hudi-flink/src/main/java/org/apache/hudi/operator/StreamWriteOperatorFactory.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.operator;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
+import org.apache.flink.runtime.operators.coordination.OperatorEventDispatcher;
+import org.apache.flink.streaming.api.operators.CoordinatedOperatorFactory;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
+import org.apache.flink.streaming.api.operators.SimpleUdfStreamOperatorFactory;
+import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.table.types.logical.RowType;
+
+/**
+ * Factory class for {@link StreamWriteOperator}.
+ */
+public class StreamWriteOperatorFactory<I>
+ extends SimpleUdfStreamOperatorFactory<Object>
+ implements CoordinatedOperatorFactory<Object>, OneInputStreamOperatorFactory<I, Object> {
+ private static final long serialVersionUID = 1L;
+
+ private final StreamWriteOperator<I> operator;
+ private final Configuration conf;
+ private final int numTasks;
+
+ public StreamWriteOperatorFactory(
+ RowType rowType,
+ Configuration conf,
+ int numTasks) {
+ super(new StreamWriteOperator<>(rowType, conf));
+ this.operator = (StreamWriteOperator<I>) getOperator();
+ this.conf = conf;
+ this.numTasks = numTasks;
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public <T extends StreamOperator<Object>> T createStreamOperator(StreamOperatorParameters<Object> parameters) {
+ final OperatorID operatorID = parameters.getStreamConfig().getOperatorID();
+ final OperatorEventDispatcher eventDispatcher = parameters.getOperatorEventDispatcher();
+
+ this.operator.setOperatorEventGateway(eventDispatcher.getOperatorEventGateway(operatorID));
+ this.operator.setup(parameters.getContainingTask(), parameters.getStreamConfig(), parameters.getOutput());
+ this.operator.setProcessingTimeService(this.processingTimeService);
+ eventDispatcher.registerEventHandler(operatorID, operator);
+ return (T) operator;
+ }
+
+ @Override
+ public OperatorCoordinator.Provider getCoordinatorProvider(String s, OperatorID operatorID) {
+ return new StreamWriteOperatorCoordinator.Provider(operatorID, this.conf, this.numTasks);
+ }
+
+ @Override
+ public void setProcessingTimeService(ProcessingTimeService processingTimeService) {
+ super.setProcessingTimeService(processingTimeService);
+ }
+}
diff --git a/hudi-flink/src/main/java/org/apache/hudi/operator/event/BatchWriteSuccessEvent.java b/hudi-flink/src/main/java/org/apache/hudi/operator/event/BatchWriteSuccessEvent.java
new file mode 100644
index 0000000..db5432e
--- /dev/null
+++ b/hudi-flink/src/main/java/org/apache/hudi/operator/event/BatchWriteSuccessEvent.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.operator.event;
+
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+
+import org.apache.hudi.client.WriteStatus;
+
+import java.util.List;
+
+/**
+ * An operator event to mark successful checkpoint batch write.
+ */
+public class BatchWriteSuccessEvent implements OperatorEvent {
+ private static final long serialVersionUID = 1L;
+
+ private final List<WriteStatus> writeStatuses;
+ private final int taskID;
+ private final String instantTime;
+
+ public BatchWriteSuccessEvent(
+ int taskID,
+ String instantTime,
+ List<WriteStatus> writeStatuses) {
+ this.taskID = taskID;
+ this.instantTime = instantTime;
+ this.writeStatuses = writeStatuses;
+ }
+
+ public List<WriteStatus> getWriteStatuses() {
+ return writeStatuses;
+ }
+
+ public int getTaskID() {
+ return taskID;
+ }
+
+ public String getInstantTime() {
+ return instantTime;
+ }
+}
diff --git a/hudi-flink/src/main/java/org/apache/hudi/schema/FilebasedSchemaProvider.java b/hudi-flink/src/main/java/org/apache/hudi/schema/FilebasedSchemaProvider.java
index 82699d9..aed6e17 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/schema/FilebasedSchemaProvider.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/schema/FilebasedSchemaProvider.java
@@ -21,9 +21,11 @@ package org.apache.hudi.schema;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.operator.FlinkOptions;
import org.apache.hudi.util.StreamerUtil;
import org.apache.avro.Schema;
+import org.apache.flink.configuration.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -43,16 +45,13 @@ public class FilebasedSchemaProvider extends SchemaProvider {
private static final String TARGET_SCHEMA_FILE_PROP = "hoodie.deltastreamer.schemaprovider.target.schema.file";
}
- private final FileSystem fs;
-
private final Schema sourceSchema;
private Schema targetSchema;
public FilebasedSchemaProvider(TypedProperties props) {
- super(props);
StreamerUtil.checkRequiredProperties(props, Collections.singletonList(Config.SOURCE_SCHEMA_FILE_PROP));
- this.fs = FSUtils.getFs(props.getString(Config.SOURCE_SCHEMA_FILE_PROP), StreamerUtil.getHadoopConf());
+ FileSystem fs = FSUtils.getFs(props.getString(Config.SOURCE_SCHEMA_FILE_PROP), StreamerUtil.getHadoopConf());
try {
this.sourceSchema = new Schema.Parser().parse(fs.open(new Path(props.getString(Config.SOURCE_SCHEMA_FILE_PROP))));
if (props.containsKey(Config.TARGET_SCHEMA_FILE_PROP)) {
@@ -64,6 +63,16 @@ public class FilebasedSchemaProvider extends SchemaProvider {
}
}
+ public FilebasedSchemaProvider(Configuration conf) {
+ final String readSchemaPath = conf.getString(FlinkOptions.READ_SCHEMA_FILE_PATH);
+ final FileSystem fs = FSUtils.getFs(readSchemaPath, StreamerUtil.getHadoopConf());
+ try {
+ this.sourceSchema = new Schema.Parser().parse(fs.open(new Path(readSchemaPath)));
+ } catch (IOException ioe) {
+ throw new HoodieIOException("Error reading schema", ioe);
+ }
+ }
+
@Override
public Schema getSourceSchema() {
return sourceSchema;
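A small, hypothetical sketch of the new Configuration-based constructor added above (the schema file location is an assumption; the option key is the FlinkOptions.READ_SCHEMA_FILE_PATH referenced in the constructor):

import org.apache.avro.Schema;
import org.apache.flink.configuration.Configuration;
import org.apache.hudi.operator.FlinkOptions;
import org.apache.hudi.schema.FilebasedSchemaProvider;

public class ReadSchemaSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // hypothetical location of the Avro schema used to deserialize the source records
    conf.setString(FlinkOptions.READ_SCHEMA_FILE_PATH.key(), "file:///tmp/test_read_schema.avsc");
    Schema sourceSchema = new FilebasedSchemaProvider(conf).getSourceSchema();
    System.out.println(sourceSchema.toString(true));
  }
}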
diff --git a/hudi-flink/src/main/java/org/apache/hudi/schema/SchemaProvider.java b/hudi-flink/src/main/java/org/apache/hudi/schema/SchemaProvider.java
index 74b4067..5def413 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/schema/SchemaProvider.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/schema/SchemaProvider.java
@@ -18,8 +18,6 @@
package org.apache.hudi.schema;
-import org.apache.hudi.common.config.TypedProperties;
-
import org.apache.avro.Schema;
import java.io.Serializable;
@@ -29,12 +27,6 @@ import java.io.Serializable;
*/
public abstract class SchemaProvider implements Serializable {
- protected TypedProperties config;
-
- protected SchemaProvider(TypedProperties props) {
- this.config = props;
- }
-
public abstract Schema getSourceSchema();
public Schema getTargetSchema() {
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/CommitSink.java b/hudi-flink/src/main/java/org/apache/hudi/sink/CommitSink.java
index 4ca7930..3a12842 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/CommitSink.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/CommitSink.java
@@ -25,7 +25,7 @@ import org.apache.hudi.client.common.HoodieFlinkEngineContext;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieFlinkStreamerException;
-import org.apache.hudi.HoodieFlinkStreamer;
+import org.apache.hudi.streamer.FlinkStreamerConfig;
import org.apache.hudi.util.StreamerUtil;
import org.apache.flink.api.java.tuple.Tuple3;
@@ -51,7 +51,7 @@ public class CommitSink extends RichSinkFunction<Tuple3<String, List<WriteStatus
/**
* Job conf.
*/
- private HoodieFlinkStreamer.Config cfg;
+ private FlinkStreamerConfig cfg;
/**
* Write client.
@@ -72,7 +72,7 @@ public class CommitSink extends RichSinkFunction<Tuple3<String, List<WriteStatus
public void open(Configuration parameters) throws Exception {
super.open(parameters);
// Get configs from runtimeContext
- cfg = (HoodieFlinkStreamer.Config) getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
+ cfg = (FlinkStreamerConfig) getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
writeParallelSize = getRuntimeContext().getExecutionConfig().getParallelism();
diff --git a/hudi-flink/src/main/java/org/apache/hudi/source/JsonStringToHoodieRecordMapFunction.java b/hudi-flink/src/main/java/org/apache/hudi/source/JsonStringToHoodieRecordMapFunction.java
index 4eda371..8d0189c 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/source/JsonStringToHoodieRecordMapFunction.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/source/JsonStringToHoodieRecordMapFunction.java
@@ -22,10 +22,10 @@ import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
-import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.exception.HoodieFlinkStreamerException;
import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieFlinkStreamerException;
import org.apache.hudi.keygen.KeyGenerator;
import org.apache.hudi.keygen.SimpleAvroKeyGenerator;
import org.apache.hudi.schema.FilebasedSchemaProvider;
diff --git a/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java b/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java
new file mode 100644
index 0000000..7df63fa
--- /dev/null
+++ b/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.streamer;
+
+import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.keygen.SimpleAvroKeyGenerator;
+
+import com.beust.jcommander.Parameter;
+import org.apache.flink.configuration.Configuration;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Configurations for Hoodie Flink streamer.
+ */
+public class FlinkStreamerConfig extends Configuration {
+ @Parameter(names = {"--kafka-topic"}, description = "Kafka topic name.", required = true)
+ public String kafkaTopic;
+
+ @Parameter(names = {"--kafka-group-id"}, description = "Kafka consumer group id.", required = true)
+ public String kafkaGroupId;
+
+ @Parameter(names = {"--kafka-bootstrap-servers"}, description = "Kafka bootstrap.servers.", required = true)
+ public String kafkaBootstrapServers;
+
+ @Parameter(names = {"--flink-checkpoint-path"}, description = "Flink checkpoint path.")
+ public String flinkCheckPointPath;
+
+ @Parameter(names = {"--instant-retry-times"}, description = "Times to retry when latest instant has not completed.")
+ public String instantRetryTimes = "10";
+
+ @Parameter(names = {"--instant-retry-interval"}, description = "Seconds between two tries when latest instant has not completed.")
+ public String instantRetryInterval = "1";
+
+ @Parameter(names = {"--target-base-path"},
+ description = "Base path for the target hoodie table. "
+ + "(Will be created if did not exist first time around. If exists, expected to be a hoodie table).",
+ required = true)
+ public String targetBasePath;
+
+ @Parameter(names = {"--read-schema-path"},
+ description = "Avro schema file path, the parsed schema is used for deserializing.",
+ required = true)
+ public String readSchemaFilePath;
+
+ @Parameter(names = {"--target-table"}, description = "Name of the target table in Hive.", required = true)
+ public String targetTableName;
+
+ @Parameter(names = {"--table-type"}, description = "Type of table. COPY_ON_WRITE (or) MERGE_ON_READ.", required = true)
+ public String tableType;
+
+ @Parameter(names = {"--props"}, description = "Path to properties file on localfs or dfs, with configurations for "
+ + "hoodie client, schema provider, key generator and data source. For hoodie client props, sane defaults are "
+ + "used, but recommend use to provide basic things like metrics endpoints, hive configs etc. For sources, refer"
+ + "to individual classes, for supported properties.")
+ public String propsFilePath =
+ "file://" + System.getProperty("user.dir") + "/src/test/resources/delta-streamer-config/dfs-source.properties";
+
+ @Parameter(names = {"--hoodie-conf"}, description = "Any configuration that can be set in the properties file "
+ + "(using the CLI parameter \"--props\") can also be passed command line using this parameter.")
+ public List<String> configs = new ArrayList<>();
+
+ @Parameter(names = {"--record-key-field"}, description = "Record key field. Value to be used as the `recordKey` component of `HoodieKey`.\n"
+ + "Actual value will be obtained by invoking .toString() on the field value. Nested fields can be specified using "
+ + "the dot notation eg: `a.b.c`. By default `uuid`.")
+ public String recordKeyField = "uuid";
+
+ @Parameter(names = {"--partition-path-field"}, description = "Partition path field. Value to be used at \n"
+ + "the `partitionPath` component of `HoodieKey`. Actual value obtained by invoking .toString(). By default `partitionpath`.")
+ public String partitionPathField = "partitionpath";
+
+ @Parameter(names = {"--keygen-class"}, description = "Key generator class, that implements will extract the key out of incoming record.\n"
+ + "By default `SimpleAvroKeyGenerator`.")
+ public String keygenClass = SimpleAvroKeyGenerator.class.getName();
+
+ @Parameter(names = {"--source-ordering-field"}, description = "Field within source record to decide how"
+ + " to break ties between records with same key in input data. Default: 'ts' holding unix timestamp of record.")
+ public String sourceOrderingField = "ts";
+
+ @Parameter(names = {"--payload-class"}, description = "Subclass of HoodieRecordPayload, that works off "
+ + "a GenericRecord. Implement your own, if you want to do something other than overwriting existing value.")
+ public String payloadClassName = OverwriteWithLatestAvroPayload.class.getName();
+
+ @Parameter(names = {"--op"}, description = "Takes one of these values : UPSERT (default), INSERT (use when input "
+ + "is purely new data/inserts to gain speed).", converter = OperationConverter.class)
+ public WriteOperationType operation = WriteOperationType.UPSERT;
+
+ @Parameter(names = {"--filter-dupes"},
+ description = "Should duplicate records from source be dropped/filtered out before insert/bulk-insert.")
+ public Boolean filterDupes = false;
+
+ @Parameter(names = {"--commit-on-errors"}, description = "Commit even when some records failed to be written.")
+ public Boolean commitOnErrors = false;
+
+ /**
+ * Flink checkpoint interval.
+ */
+ @Parameter(names = {"--checkpoint-interval"}, description = "Flink checkpoint interval.")
+ public Long checkpointInterval = 1000 * 5L;
+
+ @Parameter(names = {"--help", "-h"}, help = true)
+ public Boolean help = false;
+
+ @Parameter(names = {"--write-task-num"}, description = "Parallelism of tasks that do actual write, default is 4.")
+ public Integer writeTaskNum = 4;
+}
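As a usage illustration only (the bundle jar name, paths and broker address below are hypothetical), a job submission that supplies the required parameters above might look like:

flink run -c org.apache.hudi.streamer.HoodieFlinkStreamer hudi-flink-bundle.jar \
  --kafka-topic hudi_source_topic \
  --kafka-group-id hudi_streamer \
  --kafka-bootstrap-servers localhost:9092 \
  --target-base-path hdfs:///tmp/hoodie_sink \
  --read-schema-path hdfs:///tmp/schema.avsc \
  --target-table t1 \
  --table-type COPY_ON_WRITE

Optional flags such as --op, --record-key-field and --partition-path-field fall back to the defaults listed above.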
diff --git a/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java b/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java
new file mode 100644
index 0000000..f6d75d3
--- /dev/null
+++ b/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.streamer;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.operator.InstantGenerateOperator;
+import org.apache.hudi.operator.KeyedWriteProcessFunction;
+import org.apache.hudi.operator.KeyedWriteProcessOperator;
+import org.apache.hudi.sink.CommitSink;
+import org.apache.hudi.source.JsonStringToHoodieRecordMapFunction;
+import org.apache.hudi.util.StreamerUtil;
+
+import com.beust.jcommander.JCommander;
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
+
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * A utility which can incrementally consume data from Kafka and apply it to the target table.
+ * Currently, it only supports the COW table type and the insert and upsert operations.
+ */
+public class HoodieFlinkStreamer {
+ public static void main(String[] args) throws Exception {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+ final FlinkStreamerConfig cfg = new FlinkStreamerConfig();
+ JCommander cmd = new JCommander(cfg, null, args);
+ if (cfg.help || args.length == 0) {
+ cmd.usage();
+ System.exit(1);
+ }
+ env.enableCheckpointing(cfg.checkpointInterval);
+ env.getConfig().setGlobalJobParameters(cfg);
+ // We use checkpoint to trigger write operation, including instant generating and committing,
+ // There can only be one checkpoint at one time.
+ env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
+
+ if (cfg.flinkCheckPointPath != null) {
+ env.setStateBackend(new FsStateBackend(cfg.flinkCheckPointPath));
+ }
+
+ TypedProperties props = StreamerUtil.appendKafkaProps(cfg);
+
+ // add data source config
+ props.put(HoodieWriteConfig.WRITE_PAYLOAD_CLASS, cfg.payloadClassName);
+ props.put(HoodieWriteConfig.PRECOMBINE_FIELD_PROP, cfg.sourceOrderingField);
+
+ // Read from kafka source
+ DataStream<HoodieRecord> inputRecords =
+ env.addSource(new FlinkKafkaConsumer<>(cfg.kafkaTopic, new SimpleStringSchema(), props))
+ .filter(Objects::nonNull)
+ .map(new JsonStringToHoodieRecordMapFunction(props))
+ .name("kafka_to_hudi_record")
+ .uid("kafka_to_hudi_record_uid");
+
+ inputRecords.transform(InstantGenerateOperator.NAME, TypeInformation.of(HoodieRecord.class), new InstantGenerateOperator())
+ .name("instant_generator")
+ .uid("instant_generator_id")
+
+ // Key by partition path, to avoid multiple subtasks writing to a partition at the same time
+ .keyBy(HoodieRecord::getPartitionPath)
+
+ // write operator, where the write operation really happens
+ .transform(KeyedWriteProcessOperator.NAME, TypeInformation.of(new TypeHint<Tuple3<String, List<WriteStatus>, Integer>>() {
+ }), new KeyedWriteProcessOperator(new KeyedWriteProcessFunction()))
+ .name("write_process")
+ .uid("write_process_uid")
+ .setParallelism(env.getParallelism())
+
+ // Commit can only be executed once, so use a parallelism of one
+ .addSink(new CommitSink())
+ .name("commit_sink")
+ .uid("commit_sink_uid")
+ .setParallelism(1);
+
+ env.execute(cfg.targetTableName);
+ }
+}
diff --git a/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamerV2.java b/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamerV2.java
new file mode 100644
index 0000000..a8f9245
--- /dev/null
+++ b/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamerV2.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.streamer;
+
+import org.apache.hudi.operator.FlinkOptions;
+import org.apache.hudi.operator.StreamWriteOperatorFactory;
+import org.apache.hudi.util.AvroSchemaConverter;
+import org.apache.hudi.util.StreamerUtil;
+
+import com.beust.jcommander.JCommander;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.formats.json.JsonRowDataDeserializationSchema;
+import org.apache.flink.formats.json.TimestampFormat;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.typeutils.RowDataTypeInfo;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.util.Properties;
+
+/**
+ * A utility which can incrementally consume data from Kafka and apply it to the target table.
+ * Currently, it only supports the COW table type and the insert and upsert operations.
+ */
+public class HoodieFlinkStreamerV2 {
+ public static void main(String[] args) throws Exception {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+ final FlinkStreamerConfig cfg = new FlinkStreamerConfig();
+ JCommander cmd = new JCommander(cfg, null, args);
+ if (cfg.help || args.length == 0) {
+ cmd.usage();
+ System.exit(1);
+ }
+ env.enableCheckpointing(cfg.checkpointInterval);
+ env.getConfig().setGlobalJobParameters(cfg);
+ // We use checkpoint to trigger write operation, including instant generating and committing,
+ // There can only be one checkpoint at one time.
+ env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
+
+ if (cfg.flinkCheckPointPath != null) {
+ env.setStateBackend(new FsStateBackend(cfg.flinkCheckPointPath));
+ }
+
+ Properties kafkaProps = StreamerUtil.appendKafkaProps(cfg);
+
+ // Read from kafka source
+ RowType rowType =
+ (RowType) AvroSchemaConverter.convertToDataType(StreamerUtil.getSourceSchema(cfg))
+ .getLogicalType();
+ Configuration conf = FlinkOptions.fromStreamerConfig(cfg);
+ int numWriteTask = conf.getInteger(FlinkOptions.WRITE_TASK_PARALLELISM);
+ StreamWriteOperatorFactory<RowData> operatorFactory =
+ new StreamWriteOperatorFactory<>(rowType, conf, numWriteTask);
+
+ int partitionFieldIndex = rowType.getFieldIndex(conf.getString(FlinkOptions.PARTITION_PATH_FIELD));
+ LogicalType partitionFieldType = rowType.getTypeAt(partitionFieldIndex);
+ final RowData.FieldGetter partitionFieldGetter =
+ RowData.createFieldGetter(partitionFieldType, partitionFieldIndex);
+
+ DataStream<Object> dataStream = env.addSource(new FlinkKafkaConsumer<>(
+ cfg.kafkaTopic,
+ new JsonRowDataDeserializationSchema(
+ rowType,
+ new RowDataTypeInfo(rowType),
+ false,
+ true,
+ TimestampFormat.ISO_8601
+ ), kafkaProps))
+ .name("kafka_source")
+ .uid("uid_kafka_source")
+ // Key by partition path, to avoid multiple subtasks writing to a partition at the same time
+ .keyBy(partitionFieldGetter::getFieldOrNull)
+ .transform("hoodie_stream_write", null, operatorFactory)
+ .uid("uid_hoodie_stream_write")
+ .setParallelism(numWriteTask); // write task parallelism, taken from FlinkOptions.WRITE_TASK_PARALLELISM
+
+ env.addOperator(dataStream.getTransformation());
+
+ env.execute(cfg.targetTableName);
+ }
+}
diff --git a/hudi-flink/src/main/java/org/apache/hudi/schema/SchemaProvider.java b/hudi-flink/src/main/java/org/apache/hudi/streamer/OperationConverter.java
similarity index 54%
copy from hudi-flink/src/main/java/org/apache/hudi/schema/SchemaProvider.java
copy to hudi-flink/src/main/java/org/apache/hudi/streamer/OperationConverter.java
index 74b4067..9baaf0a 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/schema/SchemaProvider.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/streamer/OperationConverter.java
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -16,29 +16,19 @@
* limitations under the License.
*/
-package org.apache.hudi.schema;
+package org.apache.hudi.streamer;
-import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.WriteOperationType;
-import org.apache.avro.Schema;
-
-import java.io.Serializable;
+import com.beust.jcommander.IStringConverter;
+import com.beust.jcommander.ParameterException;
/**
- * Class to provide schema for reading data and also writing into a Hoodie table.
+ * Converter that converts a string into enum WriteOperationType.
*/
-public abstract class SchemaProvider implements Serializable {
-
- protected TypedProperties config;
-
- protected SchemaProvider(TypedProperties props) {
- this.config = props;
- }
-
- public abstract Schema getSourceSchema();
-
- public Schema getTargetSchema() {
- // by default, use source schema as target for hoodie table as well
- return getSourceSchema();
+public class OperationConverter implements IStringConverter<WriteOperationType> {
+ @Override
+ public WriteOperationType convert(String value) throws ParameterException {
+ return WriteOperationType.valueOf(value);
}
}
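For clarity, the converter simply delegates to the enum lookup; a short sketch (inside any method):

import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.streamer.OperationConverter;

WriteOperationType op = new OperationConverter().convert("INSERT"); // yields WriteOperationType.INSERT
// an unrecognized value surfaces as an IllegalArgumentException from WriteOperationType.valueOf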
diff --git a/hudi-flink/src/main/java/org/apache/hudi/util/AvroSchemaConverter.java b/hudi-flink/src/main/java/org/apache/hudi/util/AvroSchemaConverter.java
new file mode 100644
index 0000000..21664f3
--- /dev/null
+++ b/hudi-flink/src/main/java/org/apache/hudi/util/AvroSchemaConverter.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.util;
+
+import org.apache.avro.LogicalTypes;
+import org.apache.avro.Schema;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.types.AtomicDataType;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.TypeInformationRawType;
+
+import java.util.List;
+
+/**
+ * Converts an Avro schema into a Flink {@link org.apache.flink.table.types.DataType}. It uses {@link org.apache.flink.table.api.DataTypes} for
+ * representing records and converts Avro types into types that are compatible with Flink's Table &
+ * SQL API.
+ *
+ * <p>Note: Changes in this class need to be kept in sync with the corresponding runtime classes
+ * {@link org.apache.flink.formats.avro.AvroRowDeserializationSchema} and {@link org.apache.flink.formats.avro.AvroRowSerializationSchema}.
+ *
+ * <p>NOTE: adapted from Flink release 1.12.0; this class should be removed once the Flink version is upgraded to that release.
+ */
+public class AvroSchemaConverter {
+
+ /**
+ * Converts an Avro schema {@code schema} into a nested row structure with deterministic field order and
+ * data types that are compatible with Flink's Table & SQL API.
+ *
+ * @param schema Avro schema definition
+ * @return data type matching the schema
+ */
+ public static DataType convertToDataType(Schema schema) {
+ switch (schema.getType()) {
+ case RECORD:
+ final List<Schema.Field> schemaFields = schema.getFields();
+
+ final DataTypes.Field[] fields = new DataTypes.Field[schemaFields.size()];
+ for (int i = 0; i < schemaFields.size(); i++) {
+ final Schema.Field field = schemaFields.get(i);
+ fields[i] = DataTypes.FIELD(field.name(), convertToDataType(field.schema()));
+ }
+ return DataTypes.ROW(fields).notNull();
+ case ENUM:
+ return DataTypes.STRING().notNull();
+ case ARRAY:
+ return DataTypes.ARRAY(convertToDataType(schema.getElementType())).notNull();
+ case MAP:
+ return DataTypes.MAP(
+ DataTypes.STRING().notNull(),
+ convertToDataType(schema.getValueType()))
+ .notNull();
+ case UNION:
+ final Schema actualSchema;
+ final boolean nullable;
+ if (schema.getTypes().size() == 2
+ && schema.getTypes().get(0).getType() == Schema.Type.NULL) {
+ actualSchema = schema.getTypes().get(1);
+ nullable = true;
+ } else if (schema.getTypes().size() == 2
+ && schema.getTypes().get(1).getType() == Schema.Type.NULL) {
+ actualSchema = schema.getTypes().get(0);
+ nullable = true;
+ } else if (schema.getTypes().size() == 1) {
+ actualSchema = schema.getTypes().get(0);
+ nullable = false;
+ } else {
+ // use Kryo for serialization
+ return new AtomicDataType(
+ new TypeInformationRawType<>(false, Types.GENERIC(Object.class)));
+ }
+ DataType converted = convertToDataType(actualSchema);
+ return nullable ? converted.nullable() : converted;
+ case FIXED:
+ // logical decimal type
+ if (schema.getLogicalType() instanceof LogicalTypes.Decimal) {
+ final LogicalTypes.Decimal decimalType =
+ (LogicalTypes.Decimal) schema.getLogicalType();
+ return DataTypes.DECIMAL(decimalType.getPrecision(), decimalType.getScale())
+ .notNull();
+ }
+ // convert fixed size binary data to primitive byte arrays
+ return DataTypes.VARBINARY(schema.getFixedSize()).notNull();
+ case STRING:
+ // convert Avro's Utf8/CharSequence to String
+ return DataTypes.STRING().notNull();
+ case BYTES:
+ // logical decimal type
+ if (schema.getLogicalType() instanceof LogicalTypes.Decimal) {
+ final LogicalTypes.Decimal decimalType =
+ (LogicalTypes.Decimal) schema.getLogicalType();
+ return DataTypes.DECIMAL(decimalType.getPrecision(), decimalType.getScale())
+ .notNull();
+ }
+ return DataTypes.BYTES().notNull();
+ case INT:
+ // logical date and time type
+ final org.apache.avro.LogicalType logicalType = schema.getLogicalType();
+ if (logicalType == LogicalTypes.date()) {
+ return DataTypes.DATE().notNull();
+ } else if (logicalType == LogicalTypes.timeMillis()) {
+ return DataTypes.TIME(3).notNull();
+ }
+ return DataTypes.INT().notNull();
+ case LONG:
+ // logical timestamp type
+ if (schema.getLogicalType() == LogicalTypes.timestampMillis()) {
+ return DataTypes.TIMESTAMP(3).notNull();
+ } else if (schema.getLogicalType() == LogicalTypes.timestampMicros()) {
+ return DataTypes.TIMESTAMP(6).notNull();
+ } else if (schema.getLogicalType() == LogicalTypes.timeMillis()) {
+ return DataTypes.TIME(3).notNull();
+ } else if (schema.getLogicalType() == LogicalTypes.timeMicros()) {
+ return DataTypes.TIME(6).notNull();
+ }
+ return DataTypes.BIGINT().notNull();
+ case FLOAT:
+ return DataTypes.FLOAT().notNull();
+ case DOUBLE:
+ return DataTypes.DOUBLE().notNull();
+ case BOOLEAN:
+ return DataTypes.BOOLEAN().notNull();
+ case NULL:
+ return DataTypes.NULL();
+ default:
+ throw new IllegalArgumentException("Unsupported Avro type '" + schema.getType() + "'.");
+ }
+ }
+}
+
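A small hedged example of the conversion (the Avro schema literal is made up for illustration): a record with a required string field and a nullable long field maps to ROW<STRING NOT NULL, BIGINT>.

import org.apache.avro.Schema;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.hudi.util.AvroSchemaConverter;

public class AvroToDataTypeSketch {
  public static void main(String[] args) {
    String avsc = "{\"type\":\"record\",\"name\":\"rec\",\"fields\":["
        + "{\"name\":\"uuid\",\"type\":\"string\"},"
        + "{\"name\":\"ts\",\"type\":[\"null\",\"long\"],\"default\":null}]}";
    Schema schema = new Schema.Parser().parse(avsc);

    DataType dataType = AvroSchemaConverter.convertToDataType(schema);
    RowType rowType = (RowType) dataType.getLogicalType();
    // uuid -> STRING NOT NULL; the ["null","long"] union -> nullable BIGINT
    System.out.println(rowType);
  }
}

This is the same call chain HoodieFlinkStreamerV2 above uses to derive the RowType for the Kafka JSON deserializer.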
diff --git a/hudi-flink/src/main/java/org/apache/hudi/util/RowDataToAvroConverters.java b/hudi-flink/src/main/java/org/apache/hudi/util/RowDataToAvroConverters.java
new file mode 100644
index 0000000..d5ff663
--- /dev/null
+++ b/hudi-flink/src/main/java/org/apache/hudi/util/RowDataToAvroConverters.java
@@ -0,0 +1,309 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.util;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.MapData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.util.Utf8;
+
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.formats.avro.typeutils.AvroSchemaConverter.extractValueTypeToAvroMap;
+
+/**
+ * Tool class used to convert from {@link RowData} to Avro {@link GenericRecord}.
+ *
+ * <p>NOTE: adapted from Flink release 1.12.0; this class should be removed once the Flink version is upgraded to that release.
+ */
+@Internal
+public class RowDataToAvroConverters {
+
+ // --------------------------------------------------------------------------------
+ // Runtime Converters
+ // --------------------------------------------------------------------------------
+
+ /**
+ * Runtime converter that converts objects of Flink Table & SQL internal data structures to
+ * corresponding Avro data structures.
+ */
+ @FunctionalInterface
+ public interface RowDataToAvroConverter extends Serializable {
+ Object convert(Schema schema, Object object);
+ }
+
+ // --------------------------------------------------------------------------------
+ // IMPORTANT! We use anonymous classes instead of lambdas for a reason here. It is
+ // necessary because the maven shade plugin cannot relocate classes in
+ // SerializedLambdas (MSHADE-260). On the other hand we want to relocate Avro for
+ // sql-client uber jars.
+ // --------------------------------------------------------------------------------
+
+ /**
+ * Creates a runtime converter according to the given logical type that converts objects of
+ * Flink Table & SQL internal data structures to corresponding Avro data structures.
+ */
+ public static RowDataToAvroConverter createConverter(LogicalType type) {
+ final RowDataToAvroConverter converter;
+ switch (type.getTypeRoot()) {
+ case NULL:
+ converter =
+ new RowDataToAvroConverter() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Object convert(Schema schema, Object object) {
+ return null;
+ }
+ };
+ break;
+ case TINYINT:
+ converter =
+ new RowDataToAvroConverter() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Object convert(Schema schema, Object object) {
+ return ((Byte) object).intValue();
+ }
+ };
+ break;
+ case SMALLINT:
+ converter =
+ new RowDataToAvroConverter() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Object convert(Schema schema, Object object) {
+ return ((Short) object).intValue();
+ }
+ };
+ break;
+ case BOOLEAN: // boolean
+ case INTEGER: // int
+ case INTERVAL_YEAR_MONTH: // long
+ case BIGINT: // long
+ case INTERVAL_DAY_TIME: // long
+ case FLOAT: // float
+ case DOUBLE: // double
+ case TIME_WITHOUT_TIME_ZONE: // int
+ case DATE: // int
+ converter =
+ new RowDataToAvroConverter() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Object convert(Schema schema, Object object) {
+ return object;
+ }
+ };
+ break;
+ case CHAR:
+ case VARCHAR:
+ converter =
+ new RowDataToAvroConverter() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Object convert(Schema schema, Object object) {
+ return new Utf8(object.toString());
+ }
+ };
+ break;
+ case BINARY:
+ case VARBINARY:
+ converter =
+ new RowDataToAvroConverter() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Object convert(Schema schema, Object object) {
+ return ByteBuffer.wrap((byte[]) object);
+ }
+ };
+ break;
+ case TIMESTAMP_WITHOUT_TIME_ZONE:
+ converter =
+ new RowDataToAvroConverter() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Object convert(Schema schema, Object object) {
+ return ((TimestampData) object).toInstant().toEpochMilli();
+ }
+ };
+ break;
+ case DECIMAL:
+ converter =
+ new RowDataToAvroConverter() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Object convert(Schema schema, Object object) {
+ return ByteBuffer.wrap(((DecimalData) object).toUnscaledBytes());
+ }
+ };
+ break;
+ case ARRAY:
+ converter = createArrayConverter((ArrayType) type);
+ break;
+ case ROW:
+ converter = createRowConverter((RowType) type);
+ break;
+ case MAP:
+ case MULTISET:
+ converter = createMapConverter(type);
+ break;
+ case RAW:
+ default:
+ throw new UnsupportedOperationException("Unsupported type: " + type);
+ }
+
+ // wrap into nullable converter
+ return new RowDataToAvroConverter() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Object convert(Schema schema, Object object) {
+ if (object == null) {
+ return null;
+ }
+
+ // get actual schema if it is a nullable schema
+ Schema actualSchema;
+ if (schema.getType() == Schema.Type.UNION) {
+ List<Schema> types = schema.getTypes();
+ int size = types.size();
+ if (size == 2 && types.get(1).getType() == Schema.Type.NULL) {
+ actualSchema = types.get(0);
+ } else if (size == 2 && types.get(0).getType() == Schema.Type.NULL) {
+ actualSchema = types.get(1);
+ } else {
+ throw new IllegalArgumentException(
+ "The Avro schema is not a nullable type: " + schema.toString());
+ }
+ } else {
+ actualSchema = schema;
+ }
+ return converter.convert(actualSchema, object);
+ }
+ };
+ }
+
+ private static RowDataToAvroConverter createRowConverter(RowType rowType) {
+ final RowDataToAvroConverter[] fieldConverters =
+ rowType.getChildren().stream()
+ .map(RowDataToAvroConverters::createConverter)
+ .toArray(RowDataToAvroConverter[]::new);
+ final LogicalType[] fieldTypes =
+ rowType.getFields().stream()
+ .map(RowType.RowField::getType)
+ .toArray(LogicalType[]::new);
+ final RowData.FieldGetter[] fieldGetters = new RowData.FieldGetter[fieldTypes.length];
+ for (int i = 0; i < fieldTypes.length; i++) {
+ fieldGetters[i] = RowData.createFieldGetter(fieldTypes[i], i);
+ }
+ final int length = rowType.getFieldCount();
+
+ return new RowDataToAvroConverter() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Object convert(Schema schema, Object object) {
+ final RowData row = (RowData) object;
+ final List<Schema.Field> fields = schema.getFields();
+ final GenericRecord record = new GenericData.Record(schema);
+ for (int i = 0; i < length; ++i) {
+ final Schema.Field schemaField = fields.get(i);
+ Object avroObject =
+ fieldConverters[i].convert(
+ schemaField.schema(), fieldGetters[i].getFieldOrNull(row));
+ record.put(i, avroObject);
+ }
+ return record;
+ }
+ };
+ }
+
+ private static RowDataToAvroConverter createArrayConverter(ArrayType arrayType) {
+ LogicalType elementType = arrayType.getElementType();
+ final ArrayData.ElementGetter elementGetter = ArrayData.createElementGetter(elementType);
+ final RowDataToAvroConverter elementConverter = createConverter(arrayType.getElementType());
+
+ return new RowDataToAvroConverter() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Object convert(Schema schema, Object object) {
+ final Schema elementSchema = schema.getElementType();
+ ArrayData arrayData = (ArrayData) object;
+ List<Object> list = new ArrayList<>();
+ for (int i = 0; i < arrayData.size(); ++i) {
+ list.add(
+ elementConverter.convert(
+ elementSchema, elementGetter.getElementOrNull(arrayData, i)));
+ }
+ return list;
+ }
+ };
+ }
+
+ private static RowDataToAvroConverter createMapConverter(LogicalType type) {
+ LogicalType valueType = extractValueTypeToAvroMap(type);
+ final ArrayData.ElementGetter valueGetter = ArrayData.createElementGetter(valueType);
+ final RowDataToAvroConverter valueConverter = createConverter(valueType);
+
+ return new RowDataToAvroConverter() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Object convert(Schema schema, Object object) {
+ final Schema valueSchema = schema.getValueType();
+ final MapData mapData = (MapData) object;
+ final ArrayData keyArray = mapData.keyArray();
+ final ArrayData valueArray = mapData.valueArray();
+ final Map<Object, Object> map = new HashMap<>(mapData.size());
+ for (int i = 0; i < mapData.size(); ++i) {
+ final String key = keyArray.getString(i).toString();
+ final Object value =
+ valueConverter.convert(
+ valueSchema, valueGetter.getElementOrNull(valueArray, i));
+ map.put(key, value);
+ }
+ return map;
+ }
+ };
+ }
+}
+
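For reference, a minimal usage sketch of the converter above: it turns a Flink RowData into an Avro GenericRecord, and the outer nullable wrapper unwraps ["type", "null"] unions before delegating to the concrete converter. This assumes the nested RowDataToAvroConverter interface and the createConverter(LogicalType) factory are publicly accessible (as the method references in this file suggest, and as in the Flink utility this class is adapted from); the row type and schema below are made up for illustration only.

    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.flink.table.api.DataTypes;
    import org.apache.flink.table.data.GenericRowData;
    import org.apache.flink.table.data.StringData;
    import org.apache.flink.table.types.logical.RowType;
    import org.apache.hudi.util.RowDataToAvroConverters;

    public class RowDataToAvroSketch {
      public static void main(String[] args) {
        // hypothetical row type: a string key and an int field
        RowType rowType = (RowType) DataTypes.ROW(
            DataTypes.FIELD("uuid", DataTypes.VARCHAR(20)),
            DataTypes.FIELD("age", DataTypes.INT()))
            .notNull()
            .getLogicalType();

        // field-aligned Avro schema; the nullable unions are unwrapped by the wrapper converter
        Schema schema = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"rec\",\"fields\":["
                + "{\"name\":\"uuid\",\"type\":[\"string\",\"null\"]},"
                + "{\"name\":\"age\",\"type\":[\"int\",\"null\"]}]}");

        // assumption: createConverter is accessible from outside the class
        RowDataToAvroConverters.RowDataToAvroConverter converter =
            RowDataToAvroConverters.createConverter(rowType);

        // convert one row; string fields become Utf8, ints pass through unchanged
        GenericRecord record = (GenericRecord) converter.convert(
            schema, GenericRowData.of(StringData.fromString("id1"), 23));
        System.out.println(record);
      }
    }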
diff --git a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
index 71de651..9460ee8 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
@@ -18,47 +18,71 @@
package org.apache.hudi.util;
-import org.apache.hudi.HoodieFlinkStreamer;
+import org.apache.hudi.keygen.SimpleAvroKeyGenerator;
+import org.apache.hudi.streamer.FlinkStreamerConfig;
import org.apache.hudi.common.config.DFSPropertiesConfiguration;
import org.apache.hudi.common.config.TypedProperties;
-import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.engine.EngineType;
+import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.config.HoodieCompactionConfig;
-import org.apache.hudi.config.HoodiePayloadConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieIOException;
-import org.apache.hudi.exception.HoodieNotSupportedException;
import org.apache.hudi.keygen.KeyGenerator;
-import org.apache.hudi.keygen.SimpleAvroKeyGenerator;
+import org.apache.hudi.operator.FlinkOptions;
import org.apache.hudi.schema.FilebasedSchemaProvider;
+import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
-import org.apache.hadoop.conf.Configuration;
+import org.apache.flink.api.java.hadoop.mapred.utils.HadoopUtils;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.Preconditions;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.BufferedReader;
+import java.io.File;
import java.io.IOException;
import java.io.StringReader;
import java.util.List;
+import java.util.Properties;
+/**
+ * Utilities for Flink stream read and write.
+ */
public class StreamerUtil {
- private static Logger LOG = LoggerFactory.getLogger(StreamerUtil.class);
+ private static final Logger LOG = LoggerFactory.getLogger(StreamerUtil.class);
+
+ public static TypedProperties appendKafkaProps(FlinkStreamerConfig config) {
+ TypedProperties properties = getProps(config);
+ properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, config.kafkaBootstrapServers);
+ properties.put(ConsumerConfig.GROUP_ID_CONFIG, config.kafkaGroupId);
+ return properties;
+ }
- public static TypedProperties getProps(HoodieFlinkStreamer.Config cfg) {
+ public static TypedProperties getProps(FlinkStreamerConfig cfg) {
return readConfig(
FSUtils.getFs(cfg.propsFilePath, getHadoopConf()),
new Path(cfg.propsFilePath), cfg.configs).getConfig();
}
+ public static Schema getSourceSchema(FlinkStreamerConfig cfg) {
+ return new FilebasedSchemaProvider(FlinkOptions.fromStreamerConfig(cfg)).getSourceSchema();
+ }
+
+ public static Schema getSourceSchema(org.apache.flink.configuration.Configuration conf) {
+ return new FilebasedSchemaProvider(conf).getSourceSchema();
+ }
/**
- * Read conig from files.
+ * Read config from properties file (`--props` option) and cmd line (`--hoodie-conf` option).
*/
public static DFSPropertiesConfiguration readConfig(FileSystem fs, Path cfgPath, List<String> overriddenProps) {
DFSPropertiesConfiguration conf;
@@ -81,16 +105,50 @@ public class StreamerUtil {
return conf;
}
- public static Configuration getHadoopConf() {
- return new Configuration();
+ public static org.apache.hadoop.conf.Configuration getHadoopConf() {
+ // create a Hadoop configuration, picking up the Hadoop conf directory if one is configured.
+ org.apache.hadoop.conf.Configuration hadoopConf = null;
+ for (String possibleHadoopConfPath : HadoopUtils.possibleHadoopConfPaths(new Configuration())) {
+ hadoopConf = getHadoopConfiguration(possibleHadoopConfPath);
+ if (hadoopConf != null) {
+ break;
+ }
+ }
+ if (hadoopConf == null) {
+ hadoopConf = new org.apache.hadoop.conf.Configuration();
+ }
+ return hadoopConf;
}
- public static void checkRequiredProperties(TypedProperties props, List<String> checkPropNames) {
- checkPropNames.forEach(prop -> {
- if (!props.containsKey(prop)) {
- throw new HoodieNotSupportedException("Required property " + prop + " is missing");
+ /**
+ * Returns a new Hadoop configuration built from the given Hadoop conf directory path.
+ *
+ * @param hadoopConfDir Hadoop conf directory path.
+ * @return a Hadoop configuration instance, or null if the directory does not exist
+ */
+ private static org.apache.hadoop.conf.Configuration getHadoopConfiguration(String hadoopConfDir) {
+ if (new File(hadoopConfDir).exists()) {
+ org.apache.hadoop.conf.Configuration hadoopConfiguration = new org.apache.hadoop.conf.Configuration();
+ File coreSite = new File(hadoopConfDir, "core-site.xml");
+ if (coreSite.exists()) {
+ hadoopConfiguration.addResource(new Path(coreSite.getAbsolutePath()));
+ }
+ File hdfsSite = new File(hadoopConfDir, "hdfs-site.xml");
+ if (hdfsSite.exists()) {
+ hadoopConfiguration.addResource(new Path(hdfsSite.getAbsolutePath()));
+ }
+ File yarnSite = new File(hadoopConfDir, "yarn-site.xml");
+ if (yarnSite.exists()) {
+ hadoopConfiguration.addResource(new Path(yarnSite.getAbsolutePath()));
+ }
+ // Add mapred-site.xml. We need to read configurations like compression codec.
+ File mapredSite = new File(hadoopConfDir, "mapred-site.xml");
+ if (mapredSite.exists()) {
+ hadoopConfiguration.addResource(new Path(mapredSite.getAbsolutePath()));
}
- });
+ return hadoopConfiguration;
+ }
+ return null;
}
/**
@@ -110,6 +168,21 @@ public class StreamerUtil {
}
/**
+ * Create a key generator class via reflection, passing in any configs needed.
+ * <p>
+ * If the key generator class is configured through the Flink configuration, i.e. {@code conf}, use the corresponding key generator class; otherwise, use the default key generator class
+ * specified in {@link FlinkOptions}.
+ */
+ public static KeyGenerator createKeyGenerator(Configuration conf) throws IOException {
+ String keyGeneratorClass = conf.getString(FlinkOptions.KEYGEN_CLASS);
+ try {
+ return (KeyGenerator) ReflectionUtils.loadClass(keyGeneratorClass, flinkConf2TypedProperties(conf));
+ } catch (Throwable e) {
+ throw new IOException("Could not load key generator class " + keyGeneratorClass, e);
+ }
+ }
+
+ /**
* Create a payload class via reflection, passing in an ordering/precombine value.
*/
public static HoodieRecordPayload createPayload(String payloadClass, GenericRecord record, Comparable orderingVal)
@@ -122,21 +195,59 @@ public class StreamerUtil {
}
}
- public static HoodieWriteConfig getHoodieClientConfig(HoodieFlinkStreamer.Config cfg) {
- FileSystem fs = FSUtils.getFs(cfg.targetBasePath, getHadoopConf());
+ /**
+ * Create a payload class via reflection, without an ordering/precombine value.
+ */
+ public static HoodieRecordPayload createPayload(String payloadClass, GenericRecord record)
+ throws IOException {
+ try {
+ return (HoodieRecordPayload) ReflectionUtils.loadClass(payloadClass,
+ new Class<?>[] {Option.class}, Option.of(record));
+ } catch (Throwable e) {
+ throw new IOException("Could not create payload for class: " + payloadClass, e);
+ }
+ }
+
+ public static HoodieWriteConfig getHoodieClientConfig(Configuration conf) {
HoodieWriteConfig.Builder builder =
- HoodieWriteConfig.newBuilder().withEngineType(EngineType.FLINK).withPath(cfg.targetBasePath).combineInput(cfg.filterDupes, true)
- .withCompactionConfig(HoodieCompactionConfig.newBuilder().withPayloadClass(cfg.payloadClassName).build())
- .withPayloadConfig(HoodiePayloadConfig.newBuilder().withPayloadOrderingField(cfg.sourceOrderingField)
- .build())
- .forTable(cfg.targetTableName)
+ HoodieWriteConfig.newBuilder()
+ .withEngineType(EngineType.FLINK)
+ .withPath(conf.getString(FlinkOptions.PATH))
+ .combineInput(conf.getBoolean(FlinkOptions.INSERT_DROP_DUPS), true)
+ .withCompactionConfig(
+ HoodieCompactionConfig.newBuilder()
+ .withPayloadClass(conf.getString(FlinkOptions.PAYLOAD_CLASS))
+ .build())
+ .forTable(conf.getString(FlinkOptions.TABLE_NAME))
.withAutoCommit(false)
- .withProps(readConfig(fs, new Path(cfg.propsFilePath), cfg.configs)
- .getConfig());
+ .withProps(flinkConf2TypedProperties(FlinkOptions.flatOptions(conf)));
+
+ builder = builder.withSchema(getSourceSchema(conf).toString());
+ return builder.build();
+ }
- builder = builder.withSchema(new FilebasedSchemaProvider(getProps(cfg)).getTargetSchema().toString());
- HoodieWriteConfig config = builder.build();
- return config;
+ /**
+ * Converts the given {@link Configuration} to {@link TypedProperties}.
+ * The default values are also set up.
+ *
+ * @param conf The flink configuration
+ * @return a TypedProperties instance
+ */
+ public static TypedProperties flinkConf2TypedProperties(Configuration conf) {
+ Properties properties = new Properties();
+ // put all the set up options
+ conf.addAllToProperties(properties);
+ // put all the default options
+ for (ConfigOption<?> option : FlinkOptions.OPTIONAL_OPTIONS) {
+ if (!conf.contains(option)) {
+ properties.put(option.key(), option.defaultValue());
+ }
+ }
+ return new TypedProperties(properties);
}
+ public static void checkRequiredProperties(TypedProperties props, List<String> checkPropNames) {
+ checkPropNames.forEach(prop ->
+ Preconditions.checkState(props.containsKey(prop), "Required property " + prop + " is missing"));
+ }
}
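A minimal sketch of how these StreamerUtil helpers fit together. The option keys are the ones added by this commit in FlinkOptions; the paths below are hypothetical, and the schema file must exist for the client config to resolve the source schema.

    import org.apache.flink.configuration.Configuration;
    import org.apache.hudi.common.config.TypedProperties;
    import org.apache.hudi.config.HoodieWriteConfig;
    import org.apache.hudi.keygen.KeyGenerator;
    import org.apache.hudi.operator.FlinkOptions;
    import org.apache.hudi.util.StreamerUtil;

    public class StreamerUtilSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.setString(FlinkOptions.PATH, "/tmp/hoodie_table");                 // hypothetical base path
        conf.setString(FlinkOptions.TABLE_NAME, "hoodie_table");
        conf.setString(FlinkOptions.READ_SCHEMA_FILE_PATH, "/tmp/schema.avsc"); // hypothetical schema file

        // Flink Configuration -> TypedProperties, with FlinkOptions defaults filled in
        TypedProperties props = StreamerUtil.flinkConf2TypedProperties(conf);

        // key generator resolved reflectively from FlinkOptions.KEYGEN_CLASS
        KeyGenerator keyGenerator = StreamerUtil.createKeyGenerator(conf);

        // write config for the Flink client; the source schema is read from the configured schema file
        HoodieWriteConfig writeConfig = StreamerUtil.getHoodieClientConfig(conf);
      }
    }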
diff --git a/hudi-flink/src/test/java/org/apache/hudi/operator/StreamWriteFunctionTest.java b/hudi-flink/src/test/java/org/apache/hudi/operator/StreamWriteFunctionTest.java
new file mode 100644
index 0000000..c2d7a65
--- /dev/null
+++ b/hudi-flink/src/test/java/org/apache/hudi/operator/StreamWriteFunctionTest.java
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.operator;
+
+import org.apache.hudi.client.HoodieFlinkWriteClient;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.operator.event.BatchWriteSuccessEvent;
+import org.apache.hudi.operator.utils.StreamWriteFunctionWrapper;
+import org.apache.hudi.operator.utils.TestConfigurations;
+import org.apache.hudi.operator.utils.TestData;
+
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.table.data.RowData;
+import org.hamcrest.MatcherAssert;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.operator.utils.TestData.checkWrittenData;
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Test cases for StreamWriteFunction.
+ */
+public class StreamWriteFunctionTest {
+
+ private static final Map<String, String> EXPECTED = new HashMap<>();
+
+ static {
+ EXPECTED.put("par1", "[id1,par1,id1,Danny,23,1,par1, id2,par1,id2,Stephen,33,2,par1]");
+ EXPECTED.put("par2", "[id3,par2,id3,Julian,53,3,par2, id4,par2,id4,Fabian,31,4,par2]");
+ EXPECTED.put("par3", "[id5,par3,id5,Sophia,18,5,par3, id6,par3,id6,Emma,20,6,par3]");
+ EXPECTED.put("par4", "[id7,par4,id7,Bob,44,7,par4, id8,par4,id8,Han,56,8,par4]");
+ }
+
+ private static final Map<String, String> EXPECTED2 = new HashMap<>();
+
+ static {
+ EXPECTED2.put("par1", "[id1,par1,id1,Danny,24,1,par1, id2,par1,id2,Stephen,34,2,par1]");
+ EXPECTED2.put("par2", "[id3,par2,id3,Julian,54,3,par2, id4,par2,id4,Fabian,32,4,par2]");
+ EXPECTED2.put("par3", "[id5,par3,id5,Sophia,18,5,par3, id6,par3,id6,Emma,20,6,par3, "
+ + "id9,par3,id9,Jane,19,6,par3]");
+ EXPECTED2.put("par4", "[id10,par4,id10,Ella,38,7,par4, id11,par4,id11,Phoebe,52,8,par4, "
+ + "id7,par4,id7,Bob,44,7,par4, id8,par4,id8,Han,56,8,par4]");
+ }
+
+ private StreamWriteFunctionWrapper<RowData> funcWrapper;
+
+ @TempDir
+ File tempFile;
+
+ @BeforeEach
+ public void before() throws Exception {
+ this.funcWrapper = new StreamWriteFunctionWrapper<>(
+ tempFile.getAbsolutePath(),
+ TestConfigurations.SERIALIZER);
+ }
+
+ @AfterEach
+ public void after() throws Exception {
+ funcWrapper.close();
+ }
+
+ @Test
+ public void testCheckpoint() throws Exception {
+ // open the function and ingest data
+ funcWrapper.openFunction();
+ for (RowData rowData : TestData.DATA_SET_ONE) {
+ funcWrapper.invoke(rowData);
+ }
+
+ // no checkpoint, so the coordinator does not accept any events
+ assertTrue(
+ funcWrapper.getEventBuffer().length == 1
+ && funcWrapper.getEventBuffer()[0] == null, "The coordinator event buffer is expected to be empty");
+
+ // this triggers the data write and event send
+ funcWrapper.checkpointFunction(1);
+
+ String instant = funcWrapper.getWriteClient()
+ .getInflightAndRequestedInstant("COPY_ON_WRITE");
+
+ final OperatorEvent nextEvent = funcWrapper.getNextEvent();
+ MatcherAssert.assertThat("The operator expect to send an event", nextEvent, instanceOf(BatchWriteSuccessEvent.class));
+ List<WriteStatus> writeStatuses = ((BatchWriteSuccessEvent) nextEvent).getWriteStatuses();
+ assertNotNull(writeStatuses);
+ MatcherAssert.assertThat(writeStatuses.size(), is(4)); // write 4 partition files
+ assertThat(writeStatuses.stream()
+ .map(WriteStatus::getPartitionPath).sorted(Comparator.naturalOrder())
+ .collect(Collectors.joining(",")),
+ is("par1,par2,par3,par4"));
+
+ funcWrapper.getCoordinator().handleEventFromOperator(0, nextEvent);
+ assertNotNull(funcWrapper.getEventBuffer()[0], "The coordinator missed the event");
+
+ checkInstantState(funcWrapper.getWriteClient(), HoodieInstant.State.REQUESTED, instant);
+ funcWrapper.checkpointComplete(1);
+ // the coordinator checkpoint commits the inflight instant.
+ checkInstantState(funcWrapper.getWriteClient(), HoodieInstant.State.COMPLETED, instant);
+
+ // checkpoint for the next round: there is no data input, so after the checkpoint
+ // there should be no REQUESTED instant
+ // this triggers the data write and event send
+ funcWrapper.checkpointFunction(2);
+
+ String instant2 = funcWrapper.getWriteClient()
+ .getInflightAndRequestedInstant("COPY_ON_WRITE");
+ assertNotEquals(instant, instant2);
+
+ final OperatorEvent nextEvent2 = funcWrapper.getNextEvent();
+ assertThat("The operator expect to send an event", nextEvent2, instanceOf(BatchWriteSuccessEvent.class));
+ List<WriteStatus> writeStatuses2 = ((BatchWriteSuccessEvent) nextEvent2).getWriteStatuses();
+ assertNotNull(writeStatuses2);
+ assertThat(writeStatuses2.size(), is(0)); // write empty statuses
+
+ funcWrapper.getCoordinator().handleEventFromOperator(0, nextEvent2);
+ assertNotNull(funcWrapper.getEventBuffer()[0], "The coordinator missed the event");
+
+ funcWrapper.checkpointComplete(2);
+ checkInstantState(funcWrapper.getWriteClient(), HoodieInstant.State.REQUESTED, null);
+ checkInstantState(funcWrapper.getWriteClient(), HoodieInstant.State.COMPLETED, instant);
+ }
+
+ @Test
+ public void testCheckpointFails() throws Exception {
+ // open the function and ingest data
+ funcWrapper.openFunction();
+ // no data is written and the checkpoint fails,
+ // so we should revert the started instant
+
+ // this triggers the data write and event send
+ funcWrapper.checkpointFunction(1);
+
+ String instant = funcWrapper.getWriteClient()
+ .getInflightAndRequestedInstant("COPY_ON_WRITE");
+ assertNotNull(instant);
+
+ final OperatorEvent nextEvent = funcWrapper.getNextEvent();
+ assertThat("The operator expect to send an event", nextEvent, instanceOf(BatchWriteSuccessEvent.class));
+ List<WriteStatus> writeStatuses = ((BatchWriteSuccessEvent) nextEvent).getWriteStatuses();
+ assertNotNull(writeStatuses);
+ assertThat(writeStatuses.size(), is(0)); // no data write
+
+ // fails the checkpoint
+ assertThrows(HoodieException.class,
+ () -> funcWrapper.checkpointFails(1),
+ "The last checkpoint was aborted, roll back the last write and throw");
+
+ // the instant metadata should be cleared
+ checkInstantState(funcWrapper.getWriteClient(), HoodieInstant.State.REQUESTED, null);
+ checkInstantState(funcWrapper.getWriteClient(), HoodieInstant.State.COMPLETED, null);
+
+ for (RowData rowData : TestData.DATA_SET_ONE) {
+ funcWrapper.invoke(rowData);
+ }
+
+ // this triggers the data write and event send
+ funcWrapper.checkpointFunction(2);
+ // Do not send the write event, and fail the checkpoint
+ assertThrows(HoodieException.class,
+ () -> funcWrapper.checkpointFails(2),
+ "The last checkpoint was aborted, roll back the last write and throw");
+ }
+
+ @Test
+ public void testInsert() throws Exception {
+ // open the function and ingest data
+ funcWrapper.openFunction();
+ for (RowData rowData : TestData.DATA_SET_ONE) {
+ funcWrapper.invoke(rowData);
+ }
+
+ assertEmptyDataFiles();
+ // this triggers the data write and event send
+ funcWrapper.checkpointFunction(1);
+
+ String instant = funcWrapper.getWriteClient()
+ .getInflightAndRequestedInstant("COPY_ON_WRITE");
+
+ final OperatorEvent nextEvent = funcWrapper.getNextEvent();
+ assertThat("The operator expect to send an event", nextEvent, instanceOf(BatchWriteSuccessEvent.class));
+ checkWrittenData(tempFile, EXPECTED);
+
+ funcWrapper.getCoordinator().handleEventFromOperator(0, nextEvent);
+ assertNotNull(funcWrapper.getEventBuffer()[0], "The coordinator missed the event");
+
+ checkInstantState(funcWrapper.getWriteClient(), HoodieInstant.State.REQUESTED, instant);
+ funcWrapper.checkpointComplete(1);
+ // the coordinator checkpoint commits the inflight instant.
+ checkInstantState(funcWrapper.getWriteClient(), HoodieInstant.State.COMPLETED, instant);
+ checkWrittenData(tempFile, EXPECTED);
+ }
+
+ @Test
+ public void testUpsert() throws Exception {
+ // open the function and ingest data
+ funcWrapper.openFunction();
+ for (RowData rowData : TestData.DATA_SET_ONE) {
+ funcWrapper.invoke(rowData);
+ }
+
+ assertEmptyDataFiles();
+ // this triggers the data write and event send
+ funcWrapper.checkpointFunction(1);
+
+ OperatorEvent nextEvent = funcWrapper.getNextEvent();
+ assertThat("The operator expect to send an event", nextEvent, instanceOf(BatchWriteSuccessEvent.class));
+
+ funcWrapper.getCoordinator().handleEventFromOperator(0, nextEvent);
+ assertNotNull(funcWrapper.getEventBuffer()[0], "The coordinator missed the event");
+
+ funcWrapper.checkpointComplete(1);
+
+ // upsert another data buffer
+ for (RowData rowData : TestData.DATA_SET_TWO) {
+ funcWrapper.invoke(rowData);
+ }
+ // the data is not flushed yet
+ checkWrittenData(tempFile, EXPECTED);
+ // this triggers the data write and event send
+ funcWrapper.checkpointFunction(2);
+
+ String instant = funcWrapper.getWriteClient()
+ .getInflightAndRequestedInstant("COPY_ON_WRITE");
+
+ nextEvent = funcWrapper.getNextEvent();
+ assertThat("The operator expect to send an event", nextEvent, instanceOf(BatchWriteSuccessEvent.class));
+ checkWrittenData(tempFile, EXPECTED2);
+
+ funcWrapper.getCoordinator().handleEventFromOperator(0, nextEvent);
+ assertNotNull(funcWrapper.getEventBuffer()[0], "The coordinator missed the event");
+
+ checkInstantState(funcWrapper.getWriteClient(), HoodieInstant.State.REQUESTED, instant);
+ funcWrapper.checkpointComplete(2);
+ // the coordinator checkpoint commits the inflight instant.
+ checkInstantState(funcWrapper.getWriteClient(), HoodieInstant.State.COMPLETED, instant);
+ checkWrittenData(tempFile, EXPECTED2);
+ }
+
+ // -------------------------------------------------------------------------
+ // Utilities
+ // -------------------------------------------------------------------------
+
+ @SuppressWarnings("rawtypes")
+ private void checkInstantState(
+ HoodieFlinkWriteClient writeClient,
+ HoodieInstant.State state,
+ String instantStr) {
+ final String instant;
+ switch (state) {
+ case REQUESTED:
+ instant = writeClient.getInflightAndRequestedInstant("COPY_ON_WRITE");
+ break;
+ case COMPLETED:
+ instant = writeClient.getLastCompletedInstant("COPY_ON_WRITE");
+ break;
+ default:
+ throw new AssertionError("Unexpected state");
+ }
+ assertThat(instant, is(instantStr));
+ }
+
+ /**
+ * Asserts that no data files have been written yet.
+ */
+ private void assertEmptyDataFiles() {
+ File[] dataFiles = tempFile.listFiles(file -> !file.getName().startsWith("."));
+ assertNotNull(dataFiles);
+ assertThat(dataFiles.length, is(0));
+ }
+}
diff --git a/hudi-flink/src/test/java/org/apache/hudi/operator/StreamWriteITCase.java b/hudi-flink/src/test/java/org/apache/hudi/operator/StreamWriteITCase.java
new file mode 100644
index 0000000..56f946b
--- /dev/null
+++ b/hudi-flink/src/test/java/org/apache/hudi/operator/StreamWriteITCase.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.operator;
+
+import org.apache.hudi.operator.utils.TestConfigurations;
+import org.apache.hudi.operator.utils.TestData;
+import org.apache.hudi.util.AvroSchemaConverter;
+import org.apache.hudi.util.StreamerUtil;
+
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.io.FilePathFilter;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.io.TextInputFormat;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.formats.json.JsonRowDataDeserializationSchema;
+import org.apache.flink.formats.json.TimestampFormat;
+import org.apache.flink.streaming.api.CheckpointingMode;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.FileProcessingMode;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.typeutils.RowDataTypeInfo;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.TestLogger;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Integration test for Flink Hoodie stream sink.
+ */
+public class StreamWriteITCase extends TestLogger {
+
+ private static final Map<String, String> EXPECTED = new HashMap<>();
+
+ static {
+ EXPECTED.put("par1", "[id1,par1,id1,Danny,23,1000,par1, id2,par1,id2,Stephen,33,2000,par1]");
+ EXPECTED.put("par2", "[id3,par2,id3,Julian,53,3000,par2, id4,par2,id4,Fabian,31,4000,par2]");
+ EXPECTED.put("par3", "[id5,par3,id5,Sophia,18,5000,par3, id6,par3,id6,Emma,20,6000,par3]");
+ EXPECTED.put("par4", "[id7,par4,id7,Bob,44,7000,par4, id8,par4,id8,Han,56,8000,par4]");
+ }
+
+ @TempDir
+ File tempFile;
+
+ @Test
+ public void testWriteToHoodie() throws Exception {
+ Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
+ StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
+ execEnv.getConfig().disableObjectReuse();
+ execEnv.setParallelism(4);
+ // trigger a checkpoint every 2 seconds
+ execEnv.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);
+
+ // Build the row type and the write operator factory from the source schema
+ RowType rowType =
+ (RowType) AvroSchemaConverter.convertToDataType(StreamerUtil.getSourceSchema(conf))
+ .getLogicalType();
+ StreamWriteOperatorFactory<RowData> operatorFactory =
+ new StreamWriteOperatorFactory<>(rowType, conf, 4);
+
+ int partitionFieldIndex = rowType.getFieldIndex(conf.getString(FlinkOptions.PARTITION_PATH_FIELD));
+ final RowData.FieldGetter partitionFieldGetter =
+ RowData.createFieldGetter(rowType.getTypeAt(partitionFieldIndex), partitionFieldIndex);
+
+ JsonRowDataDeserializationSchema deserializationSchema = new JsonRowDataDeserializationSchema(
+ rowType,
+ new RowDataTypeInfo(rowType),
+ false,
+ true,
+ TimestampFormat.ISO_8601
+ );
+ String sourcePath = Objects.requireNonNull(Thread.currentThread()
+ .getContextClassLoader().getResource("test_source.data")).toString();
+
+ TextInputFormat format = new TextInputFormat(new Path(sourcePath));
+ format.setFilesFilter(FilePathFilter.createDefaultFilter());
+ TypeInformation<String> typeInfo = BasicTypeInfo.STRING_TYPE_INFO;
+ format.setCharsetName("UTF-8");
+
+ DataStream<Object> dataStream = execEnv
+ // use PROCESS_CONTINUOUSLY mode to trigger checkpoint
+ .readFile(format, sourcePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 1000, typeInfo)
+ .map(record -> deserializationSchema.deserialize(record.getBytes(StandardCharsets.UTF_8)))
+ // Key by partition path, to avoid multiple subtasks writing to the same partition at the same time
+ .keyBy(partitionFieldGetter::getFieldOrNull)
+ .transform("hoodie_stream_write", null, operatorFactory)
+ .uid("uid_hoodie_stream_write")
+ .setParallelism(4);
+ execEnv.addOperator(dataStream.getTransformation());
+
+ JobClient client = execEnv.executeAsync(execEnv.getStreamGraph(conf.getString(FlinkOptions.TABLE_NAME)));
+ if (client.getJobStatus().get() != JobStatus.FAILED) {
+ try {
+ TimeUnit.SECONDS.sleep(10);
+ client.cancel();
+ } catch (Throwable throwable) {
+ // ignored
+ }
+ }
+
+ TestData.checkWrittenData(tempFile, EXPECTED);
+ }
+}
diff --git a/hudi-flink/src/test/java/org/apache/hudi/operator/StreamWriteOperatorCoordinatorTest.java b/hudi-flink/src/test/java/org/apache/hudi/operator/StreamWriteOperatorCoordinatorTest.java
new file mode 100644
index 0000000..c533b48
--- /dev/null
+++ b/hudi-flink/src/test/java/org/apache/hudi/operator/StreamWriteOperatorCoordinatorTest.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.operator;
+
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.operator.event.BatchWriteSuccessEvent;
+import org.apache.hudi.operator.utils.TestConfigurations;
+import org.apache.hudi.util.StreamerUtil;
+
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.concurrent.CompletableFuture;
+
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Test cases for StreamWriteOperatorCoordinator.
+ */
+public class StreamWriteOperatorCoordinatorTest {
+ private StreamWriteOperatorCoordinator coordinator;
+
+ @TempDir
+ File tempFile;
+
+ @BeforeEach
+ public void before() throws Exception {
+ coordinator = new StreamWriteOperatorCoordinator(
+ TestConfigurations.getDefaultConf(tempFile.getAbsolutePath()), 2);
+ coordinator.start();
+ }
+
+ @AfterEach
+ public void after() {
+ coordinator.close();
+ }
+
+ @Test
+ public void testTableInitialized() throws IOException {
+ final org.apache.hadoop.conf.Configuration hadoopConf = StreamerUtil.getHadoopConf();
+ String basePath = tempFile.getAbsolutePath();
+ try (FileSystem fs = FSUtils.getFs(basePath, hadoopConf)) {
+ assertTrue(fs.exists(new Path(basePath, HoodieTableMetaClient.METAFOLDER_NAME)));
+ }
+ }
+
+ @Test
+ public void testCheckpointAndRestore() throws Exception {
+ CompletableFuture<byte[]> future = new CompletableFuture<>();
+ coordinator.checkpointCoordinator(1, future);
+ coordinator.resetToCheckpoint(future.get());
+ }
+
+ @Test
+ public void testReceiveInvalidEvent() {
+ CompletableFuture<byte[]> future = new CompletableFuture<>();
+ coordinator.checkpointCoordinator(1, future);
+ OperatorEvent event = new BatchWriteSuccessEvent(0, "abc", Collections.emptyList());
+ assertThrows(IllegalStateException.class,
+ () -> coordinator.handleEventFromOperator(0, event),
+ "Receive an unexpected event for instant abc from task 0");
+ }
+
+ @Test
+ public void testCheckpointInvalid() {
+ final CompletableFuture<byte[]> future = new CompletableFuture<>();
+ coordinator.checkpointCoordinator(1, future);
+ String inflightInstant = coordinator.getInFlightInstant();
+ OperatorEvent event = new BatchWriteSuccessEvent(0, inflightInstant, Collections.emptyList());
+ coordinator.handleEventFromOperator(0, event);
+ final CompletableFuture<byte[]> future2 = new CompletableFuture<>();
+ coordinator.checkpointCoordinator(2, future2);
+ assertTrue(future2.isCompletedExceptionally());
+ }
+}
diff --git a/hudi-flink/src/test/java/org/apache/hudi/operator/utils/MockFunctionInitializationContext.java b/hudi-flink/src/test/java/org/apache/hudi/operator/utils/MockFunctionInitializationContext.java
new file mode 100644
index 0000000..40e58fe
--- /dev/null
+++ b/hudi-flink/src/test/java/org/apache/hudi/operator/utils/MockFunctionInitializationContext.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.operator.utils;
+
+import org.apache.flink.api.common.state.KeyedStateStore;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+
+/**
+ * A {@link FunctionInitializationContext} for testing purpose.
+ */
+public class MockFunctionInitializationContext implements FunctionInitializationContext {
+
+ private final MockOperatorStateStore operatorStateStore;
+
+ public MockFunctionInitializationContext() {
+ operatorStateStore = new MockOperatorStateStore();
+ }
+
+ @Override
+ public boolean isRestored() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public MockOperatorStateStore getOperatorStateStore() {
+ return operatorStateStore;
+ }
+
+ @Override
+ public KeyedStateStore getKeyedStateStore() {
+ return operatorStateStore;
+ }
+}
diff --git a/hudi-flink/src/test/java/org/apache/hudi/operator/utils/MockMapState.java b/hudi-flink/src/test/java/org/apache/hudi/operator/utils/MockMapState.java
new file mode 100644
index 0000000..9697298
--- /dev/null
+++ b/hudi-flink/src/test/java/org/apache/hudi/operator/utils/MockMapState.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.operator.utils;
+
+import org.apache.flink.api.common.state.MapState;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+/**
+ * Mock map state for testing.
+ *
+ * @param <K> Type of state key
+ * @param <V> Type of state value
+ */
+public class MockMapState<K, V> implements MapState<K, V> {
+ private final Map<K, V> backingMap = new HashMap<>();
+
+ @Override
+ public V get(K uk) {
+ return backingMap.get(uk);
+ }
+
+ @Override
+ public void put(K uk, V uv) {
+ backingMap.put(uk, uv);
+ }
+
+ @Override
+ public void putAll(Map<K, V> map) {
+ backingMap.putAll(map);
+ }
+
+ @Override
+ public void remove(K uk) {
+ backingMap.remove(uk);
+ }
+
+ @Override
+ public boolean contains(K uk) {
+ return backingMap.containsKey(uk);
+ }
+
+ @Override
+ public Iterable<Map.Entry<K, V>> entries() {
+ return backingMap.entrySet();
+ }
+
+ @Override
+ public Iterable<K> keys() {
+ return backingMap.keySet();
+ }
+
+ @Override
+ public Iterable<V> values() {
+ return backingMap.values();
+ }
+
+ @Override
+ public Iterator<Map.Entry<K, V>> iterator() {
+ return backingMap.entrySet().iterator();
+ }
+
+ @Override
+ public boolean isEmpty() {
+ return backingMap.isEmpty();
+ }
+
+ @Override
+ public void clear() {
+ backingMap.clear();
+ }
+}
diff --git a/hudi-flink/src/test/java/org/apache/hudi/operator/utils/MockOperatorStateStore.java b/hudi-flink/src/test/java/org/apache/hudi/operator/utils/MockOperatorStateStore.java
new file mode 100644
index 0000000..016ad5b
--- /dev/null
+++ b/hudi-flink/src/test/java/org/apache/hudi/operator/utils/MockOperatorStateStore.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.operator.utils;
+
+import org.apache.flink.api.common.state.AggregatingState;
+import org.apache.flink.api.common.state.AggregatingStateDescriptor;
+import org.apache.flink.api.common.state.BroadcastState;
+import org.apache.flink.api.common.state.FoldingState;
+import org.apache.flink.api.common.state.FoldingStateDescriptor;
+import org.apache.flink.api.common.state.KeyedStateStore;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.OperatorStateStore;
+import org.apache.flink.api.common.state.ReducingState;
+import org.apache.flink.api.common.state.ReducingStateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.streaming.api.functions.sink.filesystem.TestUtils;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * An {@link OperatorStateStore} for testing purpose.
+ */
+@SuppressWarnings("rawtypes")
+public class MockOperatorStateStore implements KeyedStateStore, OperatorStateStore {
+
+ private final Map<Long, Map<String, TestUtils.MockListState>> historyStateMap;
+
+ private Map<String, TestUtils.MockListState> currentStateMap;
+ private Map<String, TestUtils.MockListState> lastSuccessStateMap;
+
+ private MapState mapState;
+
+ public MockOperatorStateStore() {
+ this.historyStateMap = new HashMap<>();
+
+ this.currentStateMap = new HashMap<>();
+ this.lastSuccessStateMap = new HashMap<>();
+
+ this.mapState = new MockMapState<>();
+ }
+
+ @Override
+ public <K, V> BroadcastState<K, V> getBroadcastState(MapStateDescriptor<K, V> stateDescriptor) throws Exception {
+ return null;
+ }
+
+ @Override
+ public <T> ValueState<T> getState(ValueStateDescriptor<T> valueStateDescriptor) {
+ return null;
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public <S> ListState<S> getListState(ListStateDescriptor<S> stateDescriptor) {
+ String name = stateDescriptor.getName();
+ currentStateMap.putIfAbsent(name, new TestUtils.MockListState());
+ return currentStateMap.get(name);
+ }
+
+ @Override
+ public <T> ReducingState<T> getReducingState(ReducingStateDescriptor<T> reducingStateDescriptor) {
+ return null;
+ }
+
+ @Override
+ public <I, A, O> AggregatingState<I, O> getAggregatingState(AggregatingStateDescriptor<I, A, O> aggregatingStateDescriptor) {
+ return null;
+ }
+
+ @Override
+ public <T, A> FoldingState<T, A> getFoldingState(FoldingStateDescriptor<T, A> foldingStateDescriptor) {
+ return null;
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public <K, V> MapState<K, V> getMapState(MapStateDescriptor<K, V> mapStateDescriptor) {
+ return this.mapState;
+ }
+
+ @Override
+ public <S> ListState<S> getUnionListState(ListStateDescriptor<S> stateDescriptor) throws Exception {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Set<String> getRegisteredStateNames() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Set<String> getRegisteredBroadcastStateNames() {
+ throw new UnsupportedOperationException();
+ }
+
+ public void checkpointBegin(long checkpointId) {
+ Map<String, TestUtils.MockListState> copiedStates = Collections.unmodifiableMap(copyStates(currentStateMap));
+ historyStateMap.put(checkpointId, copiedStates);
+ }
+
+ public void checkpointSuccess(long checkpointId) {
+ lastSuccessStateMap = historyStateMap.get(checkpointId);
+ }
+
+ public void rollBackToLastSuccessCheckpoint() {
+ this.currentStateMap = copyStates(lastSuccessStateMap);
+ }
+
+ @SuppressWarnings("unchecked")
+ private Map<String, TestUtils.MockListState> copyStates(Map<String, TestUtils.MockListState> stateMap) {
+ Map<String, TestUtils.MockListState> copiedStates = new HashMap<>();
+ for (Map.Entry<String, TestUtils.MockListState> entry : stateMap.entrySet()) {
+ TestUtils.MockListState copiedState = new TestUtils.MockListState();
+ copiedState.addAll(entry.getValue().getBackingList());
+ copiedStates.put(entry.getKey(), copiedState);
+ }
+ return copiedStates;
+ }
+}
diff --git a/hudi-flink/src/test/java/org/apache/hudi/operator/utils/MockStreamingRuntimeContext.java b/hudi-flink/src/test/java/org/apache/hudi/operator/utils/MockStreamingRuntimeContext.java
new file mode 100644
index 0000000..1db98df
--- /dev/null
+++ b/hudi-flink/src/test/java/org/apache/hudi/operator/utils/MockStreamingRuntimeContext.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.operator.utils;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.state.KeyedStateStore;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.runtime.operators.testutils.MockEnvironment;
+import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
+
+import java.util.HashMap;
+
+/**
+ * Mock {@link StreamingRuntimeContext} to use in tests.
+ *
+ * <p>NOTE: Adapted from Apache Flink, the MockStreamOperator is modified to support MapState.
+ */
+public class MockStreamingRuntimeContext extends StreamingRuntimeContext {
+
+ private final boolean isCheckpointingEnabled;
+
+ private final int numParallelSubtasks;
+ private final int subtaskIndex;
+
+ public MockStreamingRuntimeContext(
+ boolean isCheckpointingEnabled,
+ int numParallelSubtasks,
+ int subtaskIndex) {
+
+ this(isCheckpointingEnabled, numParallelSubtasks, subtaskIndex, new MockEnvironmentBuilder()
+ .setTaskName("mockTask")
+ .setManagedMemorySize(4 * MemoryManager.DEFAULT_PAGE_SIZE)
+ .build());
+ }
+
+ public MockStreamingRuntimeContext(
+ boolean isCheckpointingEnabled,
+ int numParallelSubtasks,
+ int subtaskIndex,
+ MockEnvironment environment) {
+
+ super(new MockStreamOperator(), environment, new HashMap<>());
+
+ this.isCheckpointingEnabled = isCheckpointingEnabled;
+ this.numParallelSubtasks = numParallelSubtasks;
+ this.subtaskIndex = subtaskIndex;
+ }
+
+ @Override
+ public MetricGroup getMetricGroup() {
+ return new UnregisteredMetricsGroup();
+ }
+
+ @Override
+ public boolean isCheckpointingEnabled() {
+ return isCheckpointingEnabled;
+ }
+
+ @Override
+ public int getIndexOfThisSubtask() {
+ return subtaskIndex;
+ }
+
+ @Override
+ public int getNumberOfParallelSubtasks() {
+ return numParallelSubtasks;
+ }
+
+ private static class MockStreamOperator extends AbstractStreamOperator<Integer> {
+ private static final long serialVersionUID = -1153976702711944427L;
+
+ private transient TestProcessingTimeService testProcessingTimeService;
+
+ private transient MockOperatorStateStore mockOperatorStateStore;
+
+ @Override
+ public ExecutionConfig getExecutionConfig() {
+ return new ExecutionConfig();
+ }
+
+ @Override
+ public OperatorID getOperatorID() {
+ return new OperatorID();
+ }
+
+ @Override
+ public ProcessingTimeService getProcessingTimeService() {
+ if (testProcessingTimeService == null) {
+ testProcessingTimeService = new TestProcessingTimeService();
+ }
+ return testProcessingTimeService;
+ }
+
+ @Override
+ public KeyedStateStore getKeyedStateStore() {
+ if (mockOperatorStateStore == null) {
+ mockOperatorStateStore = new MockOperatorStateStore();
+ }
+ return mockOperatorStateStore;
+ }
+ }
+}
diff --git a/hudi-flink/src/test/java/org/apache/hudi/operator/utils/StreamWriteFunctionWrapper.java b/hudi-flink/src/test/java/org/apache/hudi/operator/utils/StreamWriteFunctionWrapper.java
new file mode 100644
index 0000000..1b02791
--- /dev/null
+++ b/hudi-flink/src/test/java/org/apache/hudi/operator/utils/StreamWriteFunctionWrapper.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.operator.utils;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
+import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.runtime.operators.testutils.MockEnvironment;
+import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.streaming.api.operators.collect.utils.MockFunctionSnapshotContext;
+import org.apache.flink.streaming.api.operators.collect.utils.MockOperatorEventGateway;
+
+import org.apache.hudi.client.HoodieFlinkWriteClient;
+import org.apache.hudi.operator.StreamWriteFunction;
+import org.apache.hudi.operator.StreamWriteOperatorCoordinator;
+import org.apache.hudi.operator.event.BatchWriteSuccessEvent;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * A wrapper class to manipulate the {@link StreamWriteFunction} instance for testing.
+ *
+ * @param <I> Input type
+ */
+public class StreamWriteFunctionWrapper<I> {
+ private final TypeSerializer<I> serializer;
+ private final Configuration conf;
+
+ private final IOManager ioManager;
+ private final StreamingRuntimeContext runtimeContext;
+ private final MockOperatorEventGateway gateway;
+ private final StreamWriteOperatorCoordinator coordinator;
+ private final MockFunctionInitializationContext functionInitializationContext;
+
+ private StreamWriteFunction<Object, I, Object> function;
+
+ public StreamWriteFunctionWrapper(String tablePath, TypeSerializer<I> serializer) throws Exception {
+ this.serializer = serializer;
+ this.ioManager = new IOManagerAsync();
+ MockEnvironment environment = new MockEnvironmentBuilder()
+ .setTaskName("mockTask")
+ .setManagedMemorySize(4 * MemoryManager.DEFAULT_PAGE_SIZE)
+ .setIOManager(ioManager)
+ .build();
+ this.runtimeContext = new MockStreamingRuntimeContext(false, 1, 0, environment);
+ this.gateway = new MockOperatorEventGateway();
+ this.conf = TestConfigurations.getDefaultConf(tablePath);
+ // a single write function (parallelism = 1)
+ this.coordinator = new StreamWriteOperatorCoordinator(conf, 1);
+ this.coordinator.start();
+ this.functionInitializationContext = new MockFunctionInitializationContext();
+ }
+
+ public void openFunction() throws Exception {
+ function = new StreamWriteFunction<>(TestConfigurations.ROW_TYPE, this.conf);
+ function.setRuntimeContext(runtimeContext);
+ function.setOperatorEventGateway(gateway);
+ function.open(this.conf);
+ }
+
+ public void invoke(I record) throws Exception {
+ function.processElement(record, null, null);
+ }
+
+ public BatchWriteSuccessEvent[] getEventBuffer() {
+ return this.coordinator.getEventBuffer();
+ }
+
+ public OperatorEvent getNextEvent() {
+ return this.gateway.getNextEvent();
+ }
+
+ @SuppressWarnings("rawtypes")
+ public HoodieFlinkWriteClient getWriteClient() {
+ return this.function.getWriteClient();
+ }
+
+ public void checkpointFunction(long checkpointId) throws Exception {
+ // checkpoint the coordinator first
+ this.coordinator.checkpointCoordinator(checkpointId, new CompletableFuture<>());
+ function.snapshotState(new MockFunctionSnapshotContext(checkpointId));
+ functionInitializationContext.getOperatorStateStore().checkpointBegin(checkpointId);
+ }
+
+ public void checkpointComplete(long checkpointId) {
+ functionInitializationContext.getOperatorStateStore().checkpointSuccess(checkpointId);
+ coordinator.checkpointComplete(checkpointId);
+ }
+
+ public void checkpointFails(long checkpointId) {
+ coordinator.notifyCheckpointAborted(checkpointId);
+ }
+
+ public void close() throws Exception {
+ coordinator.close();
+ ioManager.close();
+ }
+
+ public StreamWriteOperatorCoordinator getCoordinator() {
+ return coordinator;
+ }
+}
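The wrapper above makes the function/coordinator handshake easy to exercise end to end. A condensed sketch of one checkpoint round, mirroring testCheckpoint in StreamWriteFunctionTest and using a hypothetical table path:

    import org.apache.flink.runtime.operators.coordination.OperatorEvent;
    import org.apache.flink.table.data.RowData;
    import org.apache.hudi.operator.utils.StreamWriteFunctionWrapper;
    import org.apache.hudi.operator.utils.TestConfigurations;
    import org.apache.hudi.operator.utils.TestData;

    public class HandshakeSketch {
      public static void main(String[] args) throws Exception {
        StreamWriteFunctionWrapper<RowData> wrapper = new StreamWriteFunctionWrapper<>(
            "/tmp/hoodie_sandbox", TestConfigurations.SERIALIZER);   // hypothetical path

        wrapper.openFunction();
        for (RowData row : TestData.DATA_SET_ONE) {
          wrapper.invoke(row);                      // rows are buffered in the write function
        }

        wrapper.checkpointFunction(1);              // flush the buffer and emit a write-success event
        OperatorEvent event = wrapper.getNextEvent();
        wrapper.getCoordinator().handleEventFromOperator(0, event); // coordinator buffers the event
        wrapper.checkpointComplete(1);              // coordinator commits the inflight instant

        wrapper.close();
      }
    }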
diff --git a/hudi-flink/src/test/java/org/apache/hudi/operator/utils/TestConfigurations.java b/hudi-flink/src/test/java/org/apache/hudi/operator/utils/TestConfigurations.java
new file mode 100644
index 0000000..7513fed
--- /dev/null
+++ b/hudi-flink/src/test/java/org/apache/hudi/operator/utils/TestConfigurations.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.operator.utils;
+
+import org.apache.hudi.operator.FlinkOptions;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.util.Objects;
+
+/**
+ * Configurations for the test.
+ */
+public class TestConfigurations {
+ private TestConfigurations() {
+ }
+
+ public static final RowType ROW_TYPE = (RowType) DataTypes.ROW(
+ DataTypes.FIELD("uuid", DataTypes.VARCHAR(20)),// record key
+ DataTypes.FIELD("name", DataTypes.VARCHAR(10)),
+ DataTypes.FIELD("age", DataTypes.INT()),
+ DataTypes.FIELD("ts", DataTypes.TIMESTAMP(3)), // precombine field
+ DataTypes.FIELD("partition", DataTypes.VARCHAR(10)))
+ .notNull()
+ .getLogicalType();
+
+ public static final RowDataSerializer SERIALIZER = new RowDataSerializer(new ExecutionConfig(), ROW_TYPE);
+
+ public static Configuration getDefaultConf(String tablePath) {
+ Configuration conf = new Configuration();
+ conf.setString(FlinkOptions.PATH, tablePath);
+ conf.setString(FlinkOptions.READ_SCHEMA_FILE_PATH,
+ Objects.requireNonNull(Thread.currentThread()
+ .getContextClassLoader().getResource("test_read_schema.avsc")).toString());
+ conf.setString(FlinkOptions.TABLE_NAME, "TestHoodieTable");
+ conf.setString(FlinkOptions.PARTITION_PATH_FIELD, "partition");
+ return conf;
+ }
+}
diff --git a/hudi-flink/src/test/java/org/apache/hudi/operator/utils/TestData.java b/hudi-flink/src/test/java/org/apache/hudi/operator/utils/TestData.java
new file mode 100644
index 0000000..7c2c314
--- /dev/null
+++ b/hudi-flink/src/test/java/org/apache/hudi/operator/utils/TestData.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.operator.utils;
+
+import org.apache.hudi.common.fs.FSUtils;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.data.writer.BinaryRowWriter;
+import org.apache.flink.table.data.writer.BinaryWriter;
+import org.apache.flink.table.runtime.types.InternalSerializers;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.Strings;
+import org.apache.parquet.avro.AvroParquetReader;
+import org.apache.parquet.hadoop.ParquetReader;
+
+import java.io.File;
+import java.io.FileFilter;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+
+import static junit.framework.TestCase.assertEquals;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+/** Data sets for testing, together with utilities to check the written results. */
+public class TestData {
+ public static List<RowData> DATA_SET_ONE = Arrays.asList(
+ binaryRow(StringData.fromString("id1"), StringData.fromString("Danny"), 23,
+ TimestampData.fromEpochMillis(1), StringData.fromString("par1")),
+ binaryRow(StringData.fromString("id2"), StringData.fromString("Stephen"), 33,
+ TimestampData.fromEpochMillis(2), StringData.fromString("par1")),
+ binaryRow(StringData.fromString("id3"), StringData.fromString("Julian"), 53,
+ TimestampData.fromEpochMillis(3), StringData.fromString("par2")),
+ binaryRow(StringData.fromString("id4"), StringData.fromString("Fabian"), 31,
+ TimestampData.fromEpochMillis(4), StringData.fromString("par2")),
+ binaryRow(StringData.fromString("id5"), StringData.fromString("Sophia"), 18,
+ TimestampData.fromEpochMillis(5), StringData.fromString("par3")),
+ binaryRow(StringData.fromString("id6"), StringData.fromString("Emma"), 20,
+ TimestampData.fromEpochMillis(6), StringData.fromString("par3")),
+ binaryRow(StringData.fromString("id7"), StringData.fromString("Bob"), 44,
+ TimestampData.fromEpochMillis(7), StringData.fromString("par4")),
+ binaryRow(StringData.fromString("id8"), StringData.fromString("Han"), 56,
+ TimestampData.fromEpochMillis(8), StringData.fromString("par4"))
+ );
+
+ public static List<RowData> DATA_SET_TWO = Arrays.asList(
+ // advance the age by 1
+ binaryRow(StringData.fromString("id1"), StringData.fromString("Danny"), 24,
+ TimestampData.fromEpochMillis(1), StringData.fromString("par1")),
+ binaryRow(StringData.fromString("id2"), StringData.fromString("Stephen"), 34,
+ TimestampData.fromEpochMillis(2), StringData.fromString("par1")),
+ binaryRow(StringData.fromString("id3"), StringData.fromString("Julian"), 54,
+ TimestampData.fromEpochMillis(3), StringData.fromString("par2")),
+ binaryRow(StringData.fromString("id4"), StringData.fromString("Fabian"), 32,
+ TimestampData.fromEpochMillis(4), StringData.fromString("par2")),
+ // same as before
+ binaryRow(StringData.fromString("id5"), StringData.fromString("Sophia"), 18,
+ TimestampData.fromEpochMillis(5), StringData.fromString("par3")),
+ // new data
+ binaryRow(StringData.fromString("id9"), StringData.fromString("Jane"), 19,
+ TimestampData.fromEpochMillis(6), StringData.fromString("par3")),
+ binaryRow(StringData.fromString("id10"), StringData.fromString("Ella"), 38,
+ TimestampData.fromEpochMillis(7), StringData.fromString("par4")),
+ binaryRow(StringData.fromString("id11"), StringData.fromString("Phoebe"), 52,
+ TimestampData.fromEpochMillis(8), StringData.fromString("par4"))
+ );
+
+ /**
+ * Checks that the source data {@link #DATA_SET_ONE} is written as expected.
+ *
+ * <p>Note: Replace this utility with the Flink reader once reading is supported.
+ *
+ * @param baseFile The base file to check; it should be a directory
+ * @param expected The expected results mapping, where the key is the partition path
+ */
+ public static void checkWrittenData(File baseFile, Map<String, String> expected) throws IOException {
+ assert baseFile.isDirectory();
+ FileFilter filter = file -> !file.getName().startsWith(".");
+ File[] partitionDirs = baseFile.listFiles(filter);
+ assertNotNull(partitionDirs);
+ assertThat(partitionDirs.length, is(4));
+ for (File partitionDir : partitionDirs) {
+ File[] dataFiles = partitionDir.listFiles(file -> file.getName().endsWith(".parquet"));
+ assertNotNull(dataFiles);
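+ // Pick the data file of the latest commit; the commit time is encoded in the Hoodie data file name.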
+ File latestDataFile = Arrays.stream(dataFiles)
+ .max(Comparator.comparing(f -> FSUtils.getCommitTime(f.getName())))
+ .orElse(dataFiles[0]);
+ ParquetReader<GenericRecord> reader = AvroParquetReader
+ .<GenericRecord>builder(new Path(latestDataFile.getAbsolutePath())).build();
+ List<String> readBuffer = new ArrayList<>();
+ GenericRecord nextRecord = reader.read();
+ while (nextRecord != null) {
+ readBuffer.add(filterOutVariables(nextRecord));
+ nextRecord = reader.read();
+ }
+ readBuffer.sort(Comparator.naturalOrder());
+ assertThat(readBuffer.toString(), is(expected.get(partitionDir.getName())));
+ }
+ }
+
+ /**
+ * Filters out the fields that vary per run, such as the commit time and the file name.
+ */
+ private static String filterOutVariables(GenericRecord genericRecord) {
+ List<String> fields = new ArrayList<>();
+ fields.add(genericRecord.get("_hoodie_record_key").toString());
+ fields.add(genericRecord.get("_hoodie_partition_path").toString());
+ fields.add(genericRecord.get("uuid").toString());
+ fields.add(genericRecord.get("name").toString());
+ fields.add(genericRecord.get("age").toString());
+ fields.add(genericRecord.get("ts").toString());
+ fields.add(genericRecord.get("partition").toString());
+ return Strings.join(fields, ",");
+ }
+
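+ /**
+ * Wraps the given field values into a {@link BinaryRowData} that follows {@link TestConfigurations#ROW_TYPE}.
+ */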
+ private static BinaryRowData binaryRow(Object... fields) {
+ LogicalType[] types = TestConfigurations.ROW_TYPE.getFields().stream().map(RowType.RowField::getType)
+ .toArray(LogicalType[]::new);
+ assertEquals(
+ "Filed count inconsistent with type information",
+ fields.length,
+ types.length);
+ BinaryRowData row = new BinaryRowData(fields.length);
+ BinaryRowWriter writer = new BinaryRowWriter(row);
+ writer.reset();
+ for (int i = 0; i < fields.length; i++) {
+ Object field = fields[i];
+ if (field == null) {
+ writer.setNullAt(i);
+ } else {
+ BinaryWriter.write(writer, i, field, types[i], InternalSerializers.create(types[i]));
+ }
+ }
+ writer.complete();
+ return row;
+ }
+}
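A write test would then combine the two helpers roughly as follows (a sketch only; the expected strings are placeholders, since the exact values depend on the key generator and write configuration, and tablePath refers to the temporary table directory from the earlier sketch):

    // Hypothetical verification step after feeding DATA_SET_ONE through the write pipeline.
    java.util.Map<String, String> expected = new java.util.HashMap<>();
    expected.put("par1", "[...]"); // sorted, comma-joined rows written to partition par1
    expected.put("par2", "[...]");
    expected.put("par3", "[...]");
    expected.put("par4", "[...]");
    TestData.checkWrittenData(tablePath, expected);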
diff --git a/hudi-flink/src/test/java/org/apache/hudi/source/TestJsonStringToHoodieRecordMapFunction.java b/hudi-flink/src/test/java/org/apache/hudi/source/TestJsonStringToHoodieRecordMapFunction.java
index 98066e9..a24f153 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/source/TestJsonStringToHoodieRecordMapFunction.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/source/TestJsonStringToHoodieRecordMapFunction.java
@@ -70,7 +70,7 @@ public class TestJsonStringToHoodieRecordMapFunction extends HoodieFlinkClientTe
props.put(HoodieWriteConfig.WRITE_PAYLOAD_CLASS, OverwriteWithLatestAvroPayload.class.getName());
props.put(HoodieWriteConfig.PRECOMBINE_FIELD_PROP, "timestamp");
props.put(KeyGeneratorOptions.RECORDKEY_FIELD_OPT_KEY, "_row_key");
- props.put(KeyGeneratorOptions.PARTITIONPATH_FIELD_OPT_KEY, "partitionPath");
+ props.put(KeyGeneratorOptions.PARTITIONPATH_FIELD_OPT_KEY, "current_date");
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(2);
diff --git a/hudi-flink/src/main/java/org/apache/hudi/schema/SchemaProvider.java b/hudi-flink/src/test/resources/test_read_schema.avsc
similarity index 53%
copy from hudi-flink/src/main/java/org/apache/hudi/schema/SchemaProvider.java
copy to hudi-flink/src/test/resources/test_read_schema.avsc
index 74b4067..0cbb4e3 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/schema/SchemaProvider.java
+++ b/hudi-flink/src/test/resources/test_read_schema.avsc
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -15,30 +15,31 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
-package org.apache.hudi.schema;
-
-import org.apache.hudi.common.config.TypedProperties;
-
-import org.apache.avro.Schema;
-
-import java.io.Serializable;
-
-/**
- * Class to provide schema for reading data and also writing into a Hoodie table.
- */
-public abstract class SchemaProvider implements Serializable {
-
- protected TypedProperties config;
-
- protected SchemaProvider(TypedProperties props) {
- this.config = props;
- }
-
- public abstract Schema getSourceSchema();
-
- public Schema getTargetSchema() {
- // by default, use source schema as target for hoodie table as well
- return getSourceSchema();
- }
+{
+ "type" : "record",
+ "name" : "record",
+ "fields" : [ {
+ "name" : "uuid",
+ "type" : [ "null", "string" ],
+ "default" : null
+ }, {
+ "name" : "name",
+ "type" : [ "null", "string" ],
+ "default" : null
+ }, {
+ "name" : "age",
+ "type" : [ "null", "int" ],
+ "default" : null
+ }, {
+ "name" : "ts",
+ "type" : [ "null", {
+ "type" : "long",
+ "logicalType" : "timestamp-millis"
+ } ],
+ "default" : null
+ }, {
+ "name" : "partition",
+ "type" : [ "null", "string" ],
+ "default" : null
+ } ]
}
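The schema above is the resource that TestConfigurations points FlinkOptions.READ_SCHEMA_FILE_PATH at. Independent of how the pipeline loads it, the resource can be sanity-checked with the plain Avro API (an illustrative snippet, assumed to run where IOException is handled):

    org.apache.avro.Schema schema = new org.apache.avro.Schema.Parser()
        .parse(TestData.class.getClassLoader().getResourceAsStream("test_read_schema.avsc"));
    // schema.getFields() -> uuid, name, age, ts (timestamp-millis), partition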
diff --git a/hudi-flink/src/test/resources/test_source.data b/hudi-flink/src/test/resources/test_source.data
new file mode 100644
index 0000000..2f628e2
--- /dev/null
+++ b/hudi-flink/src/test/resources/test_source.data
@@ -0,0 +1,8 @@
+{"uuid": "id1", "name": "Danny", "age": 23, "ts": "1970-01-01T00:00:01", "partition": "par1"}
+{"uuid": "id2", "name": "Stephen", "age": 33, "ts": "1970-01-01T00:00:02", "partition": "par1"}
+{"uuid": "id3", "name": "Julian", "age": 53, "ts": "1970-01-01T00:00:03", "partition": "par2"}
+{"uuid": "id4", "name": "Fabian", "age": 31, "ts": "1970-01-01T00:00:04", "partition": "par2"}
+{"uuid": "id5", "name": "Sophia", "age": 18, "ts": "1970-01-01T00:00:05", "partition": "par3"}
+{"uuid": "id6", "name": "Emma", "age": 20, "ts": "1970-01-01T00:00:06", "partition": "par3"}
+{"uuid": "id7", "name": "Bob", "age": 44, "ts": "1970-01-01T00:00:07", "partition": "par4"}
+{"uuid": "id8", "name": "Han", "age": 56, "ts": "1970-01-01T00:00:08", "partition": "par4"}