Posted to commits@hudi.apache.org by da...@apache.org on 2022/03/25 01:02:11 UTC

[hudi] branch master updated: [HUDI-3701] Flink bulk_insert support bucket hash index (#5118)

This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 5e86cdd  [HUDI-3701] Flink bulk_insert support bucket hash index (#5118)
5e86cdd is described below

commit 5e86cdd1e95511424e354d99816d9495e13ae6ae
Author: Danny Chan <yu...@gmail.com>
AuthorDate: Fri Mar 25 09:01:42 2022 +0800

    [HUDI-3701] Flink bulk_insert support bucket hash index (#5118)
---
 .../apache/hudi/index/bucket/BucketIdentifier.java | 10 ++-
 .../sink/bucket/BucketBulkInsertWriterHelper.java  | 73 ++++++++++++++++++++++
 .../{ => bucket}/BucketStreamWriteFunction.java    |  3 +-
 .../{ => bucket}/BucketStreamWriteOperator.java    |  2 +-
 .../hudi/sink/bulk/BulkInsertWriteFunction.java    |  2 +-
 .../hudi/sink/bulk/BulkInsertWriterHelper.java     | 18 +++---
 .../org/apache/hudi/sink/bulk/WriterHelpers.java   | 39 ++++++++++++
 .../sink/partitioner/BucketIndexPartitioner.java   |  5 +-
 .../java/org/apache/hudi/sink/utils/Pipelines.java | 19 ++++--
 .../apache/hudi/streamer/FlinkStreamerConfig.java  |  4 +-
 .../hudi/util/FlinkStateBackendConverter.java      |  3 +-
 .../apache/hudi/table/ITTestHoodieDataSource.java  | 18 +++++-
 12 files changed, 169 insertions(+), 27 deletions(-)
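
With this change, a Flink bulk_insert job can route each row to a fixed bucket file instead of relying only on partition sorting. A minimal sketch of the options that drive the new code path, assuming the Flink Configuration API and the option values exercised by the test below (the path and bucket count are illustrative):

    import org.apache.flink.configuration.Configuration;
    import org.apache.hudi.configuration.FlinkOptions;

    public class BucketBulkInsertConfSketch {
      public static Configuration bucketBulkInsertConf() {
        Configuration conf = new Configuration();
        conf.setString(FlinkOptions.PATH, "/tmp/hoodie_sink");        // illustrative table path
        conf.setString(FlinkOptions.OPERATION, "bulk_insert");        // bulk insert write operation
        conf.setString(FlinkOptions.INDEX_TYPE, "BUCKET");            // switch to the bucket hash index
        conf.setInteger(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS, 4);    // number of hash buckets
        conf.setString(FlinkOptions.INDEX_KEY_FIELD, "uuid");         // field(s) hashed into a bucket
        return conf;
      }
    }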

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/BucketIdentifier.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/BucketIdentifier.java
index ddd9572..1a07c40 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/BucketIdentifier.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/BucketIdentifier.java
@@ -38,11 +38,15 @@ public class BucketIdentifier {
   }
 
   public static int getBucketId(HoodieKey hoodieKey, String indexKeyFields, int numBuckets) {
+    return getBucketId(hoodieKey.getRecordKey(), indexKeyFields, numBuckets);
+  }
+
+  public static int getBucketId(String recordKey, String indexKeyFields, int numBuckets) {
     List<String> hashKeyFields;
-    if (!hoodieKey.getRecordKey().contains(":")) {
-      hashKeyFields = Collections.singletonList(hoodieKey.getRecordKey());
+    if (!recordKey.contains(":")) {
+      hashKeyFields = Collections.singletonList(recordKey);
     } else {
-      Map<String, String> recordKeyPairs = Arrays.stream(hoodieKey.getRecordKey().split(","))
+      Map<String, String> recordKeyPairs = Arrays.stream(recordKey.split(","))
           .map(p -> p.split(":"))
           .collect(Collectors.toMap(p -> p[0], p -> p[1]));
       hashKeyFields = Arrays.stream(indexKeyFields.split(","))
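
The new String overload lets callers hash a raw record key without first wrapping it in a HoodieKey. A hedged usage sketch (the key values are made up): a plain key is hashed whole, while a key encoded as comma-separated "field:value" pairs is reduced to the configured index key field(s) first.

    import org.apache.hudi.index.bucket.BucketIdentifier;

    // Plain record key: the whole key participates in the hash.
    int b1 = BucketIdentifier.getBucketId("id_0001", "uuid", 8);

    // Composite key as "field:value" pairs: only the index key field
    // ("uuid" here) is extracted and hashed.
    int b2 = BucketIdentifier.getBucketId("uuid:id_0001,name:Danny", "uuid", 8);
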
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketBulkInsertWriterHelper.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketBulkInsertWriterHelper.java
new file mode 100644
index 0000000..c52b6f0
--- /dev/null
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketBulkInsertWriterHelper.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.bucket;
+
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.index.bucket.BucketIdentifier;
+import org.apache.hudi.io.storage.row.HoodieRowDataCreateHandle;
+import org.apache.hudi.sink.bulk.BulkInsertWriterHelper;
+import org.apache.hudi.table.HoodieTable;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * Helper class for bucket index bulk insert used by Flink.
+ */
+public class BucketBulkInsertWriterHelper extends BulkInsertWriterHelper {
+  private static final Logger LOG = LoggerFactory.getLogger(BucketBulkInsertWriterHelper.class);
+
+  private final int bucketNum;
+  private final String indexKeyFields;
+
+  public BucketBulkInsertWriterHelper(Configuration conf, HoodieTable<?, ?, ?, ?> hoodieTable, HoodieWriteConfig writeConfig,
+                                      String instantTime, int taskPartitionId, long taskId, long taskEpochId, RowType rowType) {
+    super(conf, hoodieTable, writeConfig, instantTime, taskPartitionId, taskId, taskEpochId, rowType);
+    this.bucketNum = conf.getInteger(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS);
+    this.indexKeyFields = conf.getString(FlinkOptions.INDEX_KEY_FIELD);
+  }
+
+  public void write(RowData record) throws IOException {
+    try {
+      String recordKey = keyGen.getRecordKey(record);
+      String partitionPath = keyGen.getPartitionPath(record);
+      final int bucketNum = BucketIdentifier.getBucketId(recordKey, indexKeyFields, this.bucketNum);
+      String fileId = BucketIdentifier.newBucketFileIdPrefix(bucketNum);
+      getRowCreateHandle(partitionPath, fileId).write(recordKey, partitionPath, record);
+    } catch (Throwable throwable) {
+      LOG.error("Global error thrown while trying to write records in HoodieRowDataCreateHandle", throwable);
+      throw throwable;
+    }
+  }
+
+  private HoodieRowDataCreateHandle getRowCreateHandle(String partitionPath, String fileId) {
+    if (!handles.containsKey(fileId)) { // if there is no handle corresponding to the fileId
+      HoodieRowDataCreateHandle rowCreateHandle = new HoodieRowDataCreateHandle(hoodieTable, writeConfig, partitionPath, fileId,
+          instantTime, taskPartitionId, taskId, taskEpochId, rowType);
+      handles.put(fileId, rowCreateHandle);
+    }
+    return handles.get(fileId);
+  }
+}
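
The helper differs from the base BulkInsertWriterHelper in how it picks file ids: each record key is hashed to a bucket and the bucket id becomes a fixed file id prefix, so within a partition all rows of one bucket land in the same file and one create handle is cached per file id. A hedged sketch of that routing, with an illustrative key and bucket count:

    // Sketch of the routing performed in write() above.
    String recordKey = "uuid:id_0001,name:Danny";                       // illustrative key
    int bucket = BucketIdentifier.getBucketId(recordKey, "uuid", 4);    // stable bucket per key
    String fileId = BucketIdentifier.newBucketFileIdPrefix(bucket);     // bucket id encoded in the file id
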
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/BucketStreamWriteFunction.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketStreamWriteFunction.java
similarity index 98%
rename from hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/BucketStreamWriteFunction.java
rename to hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketStreamWriteFunction.java
index 4c9e4dc..e53d2b2 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/BucketStreamWriteFunction.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketStreamWriteFunction.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.hudi.sink;
+package org.apache.hudi.sink.bucket;
 
 import org.apache.hudi.common.model.FileSlice;
 import org.apache.hudi.common.model.HoodieKey;
@@ -26,6 +26,7 @@ import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.index.bucket.BucketIdentifier;
+import org.apache.hudi.sink.StreamWriteFunction;
 import org.apache.hudi.table.HoodieFlinkTable;
 
 import org.apache.flink.configuration.Configuration;
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/BucketStreamWriteOperator.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketStreamWriteOperator.java
similarity index 97%
rename from hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/BucketStreamWriteOperator.java
rename to hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketStreamWriteOperator.java
index cf740cc..a48ea44 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/BucketStreamWriteOperator.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketStreamWriteOperator.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.hudi.sink;
+package org.apache.hudi.sink.bucket;
 
 import org.apache.hudi.sink.common.AbstractWriteOperator;
 import org.apache.hudi.sink.common.WriteOperatorFactory;
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriteFunction.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriteFunction.java
index 9b34c3e..6c8dcef 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriteFunction.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriteFunction.java
@@ -167,7 +167,7 @@ public class BulkInsertWriteFunction<I>
 
   private void initWriterHelper() {
     String instant = instantToWrite();
-    this.writerHelper = new BulkInsertWriterHelper(this.config, this.writeClient.getHoodieTable(), this.writeClient.getConfig(),
+    this.writerHelper = WriterHelpers.getWriterHelper(this.config, this.writeClient.getHoodieTable(), this.writeClient.getConfig(),
         instant, this.taskID, getRuntimeContext().getNumberOfParallelSubtasks(), getRuntimeContext().getAttemptNumber(),
         this.rowType);
   }
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriterHelper.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriterHelper.java
index 4bc8ae2..013595a 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriterHelper.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriterHelper.java
@@ -50,21 +50,21 @@ public class BulkInsertWriterHelper {
 
   private static final Logger LOG = LogManager.getLogger(BulkInsertWriterHelper.class);
 
-  private final String instantTime;
-  private final int taskPartitionId;
-  private final long taskId;
-  private final long taskEpochId;
-  private final HoodieTable hoodieTable;
-  private final HoodieWriteConfig writeConfig;
-  private final RowType rowType;
+  protected final String instantTime;
+  protected final int taskPartitionId;
+  protected final long taskId;
+  protected final long taskEpochId;
+  protected final HoodieTable hoodieTable;
+  protected final HoodieWriteConfig writeConfig;
+  protected final RowType rowType;
   private final Boolean arePartitionRecordsSorted;
   private final List<HoodieInternalWriteStatus> writeStatusList = new ArrayList<>();
   private HoodieRowDataCreateHandle handle;
   private String lastKnownPartitionPath = null;
   private final String fileIdPrefix;
   private int numFilesWritten = 0;
-  private final Map<String, HoodieRowDataCreateHandle> handles = new HashMap<>();
-  private final RowDataKeyGen keyGen;
+  protected final Map<String, HoodieRowDataCreateHandle> handles = new HashMap<>();
+  protected final RowDataKeyGen keyGen;
 
   public BulkInsertWriterHelper(Configuration conf, HoodieTable hoodieTable, HoodieWriteConfig writeConfig,
                                 String instantTime, int taskPartitionId, long taskId, long taskEpochId, RowType rowType) {
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/WriterHelpers.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/WriterHelpers.java
new file mode 100644
index 0000000..99a9ae1
--- /dev/null
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/WriterHelpers.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.bulk;
+
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.configuration.OptionsResolver;
+import org.apache.hudi.sink.bucket.BucketBulkInsertWriterHelper;
+import org.apache.hudi.table.HoodieTable;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.types.logical.RowType;
+
+/**
+ * Factory class to generate bulk insert writer helpers.
+ */
+public class WriterHelpers {
+  public static BulkInsertWriterHelper getWriterHelper(Configuration conf, HoodieTable<?, ?, ?, ?> hoodieTable, HoodieWriteConfig writeConfig,
+                                                       String instantTime, int taskPartitionId, long taskId, long taskEpochId, RowType rowType) {
+    return OptionsResolver.isBucketIndexType(conf)
+        ? new BucketBulkInsertWriterHelper(conf, hoodieTable, writeConfig, instantTime, taskPartitionId, taskId, taskEpochId, rowType)
+        : new BulkInsertWriterHelper(conf, hoodieTable, writeConfig, instantTime, taskPartitionId, taskId, taskEpochId, rowType);
+  }
+}
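
The factory is the only switch point between the two helpers; a hedged sketch of the decision, reusing the illustrative configuration from above:

    import org.apache.flink.configuration.Configuration;
    import org.apache.hudi.configuration.FlinkOptions;

    Configuration conf = new Configuration();
    conf.setString(FlinkOptions.INDEX_TYPE, "BUCKET");
    // OptionsResolver.isBucketIndexType(conf) is now true, so
    // WriterHelpers.getWriterHelper(...) returns a BucketBulkInsertWriterHelper;
    // with any other index type the plain BulkInsertWriterHelper is used.
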
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketIndexPartitioner.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketIndexPartitioner.java
index 0c4e2a1..b9b737c 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketIndexPartitioner.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketIndexPartitioner.java
@@ -18,7 +18,6 @@
 
 package org.apache.hudi.sink.partitioner;
 
-import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.index.bucket.BucketIdentifier;
 
 import org.apache.flink.api.common.functions.Partitioner;
@@ -29,7 +28,7 @@ import org.apache.flink.api.common.functions.Partitioner;
  *
  * @param <T> The type of obj to hash
  */
-public class BucketIndexPartitioner<T extends HoodieKey> implements Partitioner<T> {
+public class BucketIndexPartitioner<T extends String> implements Partitioner<T> {
 
   private final int bucketNum;
   private final String indexKeyFields;
@@ -40,7 +39,7 @@ public class BucketIndexPartitioner<T extends HoodieKey> implements Partitioner<
   }
 
   @Override
-  public int partition(HoodieKey key, int numPartitions) {
+  public int partition(String key, int numPartitions) {
     int curBucket = BucketIdentifier.getBucketId(key, indexKeyFields, bucketNum);
     return BucketIdentifier.mod(curBucket, numPartitions);
   }
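
Relaxing the generic bound from HoodieKey to String lets the same partitioner serve both the streaming bucket_write path (which passes HoodieRecord::getRecordKey) and the new bulk_insert path (which only has the raw key string from RowDataKeyGen). A hedged usage sketch with illustrative numbers:

    // 16 hash buckets spread over 4 Flink write tasks.
    BucketIndexPartitioner<String> partitioner = new BucketIndexPartitioner<>(16, "uuid");
    int targetTask = partitioner.partition("uuid:id_0001,name:Danny", 4);
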
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
index 65d67fe..4930dbe 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
@@ -18,16 +18,15 @@
 
 package org.apache.hudi.sink.utils;
 
-import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.configuration.OptionsResolver;
-import org.apache.hudi.sink.BucketStreamWriteOperator;
 import org.apache.hudi.sink.CleanFunction;
 import org.apache.hudi.sink.StreamWriteOperator;
 import org.apache.hudi.sink.append.AppendWriteOperator;
 import org.apache.hudi.sink.bootstrap.BootstrapOperator;
 import org.apache.hudi.sink.bootstrap.batch.BatchBootstrapOperator;
+import org.apache.hudi.sink.bucket.BucketStreamWriteOperator;
 import org.apache.hudi.sink.bulk.BulkInsertWriteOperator;
 import org.apache.hudi.sink.bulk.RowDataKeyGen;
 import org.apache.hudi.sink.bulk.sort.SortOperatorGen;
@@ -88,6 +87,18 @@ public class Pipelines {
    */
   public static DataStreamSink<Object> bulkInsert(Configuration conf, RowType rowType, DataStream<RowData> dataStream) {
     WriteOperatorFactory<RowData> operatorFactory = BulkInsertWriteOperator.getFactory(conf, rowType);
+    if (OptionsResolver.isBucketIndexType(conf)) {
+      int bucketNum = conf.getInteger(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS);
+      String indexKeyFields = conf.getString(FlinkOptions.INDEX_KEY_FIELD);
+      BucketIndexPartitioner<String> partitioner = new BucketIndexPartitioner<>(bucketNum, indexKeyFields);
+      RowDataKeyGen rowDataKeyGen = RowDataKeyGen.instance(conf, rowType);
+      return dataStream.partitionCustom(partitioner, rowDataKeyGen::getRecordKey)
+          .transform("bucket_bulk_insert", TypeInformation.of(Object.class), operatorFactory)
+          .uid("uid_bucket_bulk_insert" + conf.getString(FlinkOptions.TABLE_NAME))
+          .setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS))
+          .addSink(DummySink.INSTANCE)
+          .name("dummy");
+    }
 
     final String[] partitionFields = FilePathUtils.extractPartitionKeys(conf);
     if (partitionFields.length > 0) {
@@ -278,8 +289,8 @@ public class Pipelines {
       WriteOperatorFactory<HoodieRecord> operatorFactory = BucketStreamWriteOperator.getFactory(conf);
       int bucketNum = conf.getInteger(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS);
       String indexKeyFields = conf.getString(FlinkOptions.INDEX_KEY_FIELD);
-      BucketIndexPartitioner<HoodieKey> partitioner = new BucketIndexPartitioner<>(bucketNum, indexKeyFields);
-      return dataStream.partitionCustom(partitioner, HoodieRecord::getKey)
+      BucketIndexPartitioner<String> partitioner = new BucketIndexPartitioner<>(bucketNum, indexKeyFields);
+      return dataStream.partitionCustom(partitioner, HoodieRecord::getRecordKey)
           .transform("bucket_write", TypeInformation.of(Object.class), operatorFactory)
           .uid("uid_bucket_write" + conf.getString(FlinkOptions.TABLE_NAME))
           .setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS));
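
Both paths above now shuffle by the raw record key: the new bucket bulk_insert branch extracts the key from each RowData via RowDataKeyGen, while the streaming bucket_write path uses HoodieRecord::getRecordKey. A hedged sketch of the bulk_insert shuffle key (conf, rowType and row are assumed to exist):

    RowDataKeyGen keyGen = RowDataKeyGen.instance(conf, rowType);
    String shuffleKey = keyGen.getRecordKey(row);   // the string the partitioner hashes
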
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java
index c5d7303..592520b 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java
@@ -18,8 +18,6 @@
 
 package org.apache.hudi.streamer;
 
-import org.apache.flink.runtime.state.StateBackend;
-import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
 import org.apache.hudi.client.utils.OperationConverter;
 import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
 import org.apache.hudi.common.model.WriteOperationType;
@@ -32,6 +30,8 @@ import org.apache.hudi.util.StreamerUtil;
 
 import com.beust.jcommander.Parameter;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
 
 import java.util.ArrayList;
 import java.util.HashMap;
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/FlinkStateBackendConverter.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/FlinkStateBackendConverter.java
index b46ab14..a6b15ff 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/FlinkStateBackendConverter.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/FlinkStateBackendConverter.java
@@ -18,12 +18,13 @@
 
 package org.apache.hudi.util;
 
+import org.apache.hudi.exception.HoodieException;
+
 import com.beust.jcommander.IStringConverter;
 import com.beust.jcommander.ParameterException;
 import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
 import org.apache.flink.runtime.state.StateBackend;
 import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
-import org.apache.hudi.exception.HoodieException;
 
 /**
  * Converter that converts a string into Flink StateBackend.
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
index 72c0890..1ef7157 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
@@ -906,8 +906,8 @@ public class ITTestHoodieDataSource extends AbstractTestBase {
   }
 
   @ParameterizedTest
-  @ValueSource(booleans = {true, false})
-  void testBulkInsert(boolean hiveStylePartitioning) {
+  @MethodSource("indexAndPartitioningParams")
+  void testBulkInsert(String indexType, boolean hiveStylePartitioning) {
     TableEnvironment tableEnv = batchTableEnv;
     // csv source
     String csvSourceDDL = TestConfigurations.getCsvSourceDDL("csv_source", "test_source_5.data");
@@ -917,6 +917,7 @@ public class ITTestHoodieDataSource extends AbstractTestBase {
         .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
         .option(FlinkOptions.OPERATION, "bulk_insert")
         .option(FlinkOptions.WRITE_BULK_INSERT_SHUFFLE_BY_PARTITION, true)
+        .option(FlinkOptions.INDEX_TYPE, indexType)
         .option(FlinkOptions.HIVE_STYLE_PARTITIONING, hiveStylePartitioning)
         .end();
     tableEnv.executeSql(hoodieTableDDL);
@@ -1262,6 +1263,19 @@ public class ITTestHoodieDataSource extends AbstractTestBase {
     return Stream.of(data).map(Arguments::of);
   }
 
+  /**
+   * Return test params => (index type, hive style partitioning).
+   */
+  private static Stream<Arguments> indexAndPartitioningParams() {
+    Object[][] data =
+        new Object[][] {
+            {"FLINK_STATE", false},
+            {"FLINK_STATE", true},
+            {"BUCKET", false},
+            {"BUCKET", true}};
+    return Stream.of(data).map(Arguments::of);
+  }
+
   private void execInsertSql(TableEnvironment tEnv, String insert) {
     TableResult tableResult = tEnv.executeSql(insert);
     // wait to finish