You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by da...@apache.org on 2022/03/25 01:02:11 UTC
[hudi] branch master updated: [HUDI-3701] Flink bulk_insert support bucket hash index (#5118)
This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 5e86cdd [HUDI-3701] Flink bulk_insert support bucket hash index (#5118)
5e86cdd is described below
commit 5e86cdd1e95511424e354d99816d9495e13ae6ae
Author: Danny Chan <yu...@gmail.com>
AuthorDate: Fri Mar 25 09:01:42 2022 +0800
[HUDI-3701] Flink bulk_insert support bucket hash index (#5118)
---
.../apache/hudi/index/bucket/BucketIdentifier.java | 10 ++-
.../sink/bucket/BucketBulkInsertWriterHelper.java | 73 ++++++++++++++++++++++
.../{ => bucket}/BucketStreamWriteFunction.java | 3 +-
.../{ => bucket}/BucketStreamWriteOperator.java | 2 +-
.../hudi/sink/bulk/BulkInsertWriteFunction.java | 2 +-
.../hudi/sink/bulk/BulkInsertWriterHelper.java | 18 +++---
.../org/apache/hudi/sink/bulk/WriterHelpers.java | 39 ++++++++++++
.../sink/partitioner/BucketIndexPartitioner.java | 5 +-
.../java/org/apache/hudi/sink/utils/Pipelines.java | 19 ++++--
.../apache/hudi/streamer/FlinkStreamerConfig.java | 4 +-
.../hudi/util/FlinkStateBackendConverter.java | 3 +-
.../apache/hudi/table/ITTestHoodieDataSource.java | 18 +++++-
12 files changed, 169 insertions(+), 27 deletions(-)
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/BucketIdentifier.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/BucketIdentifier.java
index ddd9572..1a07c40 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/BucketIdentifier.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/BucketIdentifier.java
@@ -38,11 +38,15 @@ public class BucketIdentifier {
}
public static int getBucketId(HoodieKey hoodieKey, String indexKeyFields, int numBuckets) {
+ return getBucketId(hoodieKey.getRecordKey(), indexKeyFields, numBuckets);
+ }
+
+ public static int getBucketId(String recordKey, String indexKeyFields, int numBuckets) {
List<String> hashKeyFields;
- if (!hoodieKey.getRecordKey().contains(":")) {
- hashKeyFields = Collections.singletonList(hoodieKey.getRecordKey());
+ if (!recordKey.contains(":")) {
+ hashKeyFields = Collections.singletonList(recordKey);
} else {
- Map<String, String> recordKeyPairs = Arrays.stream(hoodieKey.getRecordKey().split(","))
+ Map<String, String> recordKeyPairs = Arrays.stream(recordKey.split(","))
.map(p -> p.split(":"))
.collect(Collectors.toMap(p -> p[0], p -> p[1]));
hashKeyFields = Arrays.stream(indexKeyFields.split(","))
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketBulkInsertWriterHelper.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketBulkInsertWriterHelper.java
new file mode 100644
index 0000000..c52b6f0
--- /dev/null
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketBulkInsertWriterHelper.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.bucket;
+
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.index.bucket.BucketIdentifier;
+import org.apache.hudi.io.storage.row.HoodieRowDataCreateHandle;
+import org.apache.hudi.sink.bulk.BulkInsertWriterHelper;
+import org.apache.hudi.table.HoodieTable;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Helper class for bucket index bulk insert used by Flink.
+ *
+ * <p>Every record is written into the file group of its bucket within its partition. The file id
+ * of a (partition path, bucket id) pair is created the first time the pair is seen and reused
+ * afterwards, so each bucket of a partition maps to a single file group for this writer.
+ */
+public class BucketBulkInsertWriterHelper extends BulkInsertWriterHelper {
+  private static final Logger LOG = LoggerFactory.getLogger(BucketBulkInsertWriterHelper.class);
+
+  /** Number of buckets per partition ({@link FlinkOptions#BUCKET_INDEX_NUM_BUCKETS}). */
+  private final int bucketNum;
+  /** Fields hashed to compute the bucket id ({@link FlinkOptions#INDEX_KEY_FIELD}). */
+  private final String indexKeyFields;
+  /**
+   * Cache of "partitionPath/bucketId" -> file id. {@code BucketIdentifier#newBucketFileIdPrefix}
+   * generates a fresh file id on every call, so it must be invoked only once per bucket of a
+   * partition; calling it per record would open one file (handle) per record.
+   */
+  private final Map<String, String> bucketToFileId = new HashMap<>();
+
+  public BucketBulkInsertWriterHelper(Configuration conf, HoodieTable<?, ?, ?, ?> hoodieTable, HoodieWriteConfig writeConfig,
+      String instantTime, int taskPartitionId, long taskId, long taskEpochId, RowType rowType) {
+    super(conf, hoodieTable, writeConfig, instantTime, taskPartitionId, taskId, taskEpochId, rowType);
+    this.bucketNum = conf.getInteger(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS);
+    this.indexKeyFields = conf.getString(FlinkOptions.INDEX_KEY_FIELD);
+  }
+
+  /**
+   * Writes the record into the file group of its (partition path, bucket id).
+   *
+   * @param record the row to write
+   * @throws IOException propagated from the underlying create handle; every failure is logged
+   *     before being rethrown
+   */
+  @Override
+  public void write(RowData record) throws IOException {
+    try {
+      String recordKey = keyGen.getRecordKey(record);
+      String partitionPath = keyGen.getPartitionPath(record);
+      final int bucketId = BucketIdentifier.getBucketId(recordKey, indexKeyFields, bucketNum);
+      String fileId = getFileId(partitionPath, bucketId);
+      getRowCreateHandle(partitionPath, fileId).write(recordKey, partitionPath, record);
+    } catch (Throwable throwable) {
+      LOG.error("Global error thrown while trying to write records in HoodieRowDataCreateHandle", throwable);
+      throw throwable;
+    }
+  }
+
+  /**
+   * Returns the file id of the given bucket in the given partition, creating it on first use.
+   */
+  private String getFileId(String partitionPath, int bucketId) {
+    // The bucket id is a plain integer (no '/'), so "partitionPath/bucketId" is unambiguous.
+    return bucketToFileId.computeIfAbsent(partitionPath + "/" + bucketId,
+        k -> BucketIdentifier.newBucketFileIdPrefix(bucketId));
+  }
+
+  /**
+   * Returns the create handle for file group (partition path, file id), creating it on first use.
+   */
+  private HoodieRowDataCreateHandle getRowCreateHandle(String partitionPath, String fileId) {
+    // Key on the file group location rather than the bare file id so that handles belonging to
+    // different partitions can never be shared.
+    return handles.computeIfAbsent(partitionPath + "/" + fileId,
+        k -> new HoodieRowDataCreateHandle(hoodieTable, writeConfig, partitionPath, fileId,
+            instantTime, taskPartitionId, taskId, taskEpochId, rowType));
+  }
+}
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/BucketStreamWriteFunction.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketStreamWriteFunction.java
similarity index 98%
rename from hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/BucketStreamWriteFunction.java
rename to hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketStreamWriteFunction.java
index 4c9e4dc..e53d2b2 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/BucketStreamWriteFunction.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketStreamWriteFunction.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.hudi.sink;
+package org.apache.hudi.sink.bucket;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieKey;
@@ -26,6 +26,7 @@ import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.index.bucket.BucketIdentifier;
+import org.apache.hudi.sink.StreamWriteFunction;
import org.apache.hudi.table.HoodieFlinkTable;
import org.apache.flink.configuration.Configuration;
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/BucketStreamWriteOperator.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketStreamWriteOperator.java
similarity index 97%
rename from hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/BucketStreamWriteOperator.java
rename to hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketStreamWriteOperator.java
index cf740cc..a48ea44 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/BucketStreamWriteOperator.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketStreamWriteOperator.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.hudi.sink;
+package org.apache.hudi.sink.bucket;
import org.apache.hudi.sink.common.AbstractWriteOperator;
import org.apache.hudi.sink.common.WriteOperatorFactory;
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriteFunction.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriteFunction.java
index 9b34c3e..6c8dcef 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriteFunction.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriteFunction.java
@@ -167,7 +167,7 @@ public class BulkInsertWriteFunction<I>
private void initWriterHelper() {
String instant = instantToWrite();
- this.writerHelper = new BulkInsertWriterHelper(this.config, this.writeClient.getHoodieTable(), this.writeClient.getConfig(),
+ this.writerHelper = WriterHelpers.getWriterHelper(this.config, this.writeClient.getHoodieTable(), this.writeClient.getConfig(),
instant, this.taskID, getRuntimeContext().getNumberOfParallelSubtasks(), getRuntimeContext().getAttemptNumber(),
this.rowType);
}
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriterHelper.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriterHelper.java
index 4bc8ae2..013595a 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriterHelper.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriterHelper.java
@@ -50,21 +50,21 @@ public class BulkInsertWriterHelper {
private static final Logger LOG = LogManager.getLogger(BulkInsertWriterHelper.class);
- private final String instantTime;
- private final int taskPartitionId;
- private final long taskId;
- private final long taskEpochId;
- private final HoodieTable hoodieTable;
- private final HoodieWriteConfig writeConfig;
- private final RowType rowType;
+ protected final String instantTime;
+ protected final int taskPartitionId;
+ protected final long taskId;
+ protected final long taskEpochId;
+ protected final HoodieTable hoodieTable;
+ protected final HoodieWriteConfig writeConfig;
+ protected final RowType rowType;
private final Boolean arePartitionRecordsSorted;
private final List<HoodieInternalWriteStatus> writeStatusList = new ArrayList<>();
private HoodieRowDataCreateHandle handle;
private String lastKnownPartitionPath = null;
private final String fileIdPrefix;
private int numFilesWritten = 0;
- private final Map<String, HoodieRowDataCreateHandle> handles = new HashMap<>();
- private final RowDataKeyGen keyGen;
+ protected final Map<String, HoodieRowDataCreateHandle> handles = new HashMap<>();
+ protected final RowDataKeyGen keyGen;
public BulkInsertWriterHelper(Configuration conf, HoodieTable hoodieTable, HoodieWriteConfig writeConfig,
String instantTime, int taskPartitionId, long taskId, long taskEpochId, RowType rowType) {
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/WriterHelpers.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/WriterHelpers.java
new file mode 100644
index 0000000..99a9ae1
--- /dev/null
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/WriterHelpers.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.bulk;
+
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.configuration.OptionsResolver;
+import org.apache.hudi.sink.bucket.BucketBulkInsertWriterHelper;
+import org.apache.hudi.table.HoodieTable;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.types.logical.RowType;
+
+/**
+ * Factory class to create bulk insert writer helpers.
+ */
+public final class WriterHelpers {
+
+  private WriterHelpers() {
+    // static factory holder, not meant to be instantiated
+  }
+
+  /**
+   * Creates the bulk insert writer helper for the index type configured in {@code conf}.
+   *
+   * <p>All arguments are forwarded unchanged to the chosen helper's constructor.
+   *
+   * @return a {@link BucketBulkInsertWriterHelper} if {@link OptionsResolver#isBucketIndexType}
+   *     holds for {@code conf}, otherwise a plain {@link BulkInsertWriterHelper}
+   */
+  public static BulkInsertWriterHelper getWriterHelper(Configuration conf, HoodieTable<?, ?, ?, ?> hoodieTable, HoodieWriteConfig writeConfig,
+      String instantTime, int taskPartitionId, long taskId, long taskEpochId, RowType rowType) {
+    return OptionsResolver.isBucketIndexType(conf)
+        ? new BucketBulkInsertWriterHelper(conf, hoodieTable, writeConfig, instantTime, taskPartitionId, taskId, taskEpochId, rowType)
+        : new BulkInsertWriterHelper(conf, hoodieTable, writeConfig, instantTime, taskPartitionId, taskId, taskEpochId, rowType);
+  }
+}
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketIndexPartitioner.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketIndexPartitioner.java
index 0c4e2a1..b9b737c 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketIndexPartitioner.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketIndexPartitioner.java
@@ -18,7 +18,6 @@
package org.apache.hudi.sink.partitioner;
-import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.index.bucket.BucketIdentifier;
import org.apache.flink.api.common.functions.Partitioner;
@@ -29,7 +28,7 @@ import org.apache.flink.api.common.functions.Partitioner;
*
* @param <T> The type of obj to hash
*/
-public class BucketIndexPartitioner<T extends HoodieKey> implements Partitioner<T> {
+public class BucketIndexPartitioner<T extends String> implements Partitioner<T> {
private final int bucketNum;
private final String indexKeyFields;
@@ -40,7 +39,7 @@ public class BucketIndexPartitioner<T extends HoodieKey> implements Partitioner<
}
@Override
- public int partition(HoodieKey key, int numPartitions) {
+ public int partition(String key, int numPartitions) {
int curBucket = BucketIdentifier.getBucketId(key, indexKeyFields, bucketNum);
return BucketIdentifier.mod(curBucket, numPartitions);
}
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
index 65d67fe..4930dbe 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
@@ -18,16 +18,15 @@
package org.apache.hudi.sink.utils;
-import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.OptionsResolver;
-import org.apache.hudi.sink.BucketStreamWriteOperator;
import org.apache.hudi.sink.CleanFunction;
import org.apache.hudi.sink.StreamWriteOperator;
import org.apache.hudi.sink.append.AppendWriteOperator;
import org.apache.hudi.sink.bootstrap.BootstrapOperator;
import org.apache.hudi.sink.bootstrap.batch.BatchBootstrapOperator;
+import org.apache.hudi.sink.bucket.BucketStreamWriteOperator;
import org.apache.hudi.sink.bulk.BulkInsertWriteOperator;
import org.apache.hudi.sink.bulk.RowDataKeyGen;
import org.apache.hudi.sink.bulk.sort.SortOperatorGen;
@@ -88,6 +87,18 @@ public class Pipelines {
*/
public static DataStreamSink<Object> bulkInsert(Configuration conf, RowType rowType, DataStream<RowData> dataStream) {
WriteOperatorFactory<RowData> operatorFactory = BulkInsertWriteOperator.getFactory(conf, rowType);
+ if (OptionsResolver.isBucketIndexType(conf)) {
+ int bucketNum = conf.getInteger(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS);
+ String indexKeyFields = conf.getString(FlinkOptions.INDEX_KEY_FIELD);
+ BucketIndexPartitioner<String> partitioner = new BucketIndexPartitioner<>(bucketNum, indexKeyFields);
+ RowDataKeyGen rowDataKeyGen = RowDataKeyGen.instance(conf, rowType);
+ return dataStream.partitionCustom(partitioner, rowDataKeyGen::getRecordKey)
+ .transform("bucket_bulk_insert", TypeInformation.of(Object.class), operatorFactory)
+ .uid("uid_bucket_bulk_insert" + conf.getString(FlinkOptions.TABLE_NAME))
+ .setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS))
+ .addSink(DummySink.INSTANCE)
+ .name("dummy");
+ }
final String[] partitionFields = FilePathUtils.extractPartitionKeys(conf);
if (partitionFields.length > 0) {
@@ -278,8 +289,8 @@ public class Pipelines {
WriteOperatorFactory<HoodieRecord> operatorFactory = BucketStreamWriteOperator.getFactory(conf);
int bucketNum = conf.getInteger(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS);
String indexKeyFields = conf.getString(FlinkOptions.INDEX_KEY_FIELD);
- BucketIndexPartitioner<HoodieKey> partitioner = new BucketIndexPartitioner<>(bucketNum, indexKeyFields);
- return dataStream.partitionCustom(partitioner, HoodieRecord::getKey)
+ BucketIndexPartitioner<String> partitioner = new BucketIndexPartitioner<>(bucketNum, indexKeyFields);
+ return dataStream.partitionCustom(partitioner, HoodieRecord::getRecordKey)
.transform("bucket_write", TypeInformation.of(Object.class), operatorFactory)
.uid("uid_bucket_write" + conf.getString(FlinkOptions.TABLE_NAME))
.setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS));
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java
index c5d7303..592520b 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java
@@ -18,8 +18,6 @@
package org.apache.hudi.streamer;
-import org.apache.flink.runtime.state.StateBackend;
-import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.hudi.client.utils.OperationConverter;
import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
import org.apache.hudi.common.model.WriteOperationType;
@@ -32,6 +30,8 @@ import org.apache.hudi.util.StreamerUtil;
import com.beust.jcommander.Parameter;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import java.util.ArrayList;
import java.util.HashMap;
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/FlinkStateBackendConverter.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/FlinkStateBackendConverter.java
index b46ab14..a6b15ff 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/FlinkStateBackendConverter.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/FlinkStateBackendConverter.java
@@ -18,12 +18,13 @@
package org.apache.hudi.util;
+import org.apache.hudi.exception.HoodieException;
+
import com.beust.jcommander.IStringConverter;
import com.beust.jcommander.ParameterException;
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
-import org.apache.hudi.exception.HoodieException;
/**
* Converter that converts a string into Flink StateBackend.
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
index 72c0890..1ef7157 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
@@ -906,8 +906,8 @@ public class ITTestHoodieDataSource extends AbstractTestBase {
}
@ParameterizedTest
- @ValueSource(booleans = {true, false})
- void testBulkInsert(boolean hiveStylePartitioning) {
+ @MethodSource("indexAndPartitioningParams")
+ void testBulkInsert(String indexType, boolean hiveStylePartitioning) {
TableEnvironment tableEnv = batchTableEnv;
// csv source
String csvSourceDDL = TestConfigurations.getCsvSourceDDL("csv_source", "test_source_5.data");
@@ -917,6 +917,7 @@ public class ITTestHoodieDataSource extends AbstractTestBase {
.option(FlinkOptions.PATH, tempFile.getAbsolutePath())
.option(FlinkOptions.OPERATION, "bulk_insert")
.option(FlinkOptions.WRITE_BULK_INSERT_SHUFFLE_BY_PARTITION, true)
+ .option(FlinkOptions.INDEX_TYPE, indexType)
.option(FlinkOptions.HIVE_STYLE_PARTITIONING, hiveStylePartitioning)
.end();
tableEnv.executeSql(hoodieTableDDL);
@@ -1262,6 +1263,19 @@ public class ITTestHoodieDataSource extends AbstractTestBase {
return Stream.of(data).map(Arguments::of);
}
+ /**
+ * Return test params => (index type, hive style partitioning).
+ */
+ private static Stream<Arguments> indexAndPartitioningParams() {
+ Object[][] data =
+ new Object[][] {
+ {"FLINK_STATE", false},
+ {"FLINK_STATE", true},
+ {"BUCKET", false},
+ {"BUCKET", true}};
+ return Stream.of(data).map(Arguments::of);
+ }
+
private void execInsertSql(TableEnvironment tEnv, String insert) {
TableResult tableResult = tEnv.executeSql(insert);
// wait to finish