You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2023/03/31 06:37:40 UTC
[inlong] branch master updated: [INLONG-7724][Sort] Add rate limit to ingest data into iceberg (#7725)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 2b0d69379 [INLONG-7724][Sort] Add rate limit to ingest data into iceberg (#7725)
2b0d69379 is described below
commit 2b0d6937980931daff49db8dec99cef8b1130193
Author: thexia <37...@users.noreply.github.com>
AuthorDate: Fri Mar 31 14:37:35 2023 +0800
[INLONG-7724][Sort] Add rate limit to ingest data into iceberg (#7725)
Co-authored-by: thexiay <xi...@gmail.com>
---
.../sort/iceberg/FlinkDynamicTableFactory.java | 8 ++++
.../apache/inlong/sort/iceberg/sink/FlinkSink.java | 27 +++++++++++-
.../sort/iceberg/sink/RateLimitMapFunction.java | 51 ++++++++++++++++++++++
3 files changed, 84 insertions(+), 2 deletions(-)
diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkDynamicTableFactory.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkDynamicTableFactory.java
index 32d3a21bd..a4fd3cf56 100644
--- a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkDynamicTableFactory.java
+++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkDynamicTableFactory.java
@@ -132,6 +132,13 @@ public class FlinkDynamicTableFactory implements DynamicTableSinkFactory, Dynami
.withDescription("Distribute the records from input data stream based "
+ "on the write.distribution-mode.");
+ public static final ConfigOption<Long> WRITE_RATE_LIMIT = // sink option: cap on records written per second
+ ConfigOptions.key("write.rate.limit")
+ .longType()
+ .defaultValue(0L) // FlinkSink#appendWithRateLimit skips throttling for any value <= 0
+ .withDescription("Write record rate limit per second to"
+ + " prevent traffic jitter and improve stability, default 0 (no limit)");
+
private final FlinkCatalog catalog;
public FlinkDynamicTableFactory() {
@@ -297,6 +304,7 @@ public class FlinkDynamicTableFactory implements DynamicTableSinkFactory, Dynami
options.add(WRITE_COMPACT_ENABLE);
options.add(WRITE_COMPACT_INTERVAL);
options.add(WRITE_DISTRIBUTION_MODE);
+ options.add(WRITE_RATE_LIMIT);
return options;
}
diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkSink.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkSink.java
index 9fb259b39..7524bf5f1 100644
--- a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkSink.java
+++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkSink.java
@@ -81,6 +81,7 @@ import java.util.function.Function;
import static org.apache.iceberg.TableProperties.UPSERT_ENABLED;
import static org.apache.iceberg.TableProperties.UPSERT_ENABLED_DEFAULT;
import static org.apache.iceberg.TableProperties.WRITE_DISTRIBUTION_MODE;
+import static org.apache.inlong.sort.iceberg.FlinkDynamicTableFactory.WRITE_RATE_LIMIT;
/**
* Copy from iceberg-flink:iceberg-flink-1.13:0.13.2
@@ -429,9 +430,12 @@ public class FlinkSink {
distributeDataStream(
rowDataInput, equalityFieldIds, table.spec(), table.schema(), flinkRowType);
+ // Add rate limit if necessary
+ DataStream<RowData> inputWithRateLimit = appendWithRateLimit(distributeStream);
+
// Add parallel writers that append rows to files
SingleOutputStreamOperator<WriteResult> writerStream =
- appendWriter(distributeStream, flinkRowType, equalityFieldIds);
+ appendWriter(inputWithRateLimit, flinkRowType, equalityFieldIds);
// Add single-parallelism committer that commits files
// after successful checkpoint or end of input
@@ -446,8 +450,11 @@ public class FlinkSink {
"Please use forRowData() or forMapperOutputType() to initialize the input DataStream.");
DataStream<RowData> rowDataInput = inputCreator.apply(uidPrefix);
+ // Add rate limit if necessary
+ DataStream<RowData> inputWithRateLimit = appendWithRateLimit(rowDataInput);
+
// Add parallel writers that append rows to files
- SingleOutputStreamOperator<MultipleWriteResult> writerStream = appendMultipleWriter(rowDataInput);
+ SingleOutputStreamOperator<MultipleWriteResult> writerStream = appendMultipleWriter(inputWithRateLimit);
// Add single-parallelism committer that commits files
// after successful checkpoint or end of input
@@ -524,6 +531,22 @@ public class FlinkSink {
return resultStream;
}
+    /**
+     * Inserts a rate-limiting map operator into the pipeline when the
+     * {@code write.rate.limit} option is set to a positive value.
+     *
+     * <p>The operator is named {@code rate_limit}, keeps the parallelism of {@code input},
+     * and gets the uid {@code uidPrefix + "_rate_limit"} when a uid prefix is configured.
+     *
+     * @param input the stream to throttle
+     * @param <T> the record type, passed through unchanged
+     * @return {@code input} itself when the configured limit is {@code <= 0},
+     *         otherwise the stream produced by the rate-limiting operator
+     */
+    private <T> DataStream<T> appendWithRateLimit(DataStream<T> input) {
+        // Read the option once so the guard and the operator always see the same value.
+        long rateLimit = tableOptions.get(WRITE_RATE_LIMIT);
+        if (rateLimit <= 0) {
+            return input;
+        }
+
+        // Parameterize the function explicitly instead of using the raw type, so the
+        // assignment below is type-checked rather than an unchecked conversion.
+        SingleOutputStreamOperator<T> inputWithRateLimit = input
+                .map(new RateLimitMapFunction<T>(rateLimit))
+                .name("rate_limit")
+                .setParallelism(input.getParallelism());
+
+        if (uidPrefix != null) {
+            inputWithRateLimit.uid(uidPrefix + "_rate_limit");
+        }
+        return inputWithRateLimit;
+    }
+
private SingleOutputStreamOperator<Void> appendCommitter(SingleOutputStreamOperator<WriteResult> writerStream) {
IcebergProcessOperator<WriteResult, Void> filesCommitter = new IcebergProcessOperator<>(
new IcebergSingleFileCommiter(
diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/RateLimitMapFunction.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/RateLimitMapFunction.java
new file mode 100644
index 000000000..9d3a997c5
--- /dev/null
+++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/RateLimitMapFunction.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.iceberg.sink;
+
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.shaded.guava18.com.google.common.util.concurrent.RateLimiter;
+
+/**
+ * Map function that throttles a stream: every record is forwarded unchanged, but only
+ * after a permit has been acquired from a per-subtask {@link RateLimiter}.
+ *
+ * <p>The configured limit is the total for the whole operator; each subtask is allowed
+ * {@code totalLimit / numberOfParallelSubtasks} records per second.
+ *
+ * @param <T> the record type
+ */
+public class RateLimitMapFunction<T> extends RichMapFunction<T, T> {
+
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * Rate limit per second for all subtasks added up.
+     */
+    private final double totalLimit;
+
+    /**
+     * Limiter for this subtask; created in {@link #open(Configuration)}.
+     */
+    private transient RateLimiter rateLimiter;
+
+    /**
+     * Creates the function.
+     *
+     * @param totalLimit records per second allowed across all subtasks, must be positive
+     * @throws IllegalArgumentException if {@code totalLimit} is not positive
+     */
+    public RateLimitMapFunction(long totalLimit) {
+        // Fail fast when the job graph is built: a non-positive rate would otherwise
+        // only be rejected by RateLimiter.create() once the task is opened.
+        if (totalLimit <= 0) {
+            throw new IllegalArgumentException(
+                    "Rate limit must be positive, but was " + totalLimit);
+        }
+        this.totalLimit = totalLimit;
+    }
+
+    /**
+     * Builds the limiter for this subtask with its share of the total limit.
+     */
+    @Override
+    public void open(Configuration parameters) throws Exception {
+        super.open(parameters);
+        // totalLimit is a double, so the per-subtask share may be fractional.
+        this.rateLimiter = RateLimiter.create(
+                totalLimit / getRuntimeContext().getNumberOfParallelSubtasks());
+    }
+
+    /**
+     * Waits for a permit, then returns the record as is.
+     *
+     * @param value the incoming record
+     * @return the same record
+     */
+    @Override
+    public T map(T value) throws Exception {
+        rateLimiter.acquire();
+        return value;
+    }
+}