Posted to commits@hudi.apache.org by wa...@apache.org on 2021/06/08 05:55:47 UTC
[hudi] branch master updated: add BootstrapFunction to support index bootstrap (#3024)
This is an automated email from the ASF dual-hosted git repository.
wangxianghu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new cf83f10 add BootstrapFunction to support index bootstrap (#3024)
cf83f10 is described below
commit cf83f10f5b369e1f16e63b4b68750d3ad2dc0240
Author: yuzhaojing <32...@users.noreply.github.com>
AuthorDate: Tue Jun 8 13:55:25 2021 +0800
add BootstrapFunction to support index bootstrap (#3024)
Co-authored-by: 喻兆靖 <yu...@bilibili.com>
---
.../apache/hudi/configuration/FlinkOptions.java | 6 +
.../hudi/sink/bootstrap/BootstrapFunction.java | 235 +++++++++++++++++++++
.../hudi/sink/bootstrap/BootstrapRecord.java | 33 +++
.../bootstrap/aggregate/BootstrapAccumulator.java | 53 +++++
.../sink/bootstrap/aggregate/BootstrapAggFunc.java | 49 +++++
.../sink/partitioner/BucketAssignFunction.java | 102 ++-------
.../apache/hudi/streamer/HoodieFlinkStreamer.java | 21 +-
.../org/apache/hudi/table/HoodieTableSink.java | 21 +-
.../org/apache/hudi/sink/StreamWriteITCase.java | 28 ++-
.../org/apache/hudi/sink/TestWriteCopyOnWrite.java | 28 ++-
.../sink/utils/StreamWriteFunctionWrapper.java | 44 ++++
11 files changed, 507 insertions(+), 113 deletions(-)
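
Index bootstrap is opt-in: the new BootstrapFunction is only wired into the pipeline when FlinkOptions.INDEX_BOOTSTRAP_ENABLED is true, and the new index.partition.regex option (see the FlinkOptions hunk below) limits which partitions are loaded into state. A minimal configuration sketch, with a hypothetical base path and regex value:

    Configuration conf = new Configuration();
    conf.setString(FlinkOptions.PATH, "hdfs://nameservice/hudi/t1");  // hypothetical table base path
    conf.setBoolean(FlinkOptions.INDEX_BOOTSTRAP_ENABLED, true);      // adds the index_bootstrap operator
    conf.setString(FlinkOptions.INDEX_PARTITION_REGEX, "par.*");      // bootstrap only matching partitions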
diff --git a/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java b/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
index 7466b41..57fd501 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
@@ -93,6 +93,12 @@ public class FlinkOptions {
.withDescription("Whether to update index for the old partition path\n"
+ "if same key record with different partition path came in, default false");
+ public static final ConfigOption<String> INDEX_PARTITION_REGEX = ConfigOptions
+ .key("index.partition.regex")
+ .stringType()
+ .defaultValue(".*")
+ .withDescription("Whether to load partitions in state if partition path matching, default *");
+
// ------------------------------------------------------------------------
// Read Options
// ------------------------------------------------------------------------
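
The regex is evaluated in BootstrapFunction (below) with Pattern.matcher(partitionPath).matches(), so it must match the entire partition path, not just a prefix. A small illustration with hypothetical partition paths:

    Pattern pattern = Pattern.compile("2021/06/.*");            // hypothetical index.partition.regex value
    pattern.matcher("2021/06/08").matches();                     // true  -> partition is bootstrapped
    pattern.matcher("2021/05/31").matches();                     // false -> partition is skipped
    Pattern.compile(".*").matcher("any/partition").matches();    // default ".*" loads every partition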
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapFunction.java
new file mode 100644
index 0000000..5f81559
--- /dev/null
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapFunction.java
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.bootstrap;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.state.CheckpointListener;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+import org.apache.flink.runtime.taskexecutor.GlobalAggregateManager;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.functions.ProcessFunction;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.util.Collector;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.client.FlinkTaskContextSupplier;
+import org.apache.hudi.client.common.HoodieFlinkEngineContext;
+import org.apache.hudi.common.config.SerializableConfiguration;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordGlobalLocation;
+import org.apache.hudi.common.util.BaseFileUtils;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.index.HoodieIndexUtils;
+import org.apache.hudi.sink.bootstrap.aggregate.BootstrapAggFunc;
+import org.apache.hudi.table.HoodieFlinkTable;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.util.StreamerUtil;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Pattern;
+
+/**
+ * The function that loads the index from an existing Hoodie table.
+ *
+ * <p>Each subtask triggers the index bootstrap when its first element arrives;
+ * the incoming records are not emitted until all the index records have been sent downstream.
+ *
+ * <p>The output records are then shuffled by record key so that the write scales out.
+ */
+public class BootstrapFunction<I, O extends HoodieRecord>
+ extends ProcessFunction<I, O>
+ implements CheckpointedFunction, CheckpointListener {
+
+ private static final Logger LOG = LoggerFactory.getLogger(BootstrapFunction.class);
+
+ private HoodieTable<?, ?, ?, ?> hoodieTable;
+
+ private final Configuration conf;
+
+ private transient org.apache.hadoop.conf.Configuration hadoopConf;
+
+ private GlobalAggregateManager aggregateManager;
+ private ListState<Boolean> bootstrapState;
+
+ private final Pattern pattern;
+ private boolean alreadyBootstrap;
+
+ public BootstrapFunction(Configuration conf) {
+ this.conf = conf;
+ this.pattern = Pattern.compile(conf.getString(FlinkOptions.INDEX_PARTITION_REGEX));
+ }
+
+ @Override
+ public void initializeState(FunctionInitializationContext context) throws Exception {
+ this.bootstrapState = context.getOperatorStateStore().getListState(
+ new ListStateDescriptor<>(
+ "bootstrap-state",
+ TypeInformation.of(new TypeHint<Boolean>() {})
+ )
+ );
+
+ if (context.isRestored()) {
+ LOG.info("Restoring state for the {}.", getClass().getSimpleName());
+
+ for (Boolean alreadyBootstrap : bootstrapState.get()) {
+ this.alreadyBootstrap = alreadyBootstrap;
+ }
+ }
+ }
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ super.open(parameters);
+ this.hadoopConf = StreamerUtil.getHadoopConf();
+ this.hoodieTable = getTable();
+ this.aggregateManager = ((StreamingRuntimeContext) getRuntimeContext()).getGlobalAggregateManager();
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public void processElement(I value, Context ctx, Collector<O> out) throws IOException {
+ if (!alreadyBootstrap) {
+ LOG.info("Start loading records in table {} into the index state, taskId = {}", conf.getString(FlinkOptions.PATH), getRuntimeContext().getIndexOfThisSubtask());
+ String basePath = hoodieTable.getMetaClient().getBasePath();
+ for (String partitionPath : FSUtils.getAllFoldersWithPartitionMetaFile(FSUtils.getFs(basePath, hadoopConf), basePath)) {
+ if (pattern.matcher(partitionPath).matches()) {
+ loadRecords(partitionPath, out);
+ }
+ }
+
+ // wait for the other bootstrap tasks to finish sending their index records.
+ updateAndWaiting();
+
+ alreadyBootstrap = true;
+ LOG.info("Finish send index to BucketAssign, taskId = {}.", getRuntimeContext().getIndexOfThisSubtask());
+ }
+
+ // send data to next operator
+ out.collect((O) value);
+ }
+
+ /**
+ * Update the global aggregate and wait until all the bootstrap tasks have reported completion.
+ */
+ private void updateAndWaiting() {
+ int taskNum = getRuntimeContext().getNumberOfParallelSubtasks();
+ int readyTaskNum = 1;
+ while (taskNum != readyTaskNum) {
+ try {
+ readyTaskNum = aggregateManager.updateGlobalAggregate(BootstrapAggFunc.NAME, getRuntimeContext().getIndexOfThisSubtask(), new BootstrapAggFunc());
+ LOG.info("Waiting for others bootstrap task complete, taskId = {}.", getRuntimeContext().getIndexOfThisSubtask());
+
+ TimeUnit.SECONDS.sleep(5);
+ } catch (Exception e) {
+ LOG.warn("update global aggregate error", e);
+ }
+ }
+ }
+
+ private HoodieFlinkTable getTable() {
+ HoodieWriteConfig writeConfig = StreamerUtil.getHoodieClientConfig(this.conf);
+ HoodieFlinkEngineContext context = new HoodieFlinkEngineContext(
+ new SerializableConfiguration(this.hadoopConf),
+ new FlinkTaskContextSupplier(getRuntimeContext()));
+ return HoodieFlinkTable.create(writeConfig, context);
+ }
+
+ /**
+ * Load all the indices under the given partition path and send them downstream.
+ *
+ * @param partitionPath The partition path
+ * @param out The collector to send the index records with
+ */
+ @SuppressWarnings("unchecked")
+ private void loadRecords(String partitionPath, Collector<O> out) {
+ long start = System.currentTimeMillis();
+ BaseFileUtils fileUtils = BaseFileUtils.getInstance(this.hoodieTable.getBaseFileFormat());
+ List<HoodieBaseFile> latestBaseFiles =
+ HoodieIndexUtils.getLatestBaseFilesForPartition(partitionPath, this.hoodieTable);
+ LOG.info("All baseFile in partition {} size = {}", partitionPath, latestBaseFiles.size());
+
+ final int parallelism = getRuntimeContext().getNumberOfParallelSubtasks();
+ final int maxParallelism = getRuntimeContext().getMaxNumberOfParallelSubtasks();
+ final int taskID = getRuntimeContext().getIndexOfThisSubtask();
+ for (HoodieBaseFile baseFile : latestBaseFiles) {
+ boolean shouldLoad = KeyGroupRangeAssignment.assignKeyToParallelOperator(
+ baseFile.getFileId(), maxParallelism, parallelism) == taskID;
+
+ if (shouldLoad) {
+ LOG.info("Load records from file {}.", baseFile);
+ final List<HoodieKey> hoodieKeys;
+ try {
+ hoodieKeys =
+ fileUtils.fetchRecordKeyPartitionPath(this.hadoopConf, new Path(baseFile.getPath()));
+ } catch (Exception e) {
+ throw new HoodieException(String.format("Error when loading record keys from file: %s", baseFile), e);
+ }
+
+ for (HoodieKey hoodieKey : hoodieKeys) {
+ out.collect((O) new BootstrapRecord(generateHoodieRecord(hoodieKey, baseFile)));
+ }
+ }
+ }
+
+ long cost = System.currentTimeMillis() - start;
+ LOG.info("Task [{}}:{}}] finish loading the index under partition {} and sending them to downstream, time cost: {} milliseconds.",
+ this.getClass().getSimpleName(), taskID, partitionPath, cost);
+ }
+
+ @SuppressWarnings("unchecked")
+ public static HoodieRecord generateHoodieRecord(HoodieKey hoodieKey, HoodieBaseFile baseFile) {
+ HoodieRecord hoodieRecord = new HoodieRecord(hoodieKey, null);
+ hoodieRecord.setCurrentLocation(new HoodieRecordGlobalLocation(hoodieKey.getPartitionPath(), baseFile.getCommitTime(), baseFile.getFileId()));
+ hoodieRecord.seal();
+
+ return hoodieRecord;
+ }
+
+ @Override
+ public void notifyCheckpointComplete(long checkpointId) throws Exception {
+ // no operation
+ }
+
+ @Override
+ public void snapshotState(FunctionSnapshotContext context) throws Exception {
+ this.bootstrapState.add(alreadyBootstrap);
+ }
+
+ @VisibleForTesting
+ public boolean isAlreadyBootstrap() {
+ return alreadyBootstrap;
+ }
+}
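
Note how loadRecords() divides the work: each base file is owned by exactly one subtask, chosen by hashing its file id through Flink's key-group assignment, so all subtasks scan the table in parallel without emitting duplicate index records. A sketch of that routing decision, assuming a parallelism of 4, the default max parallelism of 128, and a hypothetical file id:

    int parallelism = 4;
    int maxParallelism = 128;
    int taskId = 2;                                             // index of this subtask
    String fileId = "f1a2b3c4-5678-90ab-cdef-000000000000-0";   // hypothetical HoodieBaseFile file id
    int owner = KeyGroupRangeAssignment.assignKeyToParallelOperator(fileId, maxParallelism, parallelism);
    boolean shouldLoad = owner == taskId;                       // only the owning subtask reads this file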
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapRecord.java b/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapRecord.java
new file mode 100644
index 0000000..025d844
--- /dev/null
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapRecord.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.bootstrap;
+
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+
+/**
+ * A marker subclass of {@link HoodieRecord} used to tag the index (bootstrap) records.
+ */
+public class BootstrapRecord<T extends HoodieRecordPayload> extends HoodieRecord<T> {
+ private static final long serialVersionUID = 1L;
+
+ public BootstrapRecord(HoodieRecord<T> record) {
+ super(record);
+ }
+}
\ No newline at end of file
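
BootstrapRecord carries no payload; it is purely a marker subclass so that the downstream BucketAssignFunction can separate index records from data records with an instanceof check (shown further down in this diff). Roughly how the two sides cooperate, with the HoodieBaseFile assumed to come from getLatestBaseFilesForPartition():

    // producer side, inside BootstrapFunction.loadRecords()
    HoodieKey hoodieKey = new HoodieKey("id1", "par1");         // hypothetical record key / partition
    out.collect((O) new BootstrapRecord(generateHoodieRecord(hoodieKey, baseFile)));

    // consumer side, inside BucketAssignFunction.processElement()
    if (value instanceof BootstrapRecord) {
      BootstrapRecord bootstrapRecord = (BootstrapRecord) value;
      this.context.setCurrentKey(bootstrapRecord.getRecordKey());
      this.indexState.update((HoodieRecordGlobalLocation) bootstrapRecord.getCurrentLocation());
    }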
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/aggregate/BootstrapAccumulator.java b/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/aggregate/BootstrapAccumulator.java
new file mode 100644
index 0000000..80067f0
--- /dev/null
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/aggregate/BootstrapAccumulator.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.bootstrap.aggregate;
+
+import java.io.Serializable;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Accumulator that tracks which bootstrap subtasks have finished loading the index.
+ */
+public class BootstrapAccumulator implements Serializable {
+ private static final long serialVersionUID = 1L;
+
+ private final Set<Integer> readyTaskSet;
+
+ public BootstrapAccumulator() {
+ this.readyTaskSet = new HashSet<>();
+ }
+
+ public void update(int taskId) {
+ readyTaskSet.add(taskId);
+ }
+
+ public int readyTaskNum() {
+ return readyTaskSet.size();
+ }
+
+ public BootstrapAccumulator merge(BootstrapAccumulator acc) {
+ if (acc == null) {
+ return this;
+ }
+
+ readyTaskSet.addAll(acc.readyTaskSet);
+ return this;
+ }
+}
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/aggregate/BootstrapAggFunc.java b/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/aggregate/BootstrapAggFunc.java
new file mode 100644
index 0000000..2233e84
--- /dev/null
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/aggregate/BootstrapAggFunc.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.bootstrap.aggregate;
+
+import org.apache.flink.api.common.functions.AggregateFunction;
+
+/**
+ * Aggregate function that accumulates the number of {@link org.apache.hudi.sink.bootstrap.BootstrapFunction} subtasks that have finished loading the index.
+ */
+public class BootstrapAggFunc implements AggregateFunction<Integer, BootstrapAccumulator, Integer> {
+ public static final String NAME = BootstrapAggFunc.class.getSimpleName();
+
+ @Override
+ public BootstrapAccumulator createAccumulator() {
+ return new BootstrapAccumulator();
+ }
+
+ @Override
+ public BootstrapAccumulator add(Integer taskId, BootstrapAccumulator bootstrapAccumulator) {
+ bootstrapAccumulator.update(taskId);
+ return bootstrapAccumulator;
+ }
+
+ @Override
+ public Integer getResult(BootstrapAccumulator bootstrapAccumulator) {
+ return bootstrapAccumulator.readyTaskNum();
+ }
+
+ @Override
+ public BootstrapAccumulator merge(BootstrapAccumulator bootstrapAccumulator, BootstrapAccumulator acc) {
+ return bootstrapAccumulator.merge(acc);
+ }
+}
\ No newline at end of file
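
BootstrapAccumulator and BootstrapAggFunc together form a simple barrier on top of Flink's GlobalAggregateManager: every subtask repeatedly calls updateGlobalAggregate() with its own task id until the returned ready-task count equals the parallelism (see updateAndWaiting() in BootstrapFunction above). A local sketch of the accumulation, outside of Flink:

    BootstrapAggFunc agg = new BootstrapAggFunc();
    BootstrapAccumulator acc = agg.createAccumulator();
    agg.add(0, acc);                 // subtask 0 reports in
    agg.add(1, acc);                 // subtask 1 reports in
    agg.add(1, acc);                 // duplicates are absorbed by the underlying Set
    int ready = agg.getResult(acc);  // 2 -> with parallelism 2, every subtask may proceed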
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java
index 36014a8..00a8274 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java
@@ -22,43 +22,32 @@ import org.apache.hudi.client.FlinkTaskContextSupplier;
import org.apache.hudi.client.common.HoodieFlinkEngineContext;
import org.apache.hudi.common.config.SerializableConfiguration;
import org.apache.hudi.common.model.BaseAvroPayload;
-import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordGlobalLocation;
import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.WriteOperationType;
-import org.apache.hudi.common.util.BaseFileUtils;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.configuration.FlinkOptions;
-import org.apache.hudi.index.HoodieIndexUtils;
+import org.apache.hudi.sink.bootstrap.BootstrapRecord;
import org.apache.hudi.sink.utils.PayloadCreation;
-import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.commit.BucketInfo;
import org.apache.hudi.util.StreamerUtil;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.state.CheckpointListener;
-import org.apache.flink.api.common.state.MapState;
-import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
-import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.table.runtime.util.StateTtlConfigUtil;
import org.apache.flink.util.Collector;
-import org.apache.hadoop.fs.Path;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import java.util.List;
import java.util.Objects;
/**
@@ -80,8 +69,6 @@ public class BucketAssignFunction<K, I, O extends HoodieRecord<?>>
extends KeyedProcessFunction<K, I, O>
implements CheckpointedFunction, CheckpointListener {
- private static final Logger LOG = LoggerFactory.getLogger(BucketAssignFunction.class);
-
private BucketAssignOperator.Context context;
/**
@@ -108,13 +95,6 @@ public class BucketAssignFunction<K, I, O extends HoodieRecord<?>>
private final boolean isChangingRecords;
- private final boolean bootstrapIndex;
-
- /**
- * State to book-keep which partition is loaded into the index state {@code indexState}.
- */
- private MapState<String, Integer> partitionLoadState;
-
/**
* Used to create DELETE payload.
*/
@@ -130,7 +110,6 @@ public class BucketAssignFunction<K, I, O extends HoodieRecord<?>>
this.conf = conf;
this.isChangingRecords = WriteOperationType.isChangingRecords(
WriteOperationType.fromValue(conf.getString(FlinkOptions.OPERATION)));
- this.bootstrapIndex = conf.getBoolean(FlinkOptions.INDEX_BOOTSTRAP_ENABLED);
this.globalIndex = conf.getBoolean(FlinkOptions.INDEX_GLOBAL_ENABLED);
}
@@ -168,31 +147,29 @@ public class BucketAssignFunction<K, I, O extends HoodieRecord<?>>
indexStateDesc.enableTimeToLive(StateTtlConfigUtil.createTtlConfig((long) ttl));
}
indexState = context.getKeyedStateStore().getState(indexStateDesc);
- if (bootstrapIndex) {
- MapStateDescriptor<String, Integer> partitionLoadStateDesc =
- new MapStateDescriptor<>("partitionLoadState", Types.STRING, Types.INT);
- partitionLoadState = context.getKeyedStateStore().getMapState(partitionLoadStateDesc);
- }
}
- @SuppressWarnings("unchecked")
@Override
public void processElement(I value, Context ctx, Collector<O> out) throws Exception {
+ if (value instanceof BootstrapRecord) {
+ BootstrapRecord bootstrapRecord = (BootstrapRecord) value;
+ this.context.setCurrentKey(bootstrapRecord.getRecordKey());
+ this.indexState.update((HoodieRecordGlobalLocation) bootstrapRecord.getCurrentLocation());
+ } else {
+ processRecord((HoodieRecord<?>) value, out);
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ private void processRecord(HoodieRecord<?> record, Collector<O> out) throws Exception {
// 1. put the record into the BucketAssigner;
// 2. look up the state for location, if the record has a location, just send it out;
// 3. if it is an INSERT, decide the location using the BucketAssigner then send it out.
- HoodieRecord<?> record = (HoodieRecord<?>) value;
final HoodieKey hoodieKey = record.getKey();
final String recordKey = hoodieKey.getRecordKey();
final String partitionPath = hoodieKey.getPartitionPath();
final HoodieRecordLocation location;
- // The dataset may be huge, thus the processing would block for long,
- // disabled by default.
- if (bootstrapIndex && !partitionLoadState.contains(partitionPath)) {
- // If the partition records are never loaded, load the records first.
- loadRecords(partitionPath, recordKey);
- }
// Only changing records need looking up the index for the location,
// append only records are always recognized as INSERT.
HoodieRecordGlobalLocation oldLoc = indexState.value();
@@ -216,6 +193,7 @@ public class BucketAssignFunction<K, I, O extends HoodieRecord<?>>
}
} else {
location = getNewRecordLocation(partitionPath);
+ this.context.setCurrentKey(recordKey);
if (isChangingRecords) {
updateIndexState(partitionPath, location);
}
@@ -264,62 +242,8 @@ public class BucketAssignFunction<K, I, O extends HoodieRecord<?>>
this.context = context;
}
- /**
- * Load all the indices of give partition path into the backup state.
- *
- * @param partitionPath The partition path
- * @param curRecordKey The current record key
- * @throws Exception when error occurs for state update
- */
- private void loadRecords(String partitionPath, String curRecordKey) throws Exception {
- LOG.info("Start loading records under partition {} into the index state", partitionPath);
- HoodieTable<?, ?, ?, ?> hoodieTable = bucketAssigner.getTable();
- BaseFileUtils fileUtils = BaseFileUtils.getInstance(hoodieTable.getBaseFileFormat());
- List<HoodieBaseFile> latestBaseFiles =
- HoodieIndexUtils.getLatestBaseFilesForPartition(partitionPath, hoodieTable);
- final int parallelism = getRuntimeContext().getNumberOfParallelSubtasks();
- final int maxParallelism = getRuntimeContext().getMaxNumberOfParallelSubtasks();
- final int taskID = getRuntimeContext().getIndexOfThisSubtask();
- for (HoodieBaseFile baseFile : latestBaseFiles) {
- final List<HoodieKey> hoodieKeys;
- try {
- hoodieKeys =
- fileUtils.fetchRecordKeyPartitionPath(hadoopConf, new Path(baseFile.getPath()));
- } catch (Exception e) {
- // in case there was some empty parquet file when the pipeline
- // crushes exceptionally.
- LOG.error("Error when loading record keys from file: {}", baseFile);
- continue;
- }
- hoodieKeys.forEach(hoodieKey -> {
- try {
- // Reference: org.apache.flink.streaming.api.datastream.KeyedStream,
- // the input records is shuffled by record key
- boolean shouldLoad = KeyGroupRangeAssignment.assignKeyToParallelOperator(
- hoodieKey.getRecordKey(), maxParallelism, parallelism) == taskID;
- if (shouldLoad) {
- this.context.setCurrentKey(hoodieKey.getRecordKey());
- this.indexState.update(
- new HoodieRecordGlobalLocation(
- hoodieKey.getPartitionPath(),
- baseFile.getCommitTime(),
- baseFile.getFileId()));
- }
- } catch (Exception e) {
- LOG.error("Error when putting record keys into the state from file: {}", baseFile);
- }
- });
- }
- // recover the currentKey
- this.context.setCurrentKey(curRecordKey);
- // Mark the partition path as loaded.
- partitionLoadState.put(partitionPath, 0);
- LOG.info("Finish loading records under partition {} into the index state", partitionPath);
- }
-
@VisibleForTesting
public void clearIndexState() {
- this.partitionLoadState.clear();
this.indexState.clear();
}
}
diff --git a/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java b/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java
index b0f7ada..9e77e73 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java
@@ -22,11 +22,12 @@ import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.sink.CleanFunction;
import org.apache.hudi.sink.StreamWriteOperatorFactory;
+import org.apache.hudi.sink.bootstrap.BootstrapFunction;
+import org.apache.hudi.sink.compact.CompactionCommitEvent;
+import org.apache.hudi.sink.compact.CompactionCommitSink;
import org.apache.hudi.sink.compact.CompactionPlanOperator;
import org.apache.hudi.sink.compact.CompactionPlanEvent;
-import org.apache.hudi.sink.compact.CompactionCommitEvent;
import org.apache.hudi.sink.compact.CompactFunction;
-import org.apache.hudi.sink.compact.CompactionCommitSink;
import org.apache.hudi.sink.partitioner.BucketAssignFunction;
import org.apache.hudi.sink.transform.RowDataToHoodieFunction;
import org.apache.hudi.util.AvroSchemaConverter;
@@ -41,6 +42,7 @@ import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
+import org.apache.flink.streaming.api.operators.ProcessOperator;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.logical.RowType;
@@ -50,6 +52,8 @@ import java.util.Properties;
/**
* A utility which can incrementally consume data from Kafka and apply it to the target table.
* Currently, it only supports COW tables and the insert and upsert operations.
+ *
+ * Note: HoodieFlinkStreamer is not suitable for initializing large tables when there is no checkpoint to restore from.
*/
public class HoodieFlinkStreamer {
public static void main(String[] args) throws Exception {
@@ -82,7 +86,7 @@ public class HoodieFlinkStreamer {
StreamWriteOperatorFactory<HoodieRecord> operatorFactory =
new StreamWriteOperatorFactory<>(conf);
- DataStream<Object> pipeline = env.addSource(new FlinkKafkaConsumer<>(
+ DataStream<HoodieRecord> hoodieDataStream = env.addSource(new FlinkKafkaConsumer<>(
cfg.kafkaTopic,
new JsonRowDataDeserializationSchema(
rowType,
@@ -93,8 +97,15 @@ public class HoodieFlinkStreamer {
), kafkaProps))
.name("kafka_source")
.uid("uid_kafka_source")
- .map(new RowDataToHoodieFunction<>(rowType, conf), TypeInformation.of(HoodieRecord.class))
- // Key-by record key, to avoid multiple subtasks write to a partition at the same time
+ .map(new RowDataToHoodieFunction<>(rowType, conf), TypeInformation.of(HoodieRecord.class));
+ if (conf.getBoolean(FlinkOptions.INDEX_BOOTSTRAP_ENABLED)) {
+ hoodieDataStream = hoodieDataStream.transform("index_bootstrap",
+ TypeInformation.of(HoodieRecord.class),
+ new ProcessOperator<>(new BootstrapFunction<>(conf)));
+ }
+
+ DataStream<Object> pipeline = hoodieDataStream
+ // Key-by record key, to avoid multiple subtasks writing to a bucket at the same time
.keyBy(HoodieRecord::getRecordKey)
.transform(
"bucket_assigner",
diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java
index 02e76aa..0d8f7f2 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java
@@ -23,6 +23,8 @@ import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.sink.CleanFunction;
import org.apache.hudi.sink.StreamWriteOperatorFactory;
+import org.apache.hudi.sink.bootstrap.BootstrapFunction;
+import org.apache.hudi.sink.bootstrap.BootstrapRecord;
import org.apache.hudi.sink.compact.CompactFunction;
import org.apache.hudi.sink.compact.CompactionCommitEvent;
import org.apache.hudi.sink.compact.CompactionCommitSink;
@@ -38,6 +40,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
+import org.apache.flink.streaming.api.operators.ProcessOperator;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.sink.DataStreamSinkProvider;
@@ -74,10 +77,22 @@ public class HoodieTableSink implements DynamicTableSink, SupportsPartitioning,
conf.setLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT, ckpTimeout);
StreamWriteOperatorFactory<HoodieRecord> operatorFactory = new StreamWriteOperatorFactory<>(conf);
- DataStream<Object> pipeline = dataStream
- .map(new RowDataToHoodieFunction<>(rowType, conf), TypeInformation.of(HoodieRecord.class))
+ DataStream<HoodieRecord> hoodieDataStream = dataStream
+ .map(new RowDataToHoodieFunction<>(rowType, conf), TypeInformation.of(HoodieRecord.class));
+
+ // TODO: this is a very time-consuming operation, to be optimized later
+ if (conf.getBoolean(FlinkOptions.INDEX_BOOTSTRAP_ENABLED)) {
+ hoodieDataStream = hoodieDataStream.transform("index_bootstrap",
+ TypeInformation.of(HoodieRecord.class),
+ new ProcessOperator<>(new BootstrapFunction<>(conf)));
+ }
+
+ DataStream<Object> pipeline = hoodieDataStream
+ .transform("index_bootstrap",
+ TypeInformation.of(BootstrapRecord.class),
+ new ProcessOperator<>(new BootstrapFunction<>(conf)))
// Key-by record key, to avoid multiple subtasks write to a bucket at the same time
- .keyBy(HoodieRecord::getRecordKey)
+ .keyBy(BootstrapRecord::getRecordKey)
.transform(
"bucket_assigner",
TypeInformation.of(HoodieRecord.class),
diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/StreamWriteITCase.java b/hudi-flink/src/test/java/org/apache/hudi/sink/StreamWriteITCase.java
index 0590a23..ad9c9dc 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/sink/StreamWriteITCase.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/sink/StreamWriteITCase.java
@@ -21,6 +21,7 @@ package org.apache.hudi.sink;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.sink.bootstrap.BootstrapFunction;
import org.apache.hudi.sink.compact.CompactFunction;
import org.apache.hudi.sink.compact.CompactionCommitEvent;
import org.apache.hudi.sink.compact.CompactionCommitSink;
@@ -49,6 +50,7 @@ import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.FileProcessingMode;
+import org.apache.flink.streaming.api.operators.ProcessOperator;
import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.logical.RowType;
@@ -109,14 +111,22 @@ public class StreamWriteITCase extends TestLogger {
String sourcePath = Objects.requireNonNull(Thread.currentThread()
.getContextClassLoader().getResource("test_source.data")).toString();
- DataStream<Object> dataStream = execEnv
+ DataStream<HoodieRecord> hoodieDataStream = execEnv
// use continuous file source to trigger checkpoint
.addSource(new ContinuousFileSource.BoundedSourceFunction(new Path(sourcePath), 2))
.name("continuous_file_source")
.setParallelism(1)
.map(record -> deserializationSchema.deserialize(record.getBytes(StandardCharsets.UTF_8)))
.setParallelism(4)
- .map(new RowDataToHoodieFunction<>(rowType, conf), TypeInformation.of(HoodieRecord.class))
+ .map(new RowDataToHoodieFunction<>(rowType, conf), TypeInformation.of(HoodieRecord.class));
+
+ if (conf.getBoolean(FlinkOptions.INDEX_BOOTSTRAP_ENABLED)) {
+ hoodieDataStream = hoodieDataStream.transform("index_bootstrap",
+ TypeInformation.of(HoodieRecord.class),
+ new ProcessOperator<>(new BootstrapFunction<>(conf)));
+ }
+
+ DataStream<Object> pipeline = hoodieDataStream
// Key-by record key, to avoid multiple subtasks write to a bucket at the same time
.keyBy(HoodieRecord::getRecordKey)
.transform(
@@ -128,7 +138,7 @@ public class StreamWriteITCase extends TestLogger {
.keyBy(record -> record.getCurrentLocation().getFileId())
.transform("hoodie_stream_write", TypeInformation.of(Object.class), operatorFactory)
.uid("uid_hoodie_stream_write");
- execEnv.addOperator(dataStream.getTransformation());
+ execEnv.addOperator(pipeline.getTransformation());
JobClient client = execEnv.executeAsync(execEnv.getStreamGraph(conf.getString(FlinkOptions.TABLE_NAME)));
// wait for the streaming job to finish
@@ -171,12 +181,20 @@ public class StreamWriteITCase extends TestLogger {
TypeInformation<String> typeInfo = BasicTypeInfo.STRING_TYPE_INFO;
format.setCharsetName("UTF-8");
- DataStream<Object> pipeline = execEnv
+ DataStream<HoodieRecord> hoodieDataStream = execEnv
// use PROCESS_CONTINUOUSLY mode to trigger checkpoint
.readFile(format, sourcePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 1000, typeInfo)
.map(record -> deserializationSchema.deserialize(record.getBytes(StandardCharsets.UTF_8)))
.setParallelism(4)
- .map(new RowDataToHoodieFunction<>(rowType, conf), TypeInformation.of(HoodieRecord.class))
+ .map(new RowDataToHoodieFunction<>(rowType, conf), TypeInformation.of(HoodieRecord.class));
+
+ if (conf.getBoolean(FlinkOptions.INDEX_BOOTSTRAP_ENABLED)) {
+ hoodieDataStream = hoodieDataStream.transform("index_bootstrap",
+ TypeInformation.of(HoodieRecord.class),
+ new ProcessOperator<>(new BootstrapFunction<>(conf)));
+ }
+
+ DataStream<Object> pipeline = hoodieDataStream
// Key-by record key, to avoid multiple subtasks write to a bucket at the same time
.keyBy(HoodieRecord::getRecordKey)
.transform(
diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java b/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
index 247e17e..5e68b76 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
@@ -576,10 +576,6 @@ public class TestWriteCopyOnWrite {
@Test
public void testIndexStateBootstrap() throws Exception {
- // reset the config option
- conf.setBoolean(FlinkOptions.INDEX_BOOTSTRAP_ENABLED, true);
- funcWrapper = new StreamWriteFunctionWrapper<>(tempFile.getAbsolutePath(), conf);
-
// open the function and ingest data
funcWrapper.openFunction();
for (RowData rowData : TestData.DATA_SET_INSERT) {
@@ -598,13 +594,21 @@ public class TestWriteCopyOnWrite {
funcWrapper.checkpointComplete(1);
- // Mark the index state as not fully loaded to trigger re-load from the filesystem.
- funcWrapper.clearIndexState();
+ // the data is not flushed yet
+ checkWrittenData(tempFile, EXPECTED1);
+
+ // reset the config option
+ conf.setBoolean(FlinkOptions.INDEX_BOOTSTRAP_ENABLED, true);
+ funcWrapper = new StreamWriteFunctionWrapper<>(tempFile.getAbsolutePath(), conf);
// upsert another data buffer
+ funcWrapper.openFunction();
for (RowData rowData : TestData.DATA_SET_UPDATE_INSERT) {
funcWrapper.invoke(rowData);
}
+
+ assertTrue(funcWrapper.isAlreadyBootstrap());
+
checkIndexLoaded(
new HoodieKey("id1", "par1"),
new HoodieKey("id2", "par1"),
@@ -613,11 +617,13 @@ public class TestWriteCopyOnWrite {
new HoodieKey("id5", "par3"),
new HoodieKey("id6", "par3"),
new HoodieKey("id7", "par4"),
- new HoodieKey("id8", "par4"));
- // the data is not flushed yet
- checkWrittenData(tempFile, EXPECTED1);
+ new HoodieKey("id8", "par4"),
+ new HoodieKey("id9", "par3"),
+ new HoodieKey("id10", "par4"),
+ new HoodieKey("id11", "par4"));
+
// this triggers the data write and event send
- funcWrapper.checkpointFunction(2);
+ funcWrapper.checkpointFunction(1);
String instant = funcWrapper.getWriteClient()
.getLastPendingInstant(getTableType());
@@ -631,7 +637,7 @@ public class TestWriteCopyOnWrite {
checkInstantState(funcWrapper.getWriteClient(), HoodieInstant.State.REQUESTED, instant);
- funcWrapper.checkpointComplete(2);
+ funcWrapper.checkpointComplete(1);
// the coordinator checkpoint commits the inflight instant.
checkInstantState(funcWrapper.getWriteClient(), HoodieInstant.State.COMPLETED, instant);
checkWrittenData(tempFile, EXPECTED2);
diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java b/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
index 74e43ba..705780c 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
@@ -23,6 +23,8 @@ import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.sink.bootstrap.BootstrapFunction;
+import org.apache.hudi.sink.bootstrap.BootstrapRecord;
import org.apache.hudi.sink.StreamWriteFunction;
import org.apache.hudi.sink.StreamWriteOperatorCoordinator;
import org.apache.hudi.sink.event.BatchWriteSuccessEvent;
@@ -46,6 +48,7 @@ import org.apache.flink.table.data.RowData;
import org.apache.flink.util.Collector;
import java.util.HashSet;
+import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -68,6 +71,8 @@ public class StreamWriteFunctionWrapper<I> {
/** Function that converts row data to HoodieRecord. */
private RowDataToHoodieFunction<RowData, HoodieRecord<?>> toHoodieFunction;
+ /** Function that loads the index into state. */
+ private BootstrapFunction<HoodieRecord<?>, HoodieRecord<?>> bootstrapFunction;
/** Function that assigns bucket ID. */
private BucketAssignFunction<String, HoodieRecord<?>, HoodieRecord<?>> bucketAssignerFunction;
/** BucketAssignOperator context. **/
@@ -94,6 +99,8 @@ public class StreamWriteFunctionWrapper<I> {
// one function
this.coordinatorContext = new MockOperatorCoordinatorContext(new OperatorID(), 1);
this.coordinator = new StreamWriteOperatorCoordinator(conf, this.coordinatorContext);
+ this.compactFunctionWrapper = new CompactFunctionWrapper(this.conf);
+ this.bucketAssignOperatorContext = new MockBucketAssignOperatorContext();
this.functionInitializationContext = new MockFunctionInitializationContext();
this.compactFunctionWrapper = new CompactFunctionWrapper(this.conf);
this.bucketAssignOperatorContext = new MockBucketAssignOperatorContext();
@@ -106,6 +113,13 @@ public class StreamWriteFunctionWrapper<I> {
toHoodieFunction.setRuntimeContext(runtimeContext);
toHoodieFunction.open(conf);
+ if (conf.getBoolean(FlinkOptions.INDEX_BOOTSTRAP_ENABLED)) {
+ bootstrapFunction = new BootstrapFunction<>(conf);
+ bootstrapFunction.setRuntimeContext(runtimeContext);
+ bootstrapFunction.initializeState(this.functionInitializationContext);
+ bootstrapFunction.open(conf);
+ }
+
bucketAssignerFunction = new BucketAssignFunction<>(conf);
bucketAssignerFunction.setRuntimeContext(runtimeContext);
bucketAssignerFunction.open(conf);
@@ -136,6 +150,32 @@ public class StreamWriteFunctionWrapper<I> {
}
};
+
+ if (conf.getBoolean(FlinkOptions.INDEX_BOOTSTRAP_ENABLED)) {
+ List<HoodieRecord<?>> bootstrapRecords = new ArrayList<>();
+
+ Collector<HoodieRecord<?>> bootstrapCollector = new Collector<HoodieRecord<?>>() {
+ @Override
+ public void collect(HoodieRecord<?> record) {
+ if (record instanceof BootstrapRecord) {
+ bootstrapRecords.add(record);
+ }
+ }
+
+ @Override
+ public void close() {
+
+ }
+ };
+
+ bootstrapFunction.processElement(hoodieRecord, null, bootstrapCollector);
+ for (HoodieRecord bootstrapRecord : bootstrapRecords) {
+ bucketAssignerFunction.processElement(bootstrapRecord, null, collector);
+ }
+
+ this.bucketAssignOperatorContext.setCurrentKey(hoodieRecord.getRecordKey());
+ }
+
bucketAssignerFunction.processElement(hoodieRecord, null, collector);
writeFunction.processElement(hoodieRecords[0], null, null);
}
@@ -210,6 +250,10 @@ public class StreamWriteFunctionWrapper<I> {
return this.writeFunction.isConfirming();
}
+ public boolean isAlreadyBootstrap() {
+ return this.bootstrapFunction.isAlreadyBootstrap();
+ }
+
// -------------------------------------------------------------------------
// Inner Class
// -------------------------------------------------------------------------