You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2023/03/31 06:37:40 UTC

[inlong] branch master updated: [INLONG-7724][Sort] Add rate limit to ingest data into iceberg (#7725)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 2b0d69379 [INLONG-7724][Sort] Add rate limit  to ingest data into iceberg (#7725)
2b0d69379 is described below

commit 2b0d6937980931daff49db8dec99cef8b1130193
Author: thexia <37...@users.noreply.github.com>
AuthorDate: Fri Mar 31 14:37:35 2023 +0800

    [INLONG-7724][Sort] Add rate limit  to ingest data into iceberg (#7725)
    
    Co-authored-by: thexiay <xi...@gmail.com>
---
 .../sort/iceberg/FlinkDynamicTableFactory.java     |  8 ++++
 .../apache/inlong/sort/iceberg/sink/FlinkSink.java | 27 +++++++++++-
 .../sort/iceberg/sink/RateLimitMapFunction.java    | 51 ++++++++++++++++++++++
 3 files changed, 84 insertions(+), 2 deletions(-)

diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkDynamicTableFactory.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkDynamicTableFactory.java
index 32d3a21bd..a4fd3cf56 100644
--- a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkDynamicTableFactory.java
+++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkDynamicTableFactory.java
@@ -132,6 +132,13 @@ public class FlinkDynamicTableFactory implements DynamicTableSinkFactory, Dynami
                     .withDescription("Distribute the records from input data stream based "
                             + "on the write.distribution-mode.");
 
+    public static final ConfigOption<Long> WRITE_RATE_LIMIT =
+            ConfigOptions.key("write.rate.limit")
+                    .longType()
+                    .defaultValue(0L) // FlinkSink treats any value <= 0 as "do not add a rate limiter"
+                    .withDescription("Write record rate limit per second to"
+                            + " prevent traffic jitter and improve stability, default 0 (no limit)");
+
     private final FlinkCatalog catalog;
 
     public FlinkDynamicTableFactory() {
@@ -297,6 +304,7 @@ public class FlinkDynamicTableFactory implements DynamicTableSinkFactory, Dynami
         options.add(WRITE_COMPACT_ENABLE);
         options.add(WRITE_COMPACT_INTERVAL);
         options.add(WRITE_DISTRIBUTION_MODE);
+        options.add(WRITE_RATE_LIMIT);
         return options;
     }
 
diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkSink.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkSink.java
index 9fb259b39..7524bf5f1 100644
--- a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkSink.java
+++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkSink.java
@@ -81,6 +81,7 @@ import java.util.function.Function;
 import static org.apache.iceberg.TableProperties.UPSERT_ENABLED;
 import static org.apache.iceberg.TableProperties.UPSERT_ENABLED_DEFAULT;
 import static org.apache.iceberg.TableProperties.WRITE_DISTRIBUTION_MODE;
+import static org.apache.inlong.sort.iceberg.FlinkDynamicTableFactory.WRITE_RATE_LIMIT;
 
 /**
  * Copy from iceberg-flink:iceberg-flink-1.13:0.13.2
@@ -429,9 +430,12 @@ public class FlinkSink {
                     distributeDataStream(
                             rowDataInput, equalityFieldIds, table.spec(), table.schema(), flinkRowType);
 
+            // Add rate limit if necessary
+            DataStream<RowData> inputWithRateLimit = appendWithRateLimit(distributeStream);
+
             // Add parallel writers that append rows to files
             SingleOutputStreamOperator<WriteResult> writerStream =
-                    appendWriter(distributeStream, flinkRowType, equalityFieldIds);
+                    appendWriter(inputWithRateLimit, flinkRowType, equalityFieldIds);
 
             // Add single-parallelism committer that commits files
             // after successful checkpoint or end of input
@@ -446,8 +450,11 @@ public class FlinkSink {
                     "Please use forRowData() or forMapperOutputType() to initialize the input DataStream.");
             DataStream<RowData> rowDataInput = inputCreator.apply(uidPrefix);
 
+            // Add rate limit if necessary
+            DataStream<RowData> inputWithRateLimit = appendWithRateLimit(rowDataInput);
+
             // Add parallel writers that append rows to files
-            SingleOutputStreamOperator<MultipleWriteResult> writerStream = appendMultipleWriter(rowDataInput);
+            SingleOutputStreamOperator<MultipleWriteResult> writerStream = appendMultipleWriter(inputWithRateLimit);
 
             // Add single-parallelism committer that commits files
             // after successful checkpoint or end of input
@@ -524,6 +531,22 @@ public class FlinkSink {
             return resultStream;
         }
 
+        private <T> DataStream<T> appendWithRateLimit(DataStream<T> input) {
+            // A non-positive write.rate.limit (the default is 0) means no throttling: pass the stream through.
+            if (tableOptions.get(WRITE_RATE_LIMIT) <= 0) {
+                return input;
+            }
+            // Same parallelism as the input; RateLimitMapFunction splits the total limit across its subtasks.
+            SingleOutputStreamOperator<T> inputWithRateLimit = input
+                    .map(new RateLimitMapFunction<T>(tableOptions.get(WRITE_RATE_LIMIT)))
+                    .name("rate_limit")
+                    .setParallelism(input.getParallelism());
+            if (uidPrefix != null) {
+                inputWithRateLimit.uid(uidPrefix + "_rate_limit");
+            }
+            return inputWithRateLimit;
+        }
+
         private SingleOutputStreamOperator<Void> appendCommitter(SingleOutputStreamOperator<WriteResult> writerStream) {
             IcebergProcessOperator<WriteResult, Void> filesCommitter = new IcebergProcessOperator<>(
                     new IcebergSingleFileCommiter(
diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/RateLimitMapFunction.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/RateLimitMapFunction.java
new file mode 100644
index 000000000..9d3a997c5
--- /dev/null
+++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/RateLimitMapFunction.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.iceberg.sink;
+
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.shaded.guava18.com.google.common.util.concurrent.RateLimiter;
+
+/**
+ * Pass-through map function that throttles the stream: every record must acquire one {@link RateLimiter} permit.
+ */
+public class RateLimitMapFunction<T> extends RichMapFunction<T, T> {
+
+    /**
+     * Permits (records) per second summed over all parallel subtasks; {@code open} splits it evenly between them.
+     */
+    private final double totalLimit;
+    private transient RateLimiter rateLimiter; // transient: excluded from serialization, built in open()
+
+    public RateLimitMapFunction(long totalLimit) {
+        this.totalLimit = totalLimit; // widened to double so the per-subtask division in open() is not truncated
+    }
+
+    @Override
+    public void open(Configuration parameters) throws Exception {
+        super.open(parameters);
+        this.rateLimiter = RateLimiter.create( // per-subtask rate = total limit / number of parallel subtasks
+                totalLimit / getRuntimeContext().getNumberOfParallelSubtasks());
+    }
+
+    @Override
+    public T map(T value) throws Exception {
+        rateLimiter.acquire(); // takes one permit per record; Guava blocks the caller until it is granted
+        return value;
+    }
+}