Posted to commits@inlong.apache.org by GitBox <gi...@apache.org> on 2023/01/19 11:54:34 UTC

[GitHub] [inlong] featzhang opened a new pull request, #7059: [INLONG-7058][Sort] Support Apache Kudu connector

featzhang opened a new pull request, #7059:
URL: https://github.com/apache/inlong/pull/7059

   ## Prepare a Pull Request
   *[INLONG-7058][Sort] Support Apache kudu connector*
   
   - Fixes #7058 
   
   ### Motivation
   
   
   ```sql
   CREATE TABLE kudu_tbl(
   uid int,
   fee double,
   event_id bigint,
   remark varchar
   )
   WITH(
    'connector'='kudu-inlong',
    'connector.masters'='localhost:8081,localhost:8080',
    'connector.table'='actual_tbl',
    'flush-mode'='AUTO_FLUSH_SYNC',
    'lookup.max-cache-size' = '-1',
    'lookup.max-cache-time' = '60s',
    'sink.start-new-chain' = 'true',
    'max-retries' = '3',
    'sink.max-buffer-size' = '100',
    'sink.write-thread-count' = '5', 
    'sink.max-buffer-time' = '30s',
    'sink.write-with-async-mode' = 'false',
    'sink.write-with-upsert-mode' = 'true',
    'sink.cache-queue-max-length' = '-1',
    'ignore.all.changelog' = 'false'
   );
   ```
   
   The Apache Kudu `sort-connector` is fully integrated with the Table and SQL API of Flink 1.13.x. Once `KuduTableInfo` is configured (`connector.masters`, `connector.table`, and the other attributes), the Kudu table can be queried and written to with Flink SQL.
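
As an illustration of how a `connector.masters` value might be validated before it reaches the Kudu client, here is a minimal Java sketch. `KuduMasterAddresses` is a hypothetical helper and not part of this PR. It assumes a comma-separated list, as in the options table, and falls back to Kudu's default master RPC port 7051 when an entry has no port.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper (not in the PR): splits a 'connector.masters' value into host:port entries. */
final class KuduMasterAddresses {

    /** Kudu's default master RPC port, used when an entry omits the port. */
    static final int DEFAULT_MASTER_PORT = 7051;

    private KuduMasterAddresses() {
    }

    /** Parses "host1:port1,host2:port2" into a list of normalized "host:port" strings. */
    static List<String> parse(String masters) {
        if (masters == null || masters.trim().isEmpty()) {
            throw new IllegalArgumentException("'connector.masters' must not be empty");
        }
        List<String> addresses = new ArrayList<>();
        for (String entry : masters.split(",")) {
            String address = entry.trim();
            if (address.isEmpty()) {
                continue; // tolerate a stray or trailing comma
            }
            int colon = address.lastIndexOf(':');
            String host = colon < 0 ? address : address.substring(0, colon);
            int port = colon < 0
                    ? DEFAULT_MASTER_PORT
                    : Integer.parseInt(address.substring(colon + 1));
            if (host.isEmpty() || port < 1 || port > 65535) {
                throw new IllegalArgumentException("Invalid master address: " + address);
            }
            addresses.add(host + ":" + port);
        }
        if (addresses.isEmpty()) {
            throw new IllegalArgumentException("'connector.masters' contains no addresses");
        }
        return addresses;
    }
}
```

Failing fast here gives a clearer error than a connection timeout later, which is the only design point this sketch is meant to make.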
   
   
   ### Modifications
   
   
   #### DDL options
   
   | Option | Default value | Required | Remarks |
   | ---- | ---- | ---- | ---- |
   | `connector.masters` | null | true | The master addresses of the Kudu server, e.g. 'localhost:8080,localhost:8081'. |
   | `connector.table` | null | true | The name of the Kudu table. |
   | `flush-mode` | AUTO_FLUSH_SYNC | true | The flush mode of the Kudu client session: AUTO_FLUSH_SYNC, AUTO_FLUSH_BACKGROUND, or MANUAL_FLUSH. |
   | `lookup.max-cache-size` | -1 | true | The maximum number of results cached in the lookup source. |
   | `lookup.max-cache-time` | 60s | true | The maximum time to live for cached results in the lookup source. |
   | `sink.start-new-chain` | true | true | If true, the sink operator starts a new chain. |
   | `max-retries` | 3 | true | The maximum number of retries when an exception is caught. |
   | `sink.max-buffer-size` | 100 | true | The maximum number of records buffered in the sink. |
   | `sink.write-thread-count` | 5 | true | The maximum number of threads in the sink. |
   | `sink.max-buffer-time` | 30s | true | The maximum wait time for buffered records in the sink. |
   | `sink.write-with-async-mode` | false | true | If true, the Kudu producer writes in async mode. |
   | `sink.write-with-upsert-mode` | true | true | If true, always write to Kudu in upsert mode, keyed on the primary key of the Kudu table. |
   | `sink.cache-queue-max-length` | -1 | true | The maximum length of the cache queue. |
   | `ignore.all.changelog` | false | true | Whether to ignore the delete/update_before/update_after message kinds. If true, every message is written as an insert. |
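
The defaults in this table could be applied roughly as follows. This is a sketch under assumptions, not the connector's code: `KuduOptionDefaults` and its `FlushMode` enum are local stand-ins, only the two options without a default are treated as mandatory, and the duration parser handles just the `s` and `ms` suffixes.

```java
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;

/** Illustrative only: resolves user options against the defaults listed in the table. */
final class KuduOptionDefaults {

    /** Local stand-in for the three flush modes named in the table. */
    enum FlushMode { AUTO_FLUSH_SYNC, AUTO_FLUSH_BACKGROUND, MANUAL_FLUSH }

    private static final Map<String, String> DEFAULTS = new HashMap<>();

    static {
        DEFAULTS.put("flush-mode", "AUTO_FLUSH_SYNC");
        DEFAULTS.put("lookup.max-cache-size", "-1");
        DEFAULTS.put("lookup.max-cache-time", "60s");
        DEFAULTS.put("sink.start-new-chain", "true");
        DEFAULTS.put("max-retries", "3");
        DEFAULTS.put("sink.max-buffer-size", "100");
        DEFAULTS.put("sink.write-thread-count", "5");
        DEFAULTS.put("sink.max-buffer-time", "30s");
        DEFAULTS.put("sink.write-with-async-mode", "false");
        DEFAULTS.put("sink.write-with-upsert-mode", "true");
        DEFAULTS.put("sink.cache-queue-max-length", "-1");
        DEFAULTS.put("ignore.all.changelog", "false");
    }

    private KuduOptionDefaults() {
    }

    /** Returns the defaults overlaid with the user's options; rejects missing mandatory keys. */
    static Map<String, String> resolve(Map<String, String> userOptions) {
        // Assumption: only the options without a default must be supplied by the user.
        for (String required : new String[] {"connector.masters", "connector.table"}) {
            if (!userOptions.containsKey(required)) {
                throw new IllegalArgumentException("Missing required option: " + required);
            }
        }
        Map<String, String> resolved = new HashMap<>(DEFAULTS);
        resolved.putAll(userOptions);
        // Fail fast on an unknown flush mode (valueOf throws IllegalArgumentException).
        FlushMode.valueOf(resolved.get("flush-mode"));
        return resolved;
    }

    /** Parses values such as "30s" or "500ms"; other suffixes are rejected in this sketch. */
    static Duration parseDuration(String text) {
        String value = text.trim().toLowerCase();
        if (value.endsWith("ms")) {
            return Duration.ofMillis(Long.parseLong(value.substring(0, value.length() - 2).trim()));
        }
        if (value.endsWith("s")) {
            return Duration.ofSeconds(Long.parseLong(value.substring(0, value.length() - 1).trim()));
        }
        throw new IllegalArgumentException("Unsupported duration: " + text);
    }
}
```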
   
   #### Data type
   
   | Flink SQL data type | Kudu data type |
   | --- | --- |
   | STRING | STRING |
   | BOOLEAN | BOOL |
   | TINYINT | INT8 |
   | SMALLINT | INT16 |
   | INT | INT32 |
   | BIGINT | INT64 |
   | FLOAT | FLOAT |
   | DOUBLE | DOUBLE |
   | BYTES | BINARY |
   | TIMESTAMP(3) | UNIXTIME_MICROS |
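
The same mapping can be written as a lookup. The sketch below uses plain strings rather than Flink's `LogicalType` or Kudu's `Type` so that it runs without either library, and `KuduTypeMapping` is a hypothetical name. The `toUnixtimeMicros` helper reflects that `UNIXTIME_MICROS` holds microseconds since the epoch while `TIMESTAMP(3)` has millisecond precision. How the connector itself performs this conversion is not shown in this description.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Locale;
import java.util.Map;

/** Illustrative lookup for the type table; the names are stand-ins, not connector classes. */
final class KuduTypeMapping {

    private static final Map<String, String> FLINK_TO_KUDU;

    static {
        Map<String, String> mapping = new LinkedHashMap<>();
        mapping.put("STRING", "STRING");
        mapping.put("BOOLEAN", "BOOL");
        mapping.put("TINYINT", "INT8");
        mapping.put("SMALLINT", "INT16");
        mapping.put("INT", "INT32");
        mapping.put("BIGINT", "INT64");
        mapping.put("FLOAT", "FLOAT");
        mapping.put("DOUBLE", "DOUBLE");
        mapping.put("BYTES", "BINARY");
        mapping.put("TIMESTAMP(3)", "UNIXTIME_MICROS");
        FLINK_TO_KUDU = Collections.unmodifiableMap(mapping);
    }

    private KuduTypeMapping() {
    }

    /** Returns the Kudu type name for a Flink SQL type name, ignoring case and outer whitespace. */
    static String toKuduType(String flinkType) {
        String kuduType = FLINK_TO_KUDU.get(flinkType.trim().toUpperCase(Locale.ROOT));
        if (kuduType == null) {
            throw new IllegalArgumentException("No Kudu mapping listed for Flink type: " + flinkType);
        }
        return kuduType;
    }

    /** Converts epoch milliseconds to epoch microseconds, failing on overflow. */
    static long toUnixtimeMicros(long epochMillis) {
        return Math.multiplyExact(epochMillis, 1000L);
    }
}
```

Types outside the table, such as DECIMAL, are rejected rather than guessed at.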
   
   <img width="559" alt="image" src="https://user-images.githubusercontent.com/5709212/209534355-f904afe9-ffc2-4458-b921-270e49ac1c08.png">
   
   
   
   
   
   ### Verifying this change
   
   *(Please pick either of the following options)*
   
   - [x] This change is a trivial rework/code cleanup without any test coverage.
   
   - [x] This change is already covered by existing tests, such as:
     *(please describe tests)*
   
   - [ ] This change added tests and can be verified as follows:
   
     *(example:)*
     - *Added integration tests for end-to-end deployment with large payloads (10MB)*
     - *Extended integration test for recovery after broker failure*
   
   ### Documentation
   
     - Does this pull request introduce a new feature? (yes / no)
     - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)
     - If a feature is not applicable for documentation, explain why?
     - If a feature is not documented yet in this PR, please create a follow-up issue for adding the documentation
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [inlong] featzhang commented on a diff in pull request #7059: [INLONG-7058][Sort] Support Apache Kudu connector

Posted by "featzhang (via GitHub)" <gi...@apache.org>.
featzhang commented on code in PR #7059:
URL: https://github.com/apache/inlong/pull/7059#discussion_r1089868895


##########
inlong-sort/sort-connectors/kudu/src/main/java/org/apache/inlong/sort/kudu/common/KuduOptions.java:
##########
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.kudu.common;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+
+/**
+ * The configuration options for kudu sink.
+ */
+public class KuduOptions {
+
+    public static final ConfigOption<String> CONNECTOR_TABLE =
+            ConfigOptions.key("table")
+                    .stringType()
+                    .noDefaultValue().withDescription("The name of kudu table.");
+
+    public static final ConfigOption<String> CONNECTOR_MASTERS =

Review Comment:
   done





[GitHub] [inlong] featzhang closed pull request #7059: [INLONG-7058][Sort] Support Apache Kudu connector

Posted by GitBox <gi...@apache.org>.
featzhang closed pull request #7059: [INLONG-7058][Sort] Support Apache Kudu connector
URL: https://github.com/apache/inlong/pull/7059




[GitHub] [inlong] gong commented on a diff in pull request #7059: [INLONG-7058][Sort] Support Apache Kudu connector

Posted by GitBox <gi...@apache.org>.
gong commented on code in PR #7059:
URL: https://github.com/apache/inlong/pull/7059#discussion_r1058093963


##########
inlong-sort/sort-connectors/kudu/src/main/java/org/apache/inlong/sort/kudu/sink/AbstractKuduSinkFunction.java:
##########
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.kudu.sink;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.data.RowData;
+import org.apache.inlong.sort.base.metric.MetricOption;
+import org.apache.inlong.sort.base.metric.MetricState;
+import org.apache.inlong.sort.base.metric.SinkMetricData;
+import org.apache.inlong.sort.base.util.MetricStateUtils;
+import org.apache.kudu.client.SessionConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+import static org.apache.inlong.sort.base.Constants.*;
+import static org.apache.inlong.sort.kudu.common.KuduOptions.*;
+
+/**
+ * The base for all kudu sinks.
+ */
+@PublicEvolving
+public abstract class AbstractKuduSinkFunction
+        extends
+            RichSinkFunction<RowData>
+        implements
+            CheckpointedFunction {
+
+    private static final long serialVersionUID = 1L;
+
+    private static final Logger LOG = LoggerFactory.getLogger(AbstractKuduSinkFunction.class);
+
+    /**
+     * The masters of kudu server.
+     */
+    protected final String masters;
+    /**
+     * The name of kudu table.
+     */
+    protected final String tableName;
+    /**
+     * The flink TableSchema.
+     */
+    protected final TableSchema flinkTableSchema;
+
+    protected String connectorMetricIdentify;
+
+    /**
+     * The configuration of kudu sinkFunction.
+     */
+    protected final Configuration configuration;
+
+    /**
+     * The maximum number of buffered records.
+     */
+    protected final int maxBufferSize;
+
+    /**
+     * The maximum number of retries.
+     */
+    protected final int maxRetries;
+
+    /**
+     * True if the sink is running.
+     */
+    protected volatile boolean running;
+
+    /**
+     * The exception thrown in asynchronous tasks.
+     */
+    private transient Throwable flushThrowable;
+
+    protected final SessionConfiguration.FlushMode flushMode;
+    /**
+     * whether load metadata in `open` function
+     */
+    protected boolean lazyLoadSchema;
+
+    private SinkMetricData sinkMetricData;
+
+    private transient ListState<MetricState> metricStateListState;
+    private transient MetricState metricState;
+
+    private final String auditHostAndPorts;
+
+    private final String inlongMetric;
+
+    public AbstractKuduSinkFunction(
+            TableSchema flinkTableSchema,
+            String masters,
+            String tableName,
+            SessionConfiguration.FlushMode flushMode,
+            Configuration configuration,
+            String inlongMetric,
+            String auditHostAndPorts) {
+        this.masters = masters;
+        this.flushMode = flushMode;
+        this.tableName = tableName;
+        this.flinkTableSchema = flinkTableSchema;
+        this.configuration = configuration;
+        this.maxRetries = configuration.getInteger(MAX_RETRIES);
+        this.maxBufferSize = configuration.getInteger(MAX_BUFFER_SIZE);
+        this.lazyLoadSchema = configuration.getBoolean(LAZY_LOAD_SCHEMA);
+        this.inlongMetric = inlongMetric;
+        this.auditHostAndPorts = auditHostAndPorts;
+        MetricOption metricOption = MetricOption.builder()
+                .withInlongLabels(inlongMetric)
+                .withInlongAudit(auditHostAndPorts)
+                .withInitRecords(metricState != null ? metricState.getMetricValue(NUM_RECORDS_OUT) : 0L)
+                .withInitBytes(metricState != null ? metricState.getMetricValue(NUM_BYTES_OUT) : 0L)
+                .withInitDirtyRecords(metricState != null ? metricState.getMetricValue(DIRTY_RECORDS_OUT) : 0L)
+                .withInitDirtyBytes(metricState != null ? metricState.getMetricValue(DIRTY_BYTES_OUT) : 0L)
+                .withRegisterMetric(MetricOption.RegisteredMetric.ALL)
+                .build();
+        if (metricOption != null) {
+            sinkMetricData = new SinkMetricData(metricOption, getRuntimeContext().getMetricGroup());
+        }
+    }
+
+    @Override
+    public void open(Configuration parameters) throws Exception {
+        this.running = true;
+    }
+
+    @Override
+    public void invoke(
+            RowData row,
+            Context context) throws Exception {
+        addBatch(row);
+        sendMetrics(row.toString().getBytes());
+    }
+
+    /**
+     * Adds a record in the buffer.
+     */
+    protected abstract void addBatch(RowData in) throws Exception;
+
+    @Override
+    public void close() throws Exception {
+        this.running = false;
+    }
+
+    @Override
+    public void snapshotState(FunctionSnapshotContext functionSnapshotContext) {
+        checkError();
+        // We must store the exception caught in the flushing so that the
+        // task thread can be aware of the failure.
+        try {
+            flush();
+        } catch (IOException e) {
+            this.flushThrowable = e;
+        }
+        checkError();
+    }

Review Comment:
   The metric state snapshot call is missing here.
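
A sketch of the ordering being asked for, with the metric counters saved as part of the checkpoint before the flush. All types here are simplified stand-ins (plain lists instead of Flink's `ListState`, strings instead of `RowData`), so this illustrates the pattern only and is not the code in the PR.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/** Simplified stand-in for a checkpointed sink; none of these members are InLong or Flink APIs. */
final class SnapshotOrderSketch {

    /** Stand-in for the checkpointed metric state list. */
    final List<Long> metricState = new ArrayList<>();

    /** Records buffered but not yet written. */
    final List<String> buffer = new ArrayList<>();

    /** Records already written to the pretend external table. */
    final List<String> written = new ArrayList<>();

    private long recordsOut;

    /** Failure recorded during a flush, rethrown on the task thread by checkError(). */
    private Throwable flushThrowable;

    void invoke(String row) {
        buffer.add(row);
        recordsOut++;
    }

    void flush() throws IOException {
        written.addAll(buffer);
        buffer.clear();
    }

    void snapshotState() {
        // The step the review found missing: persist the metric counters with the checkpoint.
        metricState.clear();
        metricState.add(recordsOut);

        checkError();
        try {
            flush();
        } catch (IOException e) {
            // Keep the failure so that the second checkError() surfaces it.
            flushThrowable = e;
        }
        checkError();
    }

    private void checkError() {
        if (flushThrowable != null) {
            throw new RuntimeException("Flush failed", flushThrowable);
        }
    }
}
```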





[GitHub] [inlong] dockerzhang merged pull request #7059: [INLONG-7058][Sort] Support Apache Kudu connector

Posted by "dockerzhang (via GitHub)" <gi...@apache.org>.
dockerzhang merged PR #7059:
URL: https://github.com/apache/inlong/pull/7059




[GitHub] [inlong] yunqingmoswu commented on pull request #7059: [INLONG-7058][Sort] Support Apache kudu connector

Posted by GitBox <gi...@apache.org>.
yunqingmoswu commented on PR #7059:
URL: https://github.com/apache/inlong/pull/7059#issuecomment-1364826308

   Please add a description of your design and implementation, thanks.




[GitHub] [inlong] featzhang commented on a diff in pull request #7059: [INLONG-7058][Sort] Support Apache Kudu connector

Posted by "featzhang (via GitHub)" <gi...@apache.org>.
featzhang commented on code in PR #7059:
URL: https://github.com/apache/inlong/pull/7059#discussion_r1092741370


##########
inlong-sort/sort-connectors/kudu/src/main/java/org/apache/inlong/sort/kudu/sink/AbstractKuduSinkFunction.java:
##########
@@ -145,7 +149,11 @@ public void close() throws Exception {
     }
 
     @Override
-    public void snapshotState(FunctionSnapshotContext functionSnapshotContext) {
+    public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
+        if (sourceMetricData != null && metricStateListState != null) {
+            MetricStateUtils.snapshotMetricStateForSourceMetricData(metricStateListState, sourceMetricData,

Review Comment:
   @gong fixed





[GitHub] [inlong] gong commented on a diff in pull request #7059: [INLONG-7058][Sort] Support Apache Kudu connector

Posted by GitBox <gi...@apache.org>.
gong commented on code in PR #7059:
URL: https://github.com/apache/inlong/pull/7059#discussion_r1058097244


##########
inlong-sort/sort-connectors/kudu/pom.xml:
##########
@@ -0,0 +1,93 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~  Licensed to the Apache Software Foundation (ASF) under one
+  ~  or more contributor license agreements.  See the NOTICE file
+  ~  distributed with this work for additional information
+  ~  regarding copyright ownership.  The ASF licenses this file
+  ~  to you under the Apache License, Version 2.0 (the
+  ~  "License"); you may not use this file except in compliance
+  ~  with the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~  Unless required by applicable law or agreed to in writing, software
+  ~  distributed under the License is distributed on an "AS IS" BASIS,
+  ~  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~  See the License for the specific language governing permissions and
+  ~  limitations under the License.
+  -->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.inlong</groupId>
+        <artifactId>sort-connectors</artifactId>
+        <version>1.5.0-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>sort-connector-kudu</artifactId>
+    <packaging>jar</packaging>
+    <name>Apache InLong - Sort-connector-kudu</name>
+
+    <properties />
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.inlong</groupId>
+            <artifactId>sort-connector-base</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.kudu</groupId>
+            <artifactId>kudu-client</artifactId>
+            <version>${kudu.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.inlong</groupId>
+            <artifactId>sort-format-common</artifactId>
+            <version>1.5.0-SNAPSHOT</version>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-shade-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <id>shade-flink</id>
+                        <goals>
+                            <goal>shade</goal>
+                        </goals>
+                        <phase>package</phase>
+                        <configuration>
+                            <artifactSet>
+                                <includes>
+                                    <include>org.apache.hudi:*</include>
+                                    <include>org.apache.hive:hive-exec</include>
+                                    <include>org.apache.hadoop:*</include>
+                                    <include>com.fasterxml.woodstox:*</include>
+                                    <include>org.codehaus.woodstox:*</include>
+                                    <include>com.google.guava:*</include>

Review Comment:
   hudi? I can't find kudu in these includes.





[GitHub] [inlong] featzhang commented on pull request #7059: [INLONG-7058][Sort] Support Apache Kudu connector

Posted by "featzhang (via GitHub)" <gi...@apache.org>.
featzhang commented on PR #7059:
URL: https://github.com/apache/inlong/pull/7059#issuecomment-1400449938

   > add fileSet for kudu connector in inlong-distribution/src/main/assemblies/sort-connectors.xml
   
   done




[GitHub] [inlong] featzhang commented on a diff in pull request #7059: [INLONG-7058][Sort] Support Apache Kudu connector

Posted by "featzhang (via GitHub)" <gi...@apache.org>.
featzhang commented on code in PR #7059:
URL: https://github.com/apache/inlong/pull/7059#discussion_r1089868944


##########
inlong-sort/sort-connectors/kudu/src/main/java/org/apache/inlong/sort/kudu/sink/AbstractKuduSinkFunction.java:
##########
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.kudu.sink;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.table.data.RowData;
+import org.apache.inlong.sort.base.metric.MetricOption;
+import org.apache.inlong.sort.base.metric.MetricState;
+import org.apache.inlong.sort.base.metric.SinkMetricData;
+import org.apache.inlong.sort.base.metric.SourceMetricData;
+import org.apache.inlong.sort.base.util.MetricStateUtils;
+import org.apache.inlong.sort.kudu.common.KuduTableInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+import static org.apache.inlong.sort.base.Constants.DIRTY_BYTES_OUT;
+import static org.apache.inlong.sort.base.Constants.DIRTY_RECORDS_OUT;
+import static org.apache.inlong.sort.base.Constants.INLONG_METRIC_STATE_NAME;
+import static org.apache.inlong.sort.base.Constants.NUM_BYTES_OUT;
+import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_OUT;
+import static org.apache.inlong.sort.kudu.common.KuduOptions.MAX_BUFFER_SIZE;
+import static org.apache.inlong.sort.kudu.common.KuduOptions.MAX_RETRIES;
+
+/**
+ * The base for all kudu sinks.
+ */
+@PublicEvolving
+public abstract class AbstractKuduSinkFunction
+        extends
+            RichSinkFunction<RowData>
+        implements
+            CheckpointedFunction {
+
+    private static final long serialVersionUID = 1L;
+
+    private static final Logger LOG = LoggerFactory.getLogger(AbstractKuduSinkFunction.class);
+
+    protected final KuduTableInfo kuduTableInfo;
+
+    /**
+     * The configuration of kudu sinkFunction.
+     */
+    protected final Configuration configuration;
+
+    /**
+     * The maximum number of buffered records.
+     */
+    protected final int maxBufferSize;
+
+    /**
+     * The maximum number of retries.
+     */
+    protected final int maxRetries;
+
+    /**
+     * True if the sink is running.
+     */
+    protected volatile boolean running;
+
+    /**
+     * The exception thrown in asynchronous tasks.
+     */
+    private transient Throwable flushThrowable;
+
+    private SinkMetricData sinkMetricData;
+
+    private transient ListState<MetricState> metricStateListState;
+    private transient MetricState metricState;
+
+    private final String auditHostAndPorts;
+
+    private final String inLongMetric;
+
+    private SourceMetricData sourceMetricData;
+
+    public AbstractKuduSinkFunction(
+            KuduTableInfo kuduTableInfo,
+            Configuration configuration,
+            String inLongMetric,
+            String auditHostAndPorts) {
+        this.kuduTableInfo = kuduTableInfo;
+        this.configuration = configuration;
+        this.maxRetries = configuration.getInteger(MAX_RETRIES);
+        this.maxBufferSize = configuration.getInteger(MAX_BUFFER_SIZE);
+        this.inLongMetric = inLongMetric;
+        this.auditHostAndPorts = auditHostAndPorts;
+
+    }
+
+    @Override
+    public void open(Configuration parameters) throws Exception {
+        this.running = true;
+        MetricOption metricOption = MetricOption.builder()
+                .withInlongLabels(inLongMetric)
+                .withInlongAudit(auditHostAndPorts)
+                .withInitRecords(metricState != null ? metricState.getMetricValue(NUM_RECORDS_OUT) : 0L)
+                .withInitBytes(metricState != null ? metricState.getMetricValue(NUM_BYTES_OUT) : 0L)
+                .withInitDirtyRecords(metricState != null ? metricState.getMetricValue(DIRTY_RECORDS_OUT) : 0L)
+                .withInitDirtyBytes(metricState != null ? metricState.getMetricValue(DIRTY_BYTES_OUT) : 0L)
+                .withRegisterMetric(MetricOption.RegisteredMetric.ALL)
+                .build();
+        if (metricOption != null) {
+            sinkMetricData = new SinkMetricData(metricOption, getRuntimeContext().getMetricGroup());
+        }
+    }
+
+    @Override
+    public void invoke(

Review Comment:
   done





[GitHub] [inlong] dockerzhang commented on a diff in pull request #7059: [INLONG-7058][Sort] Support Apache kudu connector

Posted by GitBox <gi...@apache.org>.
dockerzhang commented on code in PR #7059:
URL: https://github.com/apache/inlong/pull/7059#discussion_r1057033171


##########
inlong-sort/pom.xml:
##########
@@ -47,6 +47,7 @@
         <hbase.version>2.2.3</hbase.version>
         <iceberg.hive.version>2.3.7</iceberg.hive.version>
         <hudi.hive.version>2.3.7</hudi.hive.version>
+        <kudu.version>1.16.0</kudu.version>

Review Comment:
   Please move this version definition to the root pom.xml.





[GitHub] [inlong] featzhang closed pull request #7059: [INLONG-7058][Sort] Support Apache kudu connector

Posted by GitBox <gi...@apache.org>.
featzhang closed pull request #7059: [INLONG-7058][Sort] Support Apache kudu connector
URL: https://github.com/apache/inlong/pull/7059




[GitHub] [inlong] gong commented on a diff in pull request #7059: [INLONG-7058][Sort] Support Apache Kudu connector

Posted by GitBox <gi...@apache.org>.
gong commented on code in PR #7059:
URL: https://github.com/apache/inlong/pull/7059#discussion_r1058096505


##########
inlong-sort/sort-connectors/kudu/src/main/java/org/apache/inlong/sort/kudu/table/KuduDynamicTableSink.java:
##########
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.kudu.table;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.ResolvedCatalogTable;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.sink.DataStreamSinkProvider;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.utils.TableConnectorUtils;
+import org.apache.inlong.sort.kudu.common.KuduOptions;
+import org.apache.inlong.sort.kudu.sink.KuduAsyncSinkFunction;
+import org.apache.inlong.sort.kudu.sink.KuduSinkFunction;
+import org.apache.kudu.client.SessionConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Objects;
+import java.util.Set;
+
+import static org.apache.flink.shaded.guava18.com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.inlong.sort.kudu.common.KuduOptions.FLUSH_MODE;
+import static org.apache.inlong.sort.kudu.common.KuduOptions.KUDU_IGNORE_ALL_CHANGELOG;
+
+/**
+ * The KuduLookupFunction is a standard user-defined table function, it can be
+ * used in tableAPI and also useful for temporal table join plan in SQL.
+ */
+public class KuduDynamicTableSink implements DynamicTableSink {
+
+    private static final Logger LOG = LoggerFactory.getLogger(KuduDynamicTableSink.class);
+
+    /**
+     * The schema of the table.
+     */
+    private final TableSchema flinkSchema;
+
+    /**
+     * The masters of kudu server.
+     */
+    private final String masters;
+
+    /**
+     * The flush mode of kudu client. <br/>
+     * AUTO_FLUSH_BACKGROUND: calls will return immediately, but the writes will be sent in the background,
+     * potentially batched together with other writes from the same session. <br/>
+     * AUTO_FLUSH_SYNC: call will return only after being flushed to the server automatically. <br/>
+     * MANUAL_FLUSH: calls will return immediately, but the writes will not be sent
+     * until the user calls <code>KuduSession.flush()</code>.
+     */
+    private final SessionConfiguration.FlushMode flushMode;
+
+    /**
+     * The name of kudu table.
+     */
+    private final String tableName;
+
+    /**
+     * The configuration for the kudu sink.
+     */
+    private final Configuration configuration;
+    private final String inlongMetric;
+    private final String auditHostAndPorts;
+
+    /**
+     * True if the data stream consumed by this sink is append-only.
+     */
+    private boolean isAppendOnly;
+
+    /**
+     * The names of the key fields of the upsert stream consumed by this sink.
+     */
+    @Nullable
+    private String[] keyFieldNames;
+    private ResolvedCatalogTable catalogTable;
+
+    public KuduDynamicTableSink(
+            ResolvedCatalogTable catalogTable,
+            TableSchema flinkSchema,
+            String masters,
+            String tableName,
+            Configuration configuration,
+            String inlongMetric,
+            String auditHostAndPorts) {
+        this.catalogTable = catalogTable;
+        this.flinkSchema = checkNotNull(flinkSchema,
+                "The schema must not be null.");
+        DataType dataType = flinkSchema.toRowDataType();
+        LogicalType logicalType = dataType.getLogicalType();
+
+        SessionConfiguration.FlushMode flushMode = SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND;
+        if (configuration.containsKey(FLUSH_MODE.key())) {
+            String flushModeConfig = configuration.getString(FLUSH_MODE);
+            flushMode = SessionConfiguration.FlushMode.valueOf(flushModeConfig);
+            checkNotNull(flushMode, "The flush mode must be one of " +
+                    "AUTO_FLUSH_SYNC AUTO_FLUSH_BACKGROUND or MANUAL_FLUSH.");
+        }
+
+        this.masters = masters;
+        this.flushMode = flushMode;
+        this.tableName = checkNotNull(tableName);
+        this.configuration = configuration;
+        this.inlongMetric = inlongMetric;
+        this.auditHostAndPorts = auditHostAndPorts;
+
+        String userKeyFieldsConfig = configuration.getString(KuduOptions.SINK_KEY_FIELD_NAMES);
+        if (userKeyFieldsConfig != null) {
+            userKeyFieldsConfig = userKeyFieldsConfig.trim();
+            if (!userKeyFieldsConfig.isEmpty()) {
+                this.keyFieldNames = userKeyFieldsConfig.split("\\s*,\\s*");
+            }
+        }
+    }
+
+    public DataStreamSink<?> consumeStream(DataStream<RowData> dataStream) {
+
+        SinkFunction<RowData> kuduSinkFunction = createSinkFunction();
+        if (configuration.getBoolean(KuduOptions.SINK_START_NEW_CHAIN)) {
+            dataStream = ((SingleOutputStreamOperator) dataStream).startNewChain();
+        }
+
+        return dataStream
+                .addSink(kuduSinkFunction)
+                .setParallelism(dataStream.getParallelism())
+                .name(TableConnectorUtils.generateRuntimeName(this.getClass(), flinkSchema.getFieldNames()));
+    }
+
+    private SinkFunction<RowData> createSinkFunction() {
+        boolean sinkWithAsyncMode = configuration.getBoolean(KuduOptions.SINK_WRITE_WITH_ASYNC_MODE);
+        if (sinkWithAsyncMode) {
+            return new KuduAsyncSinkFunction(
+                    flinkSchema,
+                    masters,
+                    tableName,
+                    flushMode,
+                    keyFieldNames,
+                    configuration,
+                    inlongMetric,
+                    auditHostAndPorts);
+        } else {
+            return new KuduSinkFunction(
+                    flinkSchema,
+                    masters,
+                    tableName,
+                    flushMode,
+                    configuration,
+                    inlongMetric,
+                    auditHostAndPorts);
+        }
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        KuduDynamicTableSink that = (KuduDynamicTableSink) o;
+        return flinkSchema.equals(that.flinkSchema) &&
+                masters.equals(that.masters) &&
+                flushMode == that.flushMode &&
+                tableName.equals(that.tableName);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(flinkSchema, masters, flushMode, tableName);
+    }
+
+    /**
+     * Compare key fields given by flink planner and key fields specified by user.
+     *
+     * @param plannerKeyFields Key fields given by flink planner.
+     * @param userKeyFields    Key fields specified by user via {@link KuduOptions#SINK_KEY_FIELD_NAMES}.
+     */
+    private void compareKeyFields(String[] plannerKeyFields, String[] userKeyFields) {
+        if (plannerKeyFields == null || plannerKeyFields.length == 0) {
+            return;
+        }
+        if (userKeyFields == null || userKeyFields.length == 0) {
+            return;
+        }
+
+        Set<String> assumedSet = new HashSet<>(Arrays.asList(plannerKeyFields));
+        Set<String> userSet = new HashSet<>(Arrays.asList(userKeyFields));
+
+        if (!assumedSet.equals(userSet)) {
+            String errorMsg = String.format(
+                    "Key fields provided by flink [%s] are not the same as key fields " +
+                            "provided by user [%s]. Please adjust your key fields settings, or " +
+                            "set %s to false.",
+                    assumedSet, userSet, KuduOptions.ENABLE_KEY_FIELD_CHECK.key());
+            throw new ValidationException(errorMsg);
+        }
+    }
+
+    @Override
+    public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
+        if (org.apache.flink.configuration.Configuration.fromMap(catalogTable.getOptions())
+                .get(KUDU_IGNORE_ALL_CHANGELOG)) {
+            LOG.warn("Kudu sink receive all changelog record. "
+                    + "Regard any other record as insert-only record.");
+            return ChangelogMode.all();
+        }
+        return ChangelogMode.all();

Review Comment:
   Both branches return the same value, so maybe just `return ChangelogMode.all();`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [inlong] gong commented on a diff in pull request #7059: [INLONG-7058][Sort] Support Apache Kudu connector

Posted by GitBox <gi...@apache.org>.
gong commented on code in PR #7059:
URL: https://github.com/apache/inlong/pull/7059#discussion_r1058095936


##########
inlong-sort/sort-connectors/kudu/src/main/java/org/apache/inlong/sort/kudu/common/KuduOptions.java:
##########
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.kudu.common;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+
+/**
+ * The configuration options for kudu sink.
+ */
+public class KuduOptions {
+
+    public static final ConfigOption<String> FLUSH_MODE =
+            ConfigOptions.key("flush-mode")
+                    .stringType()
+                    .defaultValue("AUTO_FLUSH_SYNC")
+                    .withDescription(
+                            "The flush mode of Kudu client session, AUTO_FLUSH_SYNC/AUTO_FLUSH_BACKGROUND/MANUAL_FLUSH.");
+
+    public static final ConfigOption<Integer> MAX_CACHE_SIZE =
+            ConfigOptions.key("lookup.max-cache-size")
+                    .intType()
+                    .defaultValue(-1)
+                    .withDescription("The maximum number of results cached in the " +
+                            "lookup source.");
+
+    public static final ConfigOption<String> MAX_CACHE_TIME =
+            ConfigOptions.key("lookup.max-cache-time")
+                    .stringType()
+                    .defaultValue("60s")
+                    .withDescription("The maximum live time for cached results in " +
+                            "the lookup source.");
+    public static final ConfigOption<Boolean> SINK_START_NEW_CHAIN =
+            ConfigOptions.key("sink.start-new-chain")
+                    .booleanType()
+                    .defaultValue(true)
+                    .withDescription("The sink operator will start a new chain if true.");
+
+    public static final ConfigOption<Integer> MAX_RETRIES =
+            ConfigOptions.key("max-retries")
+                    .intType()
+                    .defaultValue(3)
+                    .withDescription("The maximum number of retries when an " +
+                            "exception is caught.");
+    public static final ConfigOption<Integer> MAX_BUFFER_SIZE =
+            ConfigOptions.key("sink.max-buffer-size")
+                    .intType()
+                    .defaultValue(100)
+                    .withDescription("The maximum number of records buffered in the sink.");
+
+    public static final ConfigOption<Integer> WRITE_THREAD_COUNT =
+            ConfigOptions.key("sink.write-thread-count")
+                    .intType()
+                    .defaultValue(5)
+                    .withDescription("The maximum number of thread in the sink.");
+
+    public static final ConfigOption<String> MAX_BUFFER_TIME =
+            ConfigOptions.key("sink.max-buffer-time")
+                    .stringType()
+                    .defaultValue("30s")
+                    .withDescription("The maximum wait time for buffered records in the sink.");
+
+    public static final ConfigOption<String> SINK_KEY_FIELD_NAMES =
+            ConfigOptions.key("sink.key-field-names")
+                    .stringType()
+                    .defaultValue("")
+                    .withDescription("The key fields for updating DB when using upsert sink function.");
+    public static final ConfigOption<Boolean> ENABLE_KEY_FIELD_CHECK =
+            ConfigOptions.key("sink.enable-key-field-check")
+                    .booleanType()
+                    .defaultValue(true)
+                    .withDescription("If true, the check to compare key fields assumed by flink " +
+                            "and key fields provided by user will be performed.");
+
+    public static final ConfigOption<Boolean> SINK_WRITE_WITH_ASYNC_MODE =
+            ConfigOptions.key("sink.write-with-async-mode")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription("Use async write mode for kudu producer if true.");
+
+    public static final ConfigOption<Boolean> SINK_FORCE_WITH_UPSERT_MODE =
+            ConfigOptions.key("sink.write-with-upsert-mode")
+                    .booleanType()
+                    .defaultValue(true)
+                    .withDescription("Force write kudu with upsert mode if true.");
+
+    public static final ConfigOption<Integer> CACHE_QUEUE_MAX_LENGTH =
+            ConfigOptions.key("sink.cache-queue-max-length")
+                    .intType()
+                    .defaultValue(-1)
+                    .withDescription("The maximum queue lengths.");
+    public static final ConfigOption<String> CONNECTOR_TABLE =
+            ConfigOptions.key("connector.table")
+                    .stringType()
+                    .noDefaultValue().withDescription("The name of kudu table.");
+    public static final ConfigOption<String> CONNECTOR_MASTER =
+            ConfigOptions.key("connector.master")
+                    .stringType()
+                    .noDefaultValue().withDescription(" The masters of kudu server.");
+    public static final ConfigOption<Boolean> LAZY_LOAD_SCHEMA =
+            ConfigOptions.key("lazy.load.schema")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription("Whether get metadata from kudu after generating flink job graphic.");
+    public static final ConfigOption<Boolean> KUDU_IGNORE_ALL_CHANGELOG =
+            ConfigOptions.key("ignore.all.changelog")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription("Whether ignore delete/update_before/update_after message.");

Review Comment:
   Maybe rename `ignore.all.changelog` to `sink.ignore.changelog`, to keep it consistent with the other connectors.





[GitHub] [inlong] zhangyifan27 commented on a diff in pull request #7059: [INLONG-7058][Sort] Support Apache Kudu connector

Posted by GitBox <gi...@apache.org>.
zhangyifan27 commented on code in PR #7059:
URL: https://github.com/apache/inlong/pull/7059#discussion_r1058136032


##########
inlong-sort/sort-connectors/kudu/src/main/java/org/apache/inlong/sort/kudu/sink/KuduAsyncSinkFunction.java:
##########
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.kudu.sink;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.shaded.guava18.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.inlong.sort.kudu.common.KuduTableInfo;
+import org.apache.inlong.sort.kudu.source.KuduConsumerTask;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.util.TimeUtils.parseDuration;
+import static org.apache.inlong.sort.kudu.common.KuduOptions.CACHE_QUEUE_MAX_LENGTH;
+import static org.apache.inlong.sort.kudu.common.KuduOptions.MAX_BUFFER_TIME;
+import static org.apache.inlong.sort.kudu.common.KuduOptions.SINK_FORCE_WITH_UPSERT_MODE;
+import static org.apache.inlong.sort.kudu.common.KuduOptions.WRITE_THREAD_COUNT;
+
+/**
+ * The Flink kudu Producer in async Mode.
+ */
+@PublicEvolving
+public class KuduAsyncSinkFunction
+        extends
+            AbstractKuduSinkFunction {
+
+    private static final Logger LOG = LoggerFactory.getLogger(KuduAsyncSinkFunction.class);
+    private final Duration maxBufferTime;
+
+    private transient BlockingQueue<RowData> queue;
+
+    private transient List<KuduConsumerTask> consumerTasks;
+    private ExecutorService threadPool = null;
+
+    public KuduAsyncSinkFunction(
+            KuduTableInfo kuduTableInfo,
+            Configuration configuration,
+            String inlongMetric,
+            String auditHostAndPorts) {
+        super(kuduTableInfo, configuration, inlongMetric, auditHostAndPorts);
+        this.maxBufferTime = parseDuration(configuration.getString(MAX_BUFFER_TIME));
+    }
+
+    @Override
+    public void open(Configuration parameters) throws Exception {
+        super.open(parameters);
+        int threadCnt = configuration.getInteger(WRITE_THREAD_COUNT);
+        int cacheQueueMaxLength = configuration.getInteger(CACHE_QUEUE_MAX_LENGTH);
+        if (cacheQueueMaxLength == -1) {
+            cacheQueueMaxLength = (maxBufferSize + 1) * (threadCnt + 1);
+        }
+        LOG.info("Opening KuduAsyncSinkFunction, threadCount:{}, cacheQueueMaxLength:{}, maxBufferSize:{}.",
+                threadCnt, cacheQueueMaxLength, maxBufferSize);
+
+        queue = new LinkedBlockingQueue<>(cacheQueueMaxLength);
+        consumerTasks = new ArrayList<>(threadCnt);
+        ThreadFactory namedThreadFactory = new ThreadFactoryBuilder()
+                .setNameFormat("kudu-sink-pool-%d").build();
+        threadPool = new ThreadPoolExecutor(
+                threadCnt + 1,
+                threadCnt + 1,
+                0L,
+                TimeUnit.MILLISECONDS,
+                new LinkedBlockingQueue<>(4),
+                namedThreadFactory,
+                new ThreadPoolExecutor.AbortPolicy());
+
+        boolean forceInUpsertMode = configuration.getBoolean(SINK_FORCE_WITH_UPSERT_MODE);
+        for (int threadIndex = 0; threadIndex < threadCnt; threadIndex++) {
+            KuduWriter kuduWriter = new KuduWriter(kuduTableInfo);
+            kuduWriter.open();

Review Comment:
   Maybe multiple threads should share a `kuduTable` object to avoid multiple openTable calls for a given table in the same task.
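
A minimal sketch of the suggestion above, assuming the opened table handle is safe to share between writer threads, as the comment implies. `FakeClient`, `FakeTable` and `WriterTask` are hypothetical stand-ins written for this illustration; they are not the Kudu client API or the classes in this PR. The point is only the shape: open the table once per task, then hand the same handle to every writer.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class SharedTableSketch {

    /** Hypothetical stand-in for a client; counts how often a table is opened. */
    public static final class FakeClient {
        public final AtomicInteger openCalls = new AtomicInteger();

        /** Stand-in for the open-table round trip we want to perform only once. */
        public FakeTable openTable(String name) {
            openCalls.incrementAndGet();
            return new FakeTable(name);
        }
    }

    /** Hypothetical stand-in for an opened, immutable table handle. */
    public static final class FakeTable {
        public final String name;

        FakeTable(String name) {
            this.name = name;
        }
    }

    /** Hypothetical writer task that receives an already opened table. */
    static final class WriterTask implements Runnable {
        private final FakeTable table;
        private final AtomicInteger writes;

        WriterTask(FakeTable table, AtomicInteger writes) {
            this.table = table;
            this.writes = writes;
        }

        @Override
        public void run() {
            // A real writer would build operations against `table` here.
            writes.incrementAndGet();
        }
    }

    /** Opens the table once, runs threadCnt writers on it, returns {openCalls, writes}. */
    public static int[] startWriters(FakeClient client, String tableName, int threadCnt)
            throws InterruptedException {
        FakeTable shared = client.openTable(tableName); // once per task, not once per thread
        AtomicInteger writes = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(threadCnt);
        for (int i = 0; i < threadCnt; i++) {
            pool.execute(new WriterTask(shared, writes));
        }
        pool.shutdown();
        if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
            throw new IllegalStateException("writers did not finish in time");
        }
        return new int[] {client.openCalls.get(), writes.get()};
    }

    public static void main(String[] args) throws InterruptedException {
        int[] result = startWriters(new FakeClient(), "actual_tbl", 5);
        System.out.println("openTable calls: " + result[0] + ", writer runs: " + result[1]);
    }
}
```

Opening the table inside the loop instead would make one open call per writer thread, which is what the comment asks to avoid.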



##########
inlong-sort/sort-connectors/kudu/src/main/java/org/apache/inlong/sort/kudu/source/KuduLookupFunction.java:
##########
@@ -0,0 +1,294 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.kudu.source;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.shaded.guava18.com.google.common.cache.Cache;
+import org.apache.flink.shaded.guava18.com.google.common.cache.CacheBuilder;
+import org.apache.flink.table.functions.FunctionContext;
+import org.apache.flink.table.functions.TableFunction;
+import org.apache.flink.types.Row;
+import org.apache.inlong.sort.kudu.common.KuduOptions;
+import org.apache.inlong.sort.kudu.common.KuduTableInfo;
+import org.apache.kudu.ColumnSchema;
+import org.apache.kudu.Schema;
+import org.apache.kudu.client.KuduClient;
+import org.apache.kudu.client.KuduPredicate;
+import org.apache.kudu.client.KuduScanToken;
+import org.apache.kudu.client.KuduScanner;
+import org.apache.kudu.client.KuduTable;
+import org.apache.kudu.client.LocatedTablet;
+import org.apache.kudu.client.RowResult;
+import org.apache.kudu.client.RowResultIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.TimeUtils.parseDuration;
+import static org.apache.kudu.client.KuduPredicate.ComparisonOp.EQUAL;
+
+/**
+ * The KuduLookupFunction is a standard user-defined table function, it can be
+ * used in tableAPI and also useful for temporal table join plan in SQL.
+ */
+public class KuduLookupFunction extends TableFunction<Row> {
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(KuduLookupFunction.class);
+
+    /**
+     * The names of lookup key.
+     */
+    private final String[] keyNames;
+
+    /**
+     * The configuration for the tde source.
+     */
+    private final Configuration configuration;
+
+    /**
+     * The masters of kudu server.
+     */
+    private final String masters;
+
+    /**
+     * The name of kudu table.
+     */
+    private final String tableName;
+
+    /**
+     * The maximum number of retries.
+     */
+    private transient int maxRetries;
+
+    /**
+     * The cache for lookup results.
+     */
+    private transient Cache<Row, List<Row>> cache;
+
+    /**
+     * The client of kudu.
+     */
+    private transient KuduClient client;
+
+    /**
+     * The table of kudu.
+     */
+    private transient KuduTable table;
+
+    public KuduLookupFunction(
+            KuduTableInfo kuduTableInfo,
+            Configuration configuration) {
+        checkNotNull(configuration,
+                "The configuration must not be null.");
+
+        this.masters = kuduTableInfo.getMasters();
+        this.tableName = kuduTableInfo.getTableName();
+        this.keyNames = kuduTableInfo.getFieldNames();
+        this.configuration = configuration;
+    }
+
+    @Override
+    public void open(FunctionContext context) throws Exception {
+        super.open(context);
+        maxRetries = configuration.getInteger(KuduOptions.MAX_RETRIES);
+        int maxCacheSize = configuration.getInteger(KuduOptions.MAX_CACHE_SIZE);
+        Duration maxCacheTime = parseDuration(configuration.getString(
+                KuduOptions.MAX_CACHE_TIME));
+        LOG.info("opening KuduLookupFunction, maxCacheSize:{}, maxCacheTime:{}.", maxCacheSize, maxCacheTime);
+
+        if (maxCacheSize > 0) {
+            cache =
+                    CacheBuilder.newBuilder()
+                            .maximumSize(maxCacheSize)
+                            .expireAfterWrite(maxCacheTime.toMillis(), TimeUnit.MILLISECONDS)
+                            .build();
+        }
+
+        this.client = new KuduClient.KuduClientBuilder(masters).build();
+
+        this.table = client.openTable(tableName);
+        LOG.info("KuduLookupFunction opened.");
+    }
+
+    @Override
+    public void close() throws Exception {
+        super.close();
+    }
+
+    @Override
+    public int hashCode() {
+        int result = Objects.hash(configuration, masters, tableName);
+        result = 31 * result + Arrays.hashCode(keyNames);
+        return result;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        KuduLookupFunction that = (KuduLookupFunction) o;
+        return Arrays.equals(keyNames, that.keyNames) && configuration.equals(that.configuration)
+                && masters.equals(that.masters) && tableName.equals(that.tableName);
+    }
+
+    private KuduPredicate predicateComparator(ColumnSchema column, Object value) {
+
+        KuduPredicate.ComparisonOp comparison = EQUAL;
+
+        KuduPredicate predicate;
+
+        switch (column.getType()) {
+            case STRING:
+                String data;
+                data = (String) value;
+                predicate = KuduPredicate.newComparisonPredicate(column, comparison, data);
+                break;
+            case FLOAT:
+                predicate = KuduPredicate.newComparisonPredicate(column, comparison, (float) value);
+                break;
+            case INT8:
+                predicate = KuduPredicate.newComparisonPredicate(column, comparison, (byte) value);
+                break;
+            case INT16:
+                predicate = KuduPredicate.newComparisonPredicate(column, comparison, (short) value);
+                break;
+            case INT32:
+                predicate = KuduPredicate.newComparisonPredicate(column, comparison, (int) value);
+                break;
+            case INT64:
+                predicate = KuduPredicate.newComparisonPredicate(column, comparison, (long) value);
+                break;
+            case DOUBLE:
+                predicate = KuduPredicate.newComparisonPredicate(column, comparison, (double) value);
+                break;
+            case BOOL:
+                predicate = KuduPredicate.newComparisonPredicate(column, comparison, (boolean) value);
+                break;
+            case UNIXTIME_MICROS:
+                Long time = (Long) value;
+                predicate = KuduPredicate.newComparisonPredicate(column, comparison, time * 1000);
+                break;
+            case BINARY:
+                predicate = KuduPredicate.newComparisonPredicate(column, comparison, (byte[]) value);
+                break;
+            default:
+                throw new IllegalArgumentException("Illegal var type: " + column.getType());
+        }
+        return predicate;
+    }
+
+    public void eval(Object... keys) throws Exception {
+        if (keys.length != keyNames.length) {
+            throw new RuntimeException("The length of lookUpKey and lookUpKeyVals is difference!");
+        }
+        Row keyRow = buildCacheKey(keys);
+        if (this.cache != null) {
+            ConcurrentMap<Row, List<Row>> cacheMap = this.cache.asMap();
+            int keyCount = cacheMap.size();
+            List<Row> cacheRows = this.cache.getIfPresent(keyRow);
+            if (CollectionUtils.isNotEmpty(cacheRows)) {
+                for (Row cacheRow : cacheRows) {
+                    collect(cacheRow);
+                }
+                return;
+            }
+        }
+
+        for (int retry = 1; retry <= maxRetries; retry++) {
+            try {
+                final KuduScanToken.KuduScanTokenBuilder scanTokenBuilder = client.newScanTokenBuilder(table);
+                final Schema kuduTableSchema = table.getSchema();
+                for (int i = 0; i < keyNames.length; i++) {
+                    String keyName = keyNames[i];
+                    Object value = keys[i];
+                    final ColumnSchema column = kuduTableSchema.getColumn(keyName);
+                    KuduPredicate predicate = predicateComparator(column, value);
+                    scanTokenBuilder.addPredicate(predicate);
+                }
+                final List<KuduScanToken> tokenList = scanTokenBuilder.build();
+                ArrayList<Row> rows = new ArrayList<>();
+                for (final KuduScanToken token : tokenList) {
+                    final List<LocatedTablet.Replica> replicas = token.getTablet().getReplicas();
+                    final String[] array = replicas
+                            .stream()
+                            .map(replica -> replica.getRpcHost() + ":" + replica.getRpcPort())
+                            .collect(Collectors.toList()).toArray(new String[replicas.size()]);
+                    final byte[] scanToken = token.serialize();
+                    final KuduScanner scanner = KuduScanToken.deserializeIntoScanner(scanToken, client);
+                    RowResultIterator rowIterator = scanner.nextRows();

Review Comment:
   We can use the `KuduScannerIterator` API to process the RowResults through `scanner.iterator()`, which issues keep-alive calls so that the scanner does not time out.
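
To illustrate the idea behind this suggestion, here is a small sketch of an iterator that drains every batch and pings the scanner when the caller has been slow between rows. It deliberately does not use the Kudu client: `BatchScanner`, `InMemoryScanner` and `KeepAliveIterator` are hypothetical stand-ins invented for this example, and the real `KuduScannerIterator` may behave differently in its details.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

public class KeepAliveIteratorSketch {

    /** Hypothetical stand-in for a server-side scanner that returns rows in batches. */
    public interface BatchScanner {
        boolean hasMoreRows();

        List<String> nextRows();

        void keepAlive();
    }

    /** In-memory stand-in used to exercise the iterator; counts keep-alive calls. */
    public static final class InMemoryScanner implements BatchScanner {
        private final Iterator<List<String>> batches;
        public int keepAliveCalls;

        public InMemoryScanner(List<List<String>> batches) {
            this.batches = batches.iterator();
        }

        @Override
        public boolean hasMoreRows() {
            return batches.hasNext();
        }

        @Override
        public List<String> nextRows() {
            return batches.next();
        }

        @Override
        public void keepAlive() {
            keepAliveCalls++;
        }
    }

    /** Iterates over all rows of all batches, calling keepAlive() after idle periods. */
    public static final class KeepAliveIterator implements Iterator<String> {
        private final BatchScanner scanner;
        private final long keepAlivePeriodMs;
        private Iterator<String> current = Collections.emptyIterator();
        private long lastContactMs = System.currentTimeMillis();

        public KeepAliveIterator(BatchScanner scanner, long keepAlivePeriodMs) {
            this.scanner = scanner;
            this.keepAlivePeriodMs = keepAlivePeriodMs;
        }

        @Override
        public boolean hasNext() {
            // Fetch further batches until one has rows or the scanner is exhausted.
            while (!current.hasNext() && scanner.hasMoreRows()) {
                current = scanner.nextRows().iterator();
                lastContactMs = System.currentTimeMillis(); // a fetch also counts as contact
            }
            return current.hasNext();
        }

        @Override
        public String next() {
            if (!hasNext()) {
                throw new NoSuchElementException();
            }
            long now = System.currentTimeMillis();
            if (now - lastContactMs >= keepAlivePeriodMs) {
                scanner.keepAlive(); // the caller was slow, so refresh the scanner
                lastContactMs = now;
            }
            return current.next();
        }
    }

    /** Collects every row from the scanner through the keep-alive iterator. */
    public static List<String> drain(BatchScanner scanner, long keepAlivePeriodMs) {
        List<String> rows = new ArrayList<>();
        Iterator<String> it = new KeepAliveIterator(scanner, keepAlivePeriodMs);
        while (it.hasNext()) {
            rows.add(it.next());
        }
        return rows;
    }
}
```

With a period of `0` the sketch pings before every row, which is only useful for testing; a real caller would pick a period comfortably below the scanner's server-side timeout.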





[GitHub] [inlong] gong commented on pull request #7059: [INLONG-7058][Sort] Support Apache Kudu connector

Posted by GitBox <gi...@apache.org>.
gong commented on PR #7059:
URL: https://github.com/apache/inlong/pull/7059#issuecomment-1366413893

   Please add a fileSet for the Kudu connector in `inlong-distribution/src/main/assemblies/sort-connectors.xml`.




[GitHub] [inlong] yunqingmoswu commented on a diff in pull request #7059: [INLONG-7058][Sort] Support Apache kudu connector

Posted by GitBox <gi...@apache.org>.
yunqingmoswu commented on code in PR #7059:
URL: https://github.com/apache/inlong/pull/7059#discussion_r1057034209


##########
inlong-sort/sort-connectors/kudu/pom.xml:
##########
@@ -0,0 +1,93 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~  Licensed to the Apache Software Foundation (ASF) under one
+  ~  or more contributor license agreements.  See the NOTICE file
+  ~  distributed with this work for additional information
+  ~  regarding copyright ownership.  The ASF licenses this file
+  ~  to you under the Apache License, Version 2.0 (the
+  ~  "License"); you may not use this file except in compliance
+  ~  with the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~  Unless required by applicable law or agreed to in writing, software
+  ~  distributed under the License is distributed on an "AS IS" BASIS,
+  ~  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~  See the License for the specific language governing permissions and
+  ~  limitations under the License.
+  -->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.inlong</groupId>
+        <artifactId>sort-connectors</artifactId>
+        <version>1.5.0-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>sort-connector-kudu</artifactId>
+    <packaging>jar</packaging>
+    <name>Apache InLong - Sort-connector-kudu</name>
+
+    <properties />
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.inlong</groupId>
+            <artifactId>sort-connector-base</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.kudu</groupId>
+            <artifactId>kudu-client</artifactId>
+            <version>${kudu.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.inlong</groupId>
+            <artifactId>sort-format-common</artifactId>
+            <version>1.5.0-SNAPSHOT</version>

Review Comment:
   Please replace it with the 'project.version' variable.





[GitHub] [inlong] featzhang commented on a diff in pull request #7059: [INLONG-7058][Sort] Support Apache Kudu connector

Posted by GitBox <gi...@apache.org>.
featzhang commented on code in PR #7059:
URL: https://github.com/apache/inlong/pull/7059#discussion_r1082637924


##########
inlong-sort/sort-connectors/kudu/src/main/java/org/apache/inlong/sort/kudu/common/KuduUtils.java:
##########
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.kudu.common;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.util.StringUtils;
+
+import org.apache.inlong.sort.formats.common.BooleanTypeInfo;
+import org.apache.inlong.sort.formats.common.ByteTypeInfo;
+import org.apache.inlong.sort.formats.common.DateTypeInfo;
+import org.apache.inlong.sort.formats.common.DecimalTypeInfo;
+import org.apache.inlong.sort.formats.common.DoubleTypeInfo;
+import org.apache.inlong.sort.formats.common.FloatTypeInfo;
+import org.apache.inlong.sort.formats.common.IntTypeInfo;
+import org.apache.inlong.sort.formats.common.LongTypeInfo;
+import org.apache.inlong.sort.formats.common.RowTypeInfo;
+import org.apache.inlong.sort.formats.common.StringTypeInfo;
+import org.apache.inlong.sort.formats.common.TimestampTypeInfo;
+import org.apache.inlong.sort.formats.common.TypeInfo;
+import org.apache.kudu.ColumnSchema;
+import org.apache.kudu.Schema;
+import org.apache.kudu.Type;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.apache.flink.util.Preconditions.checkState;
+import static org.apache.kudu.Type.*;
+
+/**
+ * The utility class for Kudu.
+ */
+public class KuduUtils {
+
+    private static final Map<Type, TypeInfo> KUDU_TYPE_2_TYPE_INFO_MAP = new HashMap<>(13);
+    private static final Map<Type, TypeInfo> KUDU_TYPE_2_DATA_TYPE_MAP = new HashMap<>(13);

Review Comment:
   I removed the explicit initial capacity; the default of 16 is enough.





[GitHub] [inlong] featzhang closed pull request #7059: [INLONG-7058][Sort] Support Apache Kudu connector

Posted by "featzhang (via GitHub)" <gi...@apache.org>.
featzhang closed pull request #7059: [INLONG-7058][Sort] Support Apache Kudu connector
URL: https://github.com/apache/inlong/pull/7059




[GitHub] [inlong] featzhang commented on pull request #7059: [INLONG-7058][Sort] Support Apache Kudu connector

Posted by GitBox <gi...@apache.org>.
featzhang commented on PR #7059:
URL: https://github.com/apache/inlong/pull/7059#issuecomment-1364933063

   > Please add the access protocol for it (add KuduLoadNode in sort-common).
   
   I will submit another PR.




[GitHub] [inlong] gong commented on a diff in pull request #7059: [INLONG-7058][Sort] Support Apache Kudu connector

Posted by "gong (via GitHub)" <gi...@apache.org>.
gong commented on code in PR #7059:
URL: https://github.com/apache/inlong/pull/7059#discussion_r1092727497


##########
inlong-sort/sort-connectors/kudu/src/main/java/org/apache/inlong/sort/kudu/sink/AbstractKuduSinkFunction.java:
##########
@@ -145,7 +149,11 @@ public void close() throws Exception {
     }
 
     @Override
-    public void snapshotState(FunctionSnapshotContext functionSnapshotContext) {
+    public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
+        if (sourceMetricData != null && metricStateListState != null) {
+            MetricStateUtils.snapshotMetricStateForSourceMetricData(metricStateListState, sourceMetricData,

Review Comment:
   This should be sinkMetricData, since this is the Kudu sink.





[GitHub] [inlong] yunqingmoswu commented on a diff in pull request #7059: [INLONG-7058][Sort] Support Apache Kudu connector

Posted by "yunqingmoswu (via GitHub)" <gi...@apache.org>.
yunqingmoswu commented on code in PR #7059:
URL: https://github.com/apache/inlong/pull/7059#discussion_r1089859768


##########
inlong-sort/sort-connectors/kudu/src/main/java/org/apache/inlong/sort/kudu/common/KuduOptions.java:
##########
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.kudu.common;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+
+/**
+ * The configuration options for kudu sink.
+ */
+public class KuduOptions {
+
+    public static final ConfigOption<String> CONNECTOR_TABLE =
+            ConfigOptions.key("table")
+                    .stringType()
+                    .noDefaultValue().withDescription("The name of kudu table.");
+
+    public static final ConfigOption<String> CONNECTOR_MASTERS =

Review Comment:
   Is it necessary to add the 'CONNECTOR' prefix?



##########
inlong-sort/sort-connectors/kudu/src/main/java/org/apache/inlong/sort/kudu/common/KuduOptions.java:
##########
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.kudu.common;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+
+/**
+ * The configuration options for kudu sink.
+ */
+public class KuduOptions {
+
+    public static final ConfigOption<String> CONNECTOR_TABLE =

Review Comment:
   Is it necessary to add the 'CONNECTOR' prefix?



##########
inlong-sort/sort-connectors/kudu/src/main/java/org/apache/inlong/sort/kudu/sink/AbstractKuduSinkFunction.java:
##########
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.kudu.sink;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.table.data.RowData;
+import org.apache.inlong.sort.base.metric.MetricOption;
+import org.apache.inlong.sort.base.metric.MetricState;
+import org.apache.inlong.sort.base.metric.SinkMetricData;
+import org.apache.inlong.sort.base.metric.SourceMetricData;
+import org.apache.inlong.sort.base.util.MetricStateUtils;
+import org.apache.inlong.sort.kudu.common.KuduTableInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+import static org.apache.inlong.sort.base.Constants.DIRTY_BYTES_OUT;
+import static org.apache.inlong.sort.base.Constants.DIRTY_RECORDS_OUT;
+import static org.apache.inlong.sort.base.Constants.INLONG_METRIC_STATE_NAME;
+import static org.apache.inlong.sort.base.Constants.NUM_BYTES_OUT;
+import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_OUT;
+import static org.apache.inlong.sort.kudu.common.KuduOptions.MAX_BUFFER_SIZE;
+import static org.apache.inlong.sort.kudu.common.KuduOptions.MAX_RETRIES;
+
+/**
+ * The base for all kudu sinks.
+ */
+@PublicEvolving
+public abstract class AbstractKuduSinkFunction
+        extends
+            RichSinkFunction<RowData>
+        implements
+            CheckpointedFunction {
+
+    private static final long serialVersionUID = 1L;
+
+    private static final Logger LOG = LoggerFactory.getLogger(AbstractKuduSinkFunction.class);
+
+    protected final KuduTableInfo kuduTableInfo;
+
+    /**
+     * The configuration of kudu sinkFunction.
+     */
+    protected final Configuration configuration;
+
+    /**
+     * The maximum number of buffered records.
+     */
+    protected final int maxBufferSize;
+
+    /**
+     * The maximum number of retries.
+     */
+    protected final int maxRetries;
+
+    /**
+     * True if the sink is running.
+     */
+    protected volatile boolean running;
+
+    /**
+     * The exception thrown in asynchronous tasks.
+     */
+    private transient Throwable flushThrowable;
+
+    private SinkMetricData sinkMetricData;
+
+    private transient ListState<MetricState> metricStateListState;
+    private transient MetricState metricState;
+
+    private final String auditHostAndPorts;
+
+    private final String inLongMetric;
+
+    private SourceMetricData sourceMetricData;
+
+    public AbstractKuduSinkFunction(
+            KuduTableInfo kuduTableInfo,
+            Configuration configuration,
+            String inLongMetric,
+            String auditHostAndPorts) {
+        this.kuduTableInfo = kuduTableInfo;
+        this.configuration = configuration;
+        this.maxRetries = configuration.getInteger(MAX_RETRIES);
+        this.maxBufferSize = configuration.getInteger(MAX_BUFFER_SIZE);
+        this.inLongMetric = inLongMetric;
+        this.auditHostAndPorts = auditHostAndPorts;
+
+    }
+
+    @Override
+    public void open(Configuration parameters) throws Exception {
+        this.running = true;
+        MetricOption metricOption = MetricOption.builder()
+                .withInlongLabels(inLongMetric)
+                .withInlongAudit(auditHostAndPorts)
+                .withInitRecords(metricState != null ? metricState.getMetricValue(NUM_RECORDS_OUT) : 0L)
+                .withInitBytes(metricState != null ? metricState.getMetricValue(NUM_BYTES_OUT) : 0L)
+                .withInitDirtyRecords(metricState != null ? metricState.getMetricValue(DIRTY_RECORDS_OUT) : 0L)
+                .withInitDirtyBytes(metricState != null ? metricState.getMetricValue(DIRTY_BYTES_OUT) : 0L)
+                .withRegisterMetric(MetricOption.RegisteredMetric.ALL)
+                .build();
+        if (metricOption != null) {
+            sinkMetricData = new SinkMetricData(metricOption, getRuntimeContext().getMetricGroup());
+        }
+    }
+
+    @Override
+    public void invoke(

Review Comment:
   It is recommended to remove the newline.





[GitHub] [inlong] featzhang commented on a diff in pull request #7059: [INLONG-7058][Sort] Support Apache Kudu connector

Posted by "featzhang (via GitHub)" <gi...@apache.org>.
featzhang commented on code in PR #7059:
URL: https://github.com/apache/inlong/pull/7059#discussion_r1089868921


##########
inlong-sort/sort-connectors/kudu/src/main/java/org/apache/inlong/sort/kudu/common/KuduOptions.java:
##########
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.kudu.common;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+
+/**
+ * The configuration options for kudu sink.
+ */
+public class KuduOptions {
+
+    public static final ConfigOption<String> CONNECTOR_TABLE =
+            ConfigOptions.key("table")
+                    .stringType()
+                    .noDefaultValue().withDescription("The name of kudu table.");
+
+    public static final ConfigOption<String> CONNECTOR_MASTERS =

Review Comment:
   There is no prefix 'connector' in other connectors of InLong, so I removed the prefix.





[GitHub] [inlong] featzhang commented on pull request #7059: [INLONG-7058][Sort] Support Apache Kudu connector

Posted by GitBox <gi...@apache.org>.
featzhang commented on PR #7059:
URL: https://github.com/apache/inlong/pull/7059#issuecomment-1365047849

   > Please add some description of your design and implementation, thanks.
   
   I have added some documentation to the PR description; please check.




[GitHub] [inlong] gong commented on a diff in pull request #7059: [INLONG-7058][Sort] Support Apache Kudu connector

Posted by GitBox <gi...@apache.org>.
gong commented on code in PR #7059:
URL: https://github.com/apache/inlong/pull/7059#discussion_r1058097548


##########
inlong-sort/sort-connectors/kudu/pom.xml:
##########
@@ -0,0 +1,93 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~  Licensed to the Apache Software Foundation (ASF) under one
+  ~  or more contributor license agreements.  See the NOTICE file
+  ~  distributed with this work for additional information
+  ~  regarding copyright ownership.  The ASF licenses this file
+  ~  to you under the Apache License, Version 2.0 (the
+  ~  "License"); you may not use this file except in compliance
+  ~  with the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~  Unless required by applicable law or agreed to in writing, software
+  ~  distributed under the License is distributed on an "AS IS" BASIS,
+  ~  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~  See the License for the specific language governing permissions and
+  ~  limitations under the License.
+  -->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.inlong</groupId>
+        <artifactId>sort-connectors</artifactId>
+        <version>1.5.0-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>sort-connector-kudu</artifactId>
+    <packaging>jar</packaging>
+    <name>Apache InLong - Sort-connector-kudu</name>
+
+    <properties />
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.inlong</groupId>
+            <artifactId>sort-connector-base</artifactId>
+            <version>${project.version}</version>
+        </dependency>

Review Comment:
   I suggest adding a shade configuration for `sort-connector-base`.





[GitHub] [inlong] gong commented on a diff in pull request #7059: [INLONG-7058][Sort] Support Apache Kudu connector

Posted by GitBox <gi...@apache.org>.
gong commented on code in PR #7059:
URL: https://github.com/apache/inlong/pull/7059#discussion_r1058092502


##########
inlong-sort/sort-connectors/kudu/src/main/java/org/apache/inlong/sort/kudu/common/KuduUtils.java:
##########
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.kudu.common;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.util.StringUtils;
+
+import org.apache.inlong.sort.formats.common.BooleanTypeInfo;
+import org.apache.inlong.sort.formats.common.ByteTypeInfo;
+import org.apache.inlong.sort.formats.common.DateTypeInfo;
+import org.apache.inlong.sort.formats.common.DecimalTypeInfo;
+import org.apache.inlong.sort.formats.common.DoubleTypeInfo;
+import org.apache.inlong.sort.formats.common.FloatTypeInfo;
+import org.apache.inlong.sort.formats.common.IntTypeInfo;
+import org.apache.inlong.sort.formats.common.LongTypeInfo;
+import org.apache.inlong.sort.formats.common.RowTypeInfo;
+import org.apache.inlong.sort.formats.common.StringTypeInfo;
+import org.apache.inlong.sort.formats.common.TimestampTypeInfo;
+import org.apache.inlong.sort.formats.common.TypeInfo;
+import org.apache.kudu.ColumnSchema;
+import org.apache.kudu.Schema;
+import org.apache.kudu.Type;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.apache.flink.util.Preconditions.checkState;
+import static org.apache.kudu.Type.*;
+
+/**
+ * The utility class for Kudu.
+ */
+public class KuduUtils {
+
+    private static final Map<Type, TypeInfo> KUDU_TYPE_2_TYPE_INFO_MAP = new HashMap<>(13);
+    private static final Map<Type, TypeInfo> KUDU_TYPE_2_DATA_TYPE_MAP = new HashMap<>(13);

Review Comment:
   Maybe 13 should be changed to 13/0.75 (the expected size divided by the default load factor).
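
   The sizing arithmetic behind this suggestion can be sketched as follows. This is a minimal illustration and not code from the PR: the class `HashMapCapacitySketch` and its helpers `capacityFor` and `tableSizeFor` are hypothetical names, and the 13 entries are assumed from the capacity argument in the diff. It relies only on documented `java.util.HashMap` behaviour: a default load factor of 0.75 and a table size rounded up to a power of two.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper class, for illustration only.
class HashMapCapacitySketch {

    // Default load factor of java.util.HashMap.
    private static final double DEFAULT_LOAD_FACTOR = 0.75;

    // Smallest initial capacity whose resize threshold is at least expectedEntries.
    static int capacityFor(int expectedEntries) {
        return (int) Math.ceil(expectedEntries / DEFAULT_LOAD_FACTOR);
    }

    // HashMap rounds the requested capacity up to the next power of two.
    static int tableSizeFor(int requestedCapacity) {
        int size = 1;
        while (size < requestedCapacity) {
            size <<= 1;
        }
        return size;
    }

    public static void main(String[] args) {
        int entries = 13;

        // new HashMap<>(13): table of 16 slots, resize threshold 16 * 0.75 = 12,
        // so inserting the 13th entry forces a resize.
        int naiveTable = tableSizeFor(entries);

        // new HashMap<>(capacityFor(13)): table of 32 slots, threshold 24,
        // so 13 entries fit without a resize.
        int sizedTable = tableSizeFor(capacityFor(entries));

        Map<String, Integer> map = new HashMap<>(capacityFor(entries));
        for (int i = 0; i < entries; i++) {
            map.put("type-" + i, i);
        }

        System.out.println("naive table size: " + naiveTable);
        System.out.println("sized table size: " + sizedTable);
        System.out.println("entries stored:   " + map.size());
    }
}
```

   For a static lookup table this small, a single resize costs very little, so the choice is mostly about stating intent clearly.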





[GitHub] [inlong] featzhang closed pull request #7059: [INLONG-7058][Sort] Support Apache Kudu connector

Posted by "featzhang (via GitHub)" <gi...@apache.org>.
featzhang closed pull request #7059: [INLONG-7058][Sort] Support Apache Kudu connector
URL: https://github.com/apache/inlong/pull/7059




[GitHub] [inlong] featzhang commented on pull request #7059: [INLONG-7058][Sort] Support Apache Kudu connector

Posted by "featzhang (via GitHub)" <gi...@apache.org>.
featzhang commented on PR #7059:
URL: https://github.com/apache/inlong/pull/7059#issuecomment-1403758806

   > add fileSet for kudu connector in inlong-distribution/src/main/assemblies/sort-connectors.xml
   
   done




[GitHub] [inlong] featzhang commented on a diff in pull request #7059: [INLONG-7058][Sort] Support Apache Kudu connector

Posted by "featzhang (via GitHub)" <gi...@apache.org>.
featzhang commented on code in PR #7059:
URL: https://github.com/apache/inlong/pull/7059#discussion_r1089868868


##########
inlong-sort/sort-connectors/kudu/src/main/java/org/apache/inlong/sort/kudu/common/KuduOptions.java:
##########
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.kudu.common;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+
+/**
+ * The configuration options for kudu sink.
+ */
+public class KuduOptions {
+
+    public static final ConfigOption<String> CONNECTOR_TABLE =

Review Comment:
   There is no prefix 'connector' in other connectors of InLong, so I removed the prefix.


