Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/11/29 15:53:42 UTC

[GitHub] [flink-connector-jdbc] eskabetxe opened a new pull request, #2: [FLINK-28284] Add JdbcSink with new format

eskabetxe opened a new pull request, #2:
URL: https://github.com/apache/flink-connector-jdbc/pull/2

   ## What is the purpose of the change
   
   Add a JdbcSink with the new format (sink2)
   
   ## Brief change log
   
     - *JdbcSink* the new sink (see the usage sketch below)
     - *JdbcSinkWriter* the writer used by the new sink
     - *JdbcQueryStatement* the query and the `PreparedStatement` parameter setter that will be used
     - *SimpleJdbcQueryStatement* a simple implementation of JdbcQueryStatement
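
   For illustration, a minimal sketch of how the new sink could be wired into a job, based on the builder API discussed later in this thread (the `Book` type and the `stream` variable are placeholders, and the builder method names evolved during review, so treat this as an assumption rather than the final API):

   ```java
   import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
   import org.apache.flink.connector.jdbc.sink.JdbcSink;
   import org.apache.flink.connector.jdbc.sink.JdbcSinkBuilder;

   // Build the new sink (sink2): a query, a statement setter, and connection options.
   JdbcSink<Book> sink =
           new JdbcSinkBuilder<Book>()
                   .withQueryStatement(
                           "INSERT INTO books (id, title) VALUES (?, ?)",
                           (ps, book) -> {
                               ps.setLong(1, book.id);
                               ps.setString(2, book.title);
                           })
                   .withConnectionProvider(
                           new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                                   .withUrl("jdbc:derby:memory:test")
                                   .build())
                   .build();

   // Attached through the unified Sink V2 entry point instead of addSink(...).
   stream.sinkTo(sink);
   ```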
   
   
   ## Verifying this change
   
   This change added tests and can be verified as follows:
   
   - JdbcSinkTest.testInsertWithSinkTo
   - JdbcSinkTest.testObjectReuseWithSinkTo
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): (no)
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes)
     - The serializers: (no)
     - The runtime per-record code paths (performance sensitive): (no)
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
     - The S3 file system connector: (no)
   
   ## Documentation
   
     - Does this pull request introduce a new feature? (yes)
     - If yes, how is the feature documented? (JavaDocs / not documented)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] [FLINK-25421] Port JDBC Sink to new Unified Sink API (FLIP-143) [flink-connector-jdbc]

Posted by "eskabetxe (via GitHub)" <gi...@apache.org>.
eskabetxe commented on PR #2:
URL: https://github.com/apache/flink-connector-jdbc/pull/2#issuecomment-1923662905

   @snuyanzin @MartijnVisser 
   I already changed a test to mock Sink.InitContext, since 1.19 adds a method with a new class..
   But now all dialects are failing for TIMESTAMP_LTZ, which passes on 1.19 but fails on 1.18. Is this a new feature in 1.19, correct? How should I proceed with this? Remove the failing test?
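
   For reference, a minimal sketch of what such a mock could look like (assuming Mockito is on the test classpath; this is not the exact code from the PR):

   ```java
   import org.apache.flink.api.connector.sink2.Sink;

   import static org.mockito.Mockito.mock;

   // A mock keeps the test source compatible with both 1.18 and 1.19,
   // even though 1.19 adds a method (with a new return type) to InitContext.
   Sink.InitContext initContext = mock(Sink.InitContext.class);
   ```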




Re: [PR] [FLINK-25421] Port JDBC Sink to new Unified Sink API (FLIP-143) [flink-connector-jdbc]

Posted by "wanglijie95 (via GitHub)" <gi...@apache.org>.
wanglijie95 commented on PR #2:
URL: https://github.com/apache/flink-connector-jdbc/pull/2#issuecomment-1958712785

   @eskabetxe FYI, the failed tests should have been fixed in FLINK-34358




Re: [PR] [FLINK-25421] Port JDBC Sink to new Unified Sink API (FLIP-143) [flink-connector-jdbc]

Posted by "eskabetxe (via GitHub)" <gi...@apache.org>.
eskabetxe commented on code in PR #2:
URL: https://github.com/apache/flink-connector-jdbc/pull/2#discussion_r1452321599


##########
flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/sink/writer/AlLeastOnceJdbcWriterTest.java:
##########
@@ -0,0 +1,88 @@
+package org.apache.flink.connector.jdbc.sink.writer;
+
+import org.apache.flink.connector.base.DeliveryGuarantee;
+import org.apache.flink.connector.jdbc.JdbcExactlyOnceOptions;
+import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
+import org.apache.flink.connector.jdbc.datasource.connections.JdbcConnectionProvider;
+import org.apache.flink.connector.jdbc.datasource.connections.SimpleJdbcConnectionProvider;
+import org.apache.flink.connector.jdbc.sink.committer.JdbcCommitable;
+import org.apache.flink.connector.jdbc.testutils.tables.templates.BooksTable;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Collection;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** */

Review Comment:
   done





Re: [PR] [FLINK-25421] Port JDBC Sink to new Unified Sink API (FLIP-143) [flink-connector-jdbc]

Posted by "snuyanzin (via GitHub)" <gi...@apache.org>.
snuyanzin commented on code in PR #2:
URL: https://github.com/apache/flink-connector-jdbc/pull/2#discussion_r1475815795


##########
.github/workflows/weekly.yml:
##########
@@ -27,25 +27,13 @@ jobs:
     strategy:
       matrix:
         flink_branches: [{
-          flink: 1.16-SNAPSHOT,
-          branch: main
-        }, {
-          flink: 1.17-SNAPSHOT,
-          branch: main
-        }, {
           flink: 1.18-SNAPSHOT,
           jdk: '8, 11, 17',
           branch: main
         }, {
           flink: 1.19-SNAPSHOT,
           jdk: '8, 11, 17, 21',
           branch: main
-        }, {
-          flink: 1.16.2,
-          branch: v3.1
-        }, {
-          flink: 1.17.1,
-          branch: v3.1
         }, {

Review Comment:
   IIRC, ideally we should support the currently supported Flink versions (1.17, 1.18, 1.19, and 1.16, which will probably no longer be supported after the 1.19 release).
   I don't know another way to have all of them supported without branching (4.0.0 or 3.2.0 doesn't really matter)





Re: [PR] [FLINK-25421] Port JDBC Sink to new Unified Sink API (FLIP-143) [flink-connector-jdbc]

Posted by "snuyanzin (via GitHub)" <gi...@apache.org>.
snuyanzin commented on code in PR #2:
URL: https://github.com/apache/flink-connector-jdbc/pull/2#discussion_r1444114157


##########
flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/sink/writer/ExactlyOnceJdbcWriterTest.java:
##########
@@ -0,0 +1,105 @@
+package org.apache.flink.connector.jdbc.sink.writer;
+
+import org.apache.flink.connector.base.DeliveryGuarantee;
+import org.apache.flink.connector.jdbc.JdbcExactlyOnceOptions;
+import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
+import org.apache.flink.connector.jdbc.datasource.connections.JdbcConnectionProvider;
+import org.apache.flink.connector.jdbc.datasource.connections.xa.SimpleXaConnectionProvider;
+import org.apache.flink.connector.jdbc.sink.committer.JdbcCommitable;
+import org.apache.flink.connector.jdbc.testutils.tables.templates.BooksTable;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** */

Review Comment:
   I guess this needs to be filled in





Re: [PR] [FLINK-25421] Port JDBC Sink to new Unified Sink API (FLIP-143) [flink-connector-jdbc]

Posted by "RocMarshal (via GitHub)" <gi...@apache.org>.
RocMarshal commented on code in PR #2:
URL: https://github.com/apache/flink-connector-jdbc/pull/2#discussion_r1442655533


##########
flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/datasource/statements/JdbcQueryStatement.java:
##########
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.datasource.statements;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import java.io.Serializable;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+
+/**
+ * Sets {@link PreparedStatement} parameters to use in JDBC Sink based on a specific type of record.

Review Comment:
   How about 
   
   ```
    * Sets {@link PreparedStatement} parameters that's used in JDBC Sink based on a specified type of the record.
   ```
   
   ?





Re: [PR] [FLINK-25421] Port JDBC Sink to new Unified Sink API (FLIP-143) [flink-connector-jdbc]

Posted by "eskabetxe (via GitHub)" <gi...@apache.org>.
eskabetxe commented on code in PR #2:
URL: https://github.com/apache/flink-connector-jdbc/pull/2#discussion_r1475775129


##########
.github/workflows/weekly.yml:
##########
@@ -27,25 +27,13 @@ jobs:
     strategy:
       matrix:
         flink_branches: [{
-          flink: 1.16-SNAPSHOT,
-          branch: main
-        }, {
-          flink: 1.17-SNAPSHOT,
-          branch: main
-        }, {
           flink: 1.18-SNAPSHOT,
           jdk: '8, 11, 17',
           branch: main
         }, {
           flink: 1.19-SNAPSHOT,
           jdk: '8, 11, 17, 21',
           branch: main
-        }, {
-          flink: 1.16.2,
-          branch: v3.1
-        }, {
-          flink: 1.17.1,
-          branch: v3.1
         }, {

Review Comment:
   if we are bumping to 4.0.0, I agree.
   but will any incompatible change between Flink versions (not necessarily a breaking change; it could be a new method we need to use, as in this case) mean we have to make a major release?



##########
flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/datasource/connections/xa/PoolingXaConnectionProvider.java:
##########
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.datasource.connections.xa;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.connector.jdbc.datasource.transactions.xa.exceptions.TransientXaException;
+import org.apache.flink.util.function.ThrowingConsumer;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import javax.sql.XADataSource;
+import javax.transaction.xa.Xid;
+
+import java.io.Serializable;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.Collection;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.function.Supplier;
+
+import static org.apache.flink.util.ExceptionUtils.rethrow;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * A "pooling" implementation of {@link XaConnectionProvider}. Some database implement XA such that
+ * one connection is limited to a single transaction. As a workaround, this implementation creates a
+ * new XA resource after each xa_start call is made (and associates it with the xid to commit
+ * later).
+ */
+@Internal
+public class PoolingXaConnectionProvider implements XaConnectionProvider {
+    private static final long serialVersionUID = 1L;
+
+    /** */

Review Comment:
   fixed





Re: [PR] [FLINK-25421] Port JDBC Sink to new Unified Sink API (FLIP-143) [flink-connector-jdbc]

Posted by "eskabetxe (via GitHub)" <gi...@apache.org>.
eskabetxe commented on code in PR #2:
URL: https://github.com/apache/flink-connector-jdbc/pull/2#discussion_r1444596937


##########
flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/JdbcSink.java:
##########
@@ -107,5 +108,9 @@ public static <T> SinkFunction<T> exactlyOnceSink(
                 exactlyOnceOptions);
     }
 
+    public static <IN> JdbcSinkBuilder<IN> newSink() {

Review Comment:
   changing it to a builder.. all other methods should be deprecated afterwards





Re: [PR] [FLINK-25421] Add JdbcSink with new format [flink-connector-jdbc]

Posted by "eskabetxe (via GitHub)" <gi...@apache.org>.
eskabetxe commented on PR #2:
URL: https://github.com/apache/flink-connector-jdbc/pull/2#issuecomment-1824762588

   @wanglijie95, @snuyanzin could you please take a look..
   
   At this point the sink supports non-XA (at-least-once) and XA (exactly-once) connections; a rough usage sketch follows below.
   
   I still need to add documentation, but we could start reviewing the code
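
   A rough sketch of the two delivery modes, using the builder method names from the diffs in this thread (the `query`, `statementBuilder`, `connectionOptions`, and `createXaDataSource()` names are placeholders):

   ```java
   // At-least-once: plain (non-XA) JDBC connection.
   JdbcSink<Book> atLeastOnce =
           new JdbcSinkBuilder<Book>()
                   .withQueryStatement(query, statementBuilder)
                   .buildAtLeastOnce(connectionOptions);

   // Exactly-once: XA connection provider driven by an XADataSource supplier.
   JdbcSink<Book> exactlyOnce =
           new JdbcSinkBuilder<Book>()
                   .withQueryStatement(query, statementBuilder)
                   .buildExactlyOnce(
                           JdbcExactlyOnceOptions.defaults(),
                           () -> createXaDataSource()); // SerializableSupplier<XADataSource>
   ```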




Re: [PR] [FLINK-25421] Port JDBC Sink to new Unified Sink API (FLIP-143) [flink-connector-jdbc]

Posted by "eskabetxe (via GitHub)" <gi...@apache.org>.
eskabetxe commented on code in PR #2:
URL: https://github.com/apache/flink-connector-jdbc/pull/2#discussion_r1441520106


##########
flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/sink/JdbcSinkBuilder.java:
##########
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.sink;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.connector.base.DeliveryGuarantee;
+import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
+import org.apache.flink.connector.jdbc.JdbcExactlyOnceOptions;
+import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
+import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
+import org.apache.flink.connector.jdbc.datasource.connections.JdbcConnectionProvider;
+import org.apache.flink.connector.jdbc.datasource.connections.SimpleJdbcConnectionProvider;
+import org.apache.flink.connector.jdbc.datasource.connections.xa.PoolingXaConnectionProvider;
+import org.apache.flink.connector.jdbc.datasource.connections.xa.SimpleXaConnectionProvider;
+import org.apache.flink.connector.jdbc.datasource.connections.xa.XaConnectionProvider;
+import org.apache.flink.connector.jdbc.datasource.statements.JdbcQueryStatement;
+import org.apache.flink.connector.jdbc.datasource.statements.SimpleJdbcQueryStatement;
+import org.apache.flink.util.function.SerializableSupplier;
+
+import javax.sql.XADataSource;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** Builder to construct {@link JdbcSink}. */
+@PublicEvolving
+public class JdbcSinkBuilder<IN> {
+
+    private JdbcExecutionOptions executionOptions;
+    private JdbcQueryStatement<IN> queryStatement;
+
+    public JdbcSinkBuilder() {
+        this.executionOptions = JdbcExecutionOptions.defaults();
+    }
+
+    public JdbcSinkBuilder<IN> withExecutionOptions(JdbcExecutionOptions executionOptions) {
+        this.executionOptions = checkNotNull(executionOptions, "executionOptions cannot be empty");
+        return this;
+    }
+
+    public JdbcSinkBuilder<IN> withQueryStatement(JdbcQueryStatement<IN> queryStatement) {
+        this.queryStatement = queryStatement;
+        return this;
+    }
+
+    public JdbcSinkBuilder<IN> withQueryStatement(
+            String query, JdbcStatementBuilder<IN> statement) {
+        this.queryStatement = new SimpleJdbcQueryStatement<>(query, statement);
+        return this;
+    }
+
+    public JdbcSink<IN> buildAtLeastOnce(JdbcConnectionOptions connectionOptions) {
+        checkNotNull(connectionOptions, "connectionOptions cannot be empty");
+
+        return buildAtLeastOnce(new SimpleJdbcConnectionProvider(connectionOptions));
+    }
+
+    public JdbcSink<IN> buildAtLeastOnce(JdbcConnectionProvider connectionProvider) {
+        checkNotNull(connectionProvider, "connectionProvider cannot be empty");
+
+        return build(
+                DeliveryGuarantee.AT_LEAST_ONCE,
+                JdbcExactlyOnceOptions.defaults(),
+                checkNotNull(connectionProvider, "connectionProvider cannot be empty"));
+    }
+
+    public JdbcSink<IN> buildExactlyOnce(
+            JdbcExactlyOnceOptions exactlyOnceOptions,
+            SerializableSupplier<XADataSource> dataSourceSupplier) {
+
+        checkNotNull(exactlyOnceOptions, "exactlyOnceOptions cannot be empty");
+        checkNotNull(dataSourceSupplier, "dataSourceSupplier cannot be empty");
+        XaConnectionProvider connectionProvider =
+                exactlyOnceOptions.isTransactionPerConnection()
+                        ? PoolingXaConnectionProvider.from(
+                                dataSourceSupplier, exactlyOnceOptions.getTimeoutSec())
+                        : SimpleXaConnectionProvider.from(
+                                dataSourceSupplier, exactlyOnceOptions.getTimeoutSec());
+        return buildExactlyOnce(exactlyOnceOptions, connectionProvider);
+    }
+
+    public JdbcSink<IN> buildExactlyOnce(
+            JdbcExactlyOnceOptions exactlyOnceOptions, XaConnectionProvider connectionProvider) {
+
+        return build(
+                DeliveryGuarantee.EXACTLY_ONCE,
+                checkNotNull(exactlyOnceOptions, "exactlyOnceOptions cannot be empty"),
+                checkNotNull(connectionProvider, "connectionProvider cannot be empty"));
+    }
+
+    private JdbcSink<IN> build(
+            DeliveryGuarantee deliveryGuarantee,
+            JdbcExactlyOnceOptions exactlyOnceOptions,
+            JdbcConnectionProvider connectionProvider) {
+
+        return new JdbcSink<>(
+                checkNotNull(deliveryGuarantee, "deliveryGuarantee cannot be empty"),
+                checkNotNull(connectionProvider, "connectionProvider cannot be empty"),
+                checkNotNull(executionOptions, "executionOptions cannot be empty"),
+                checkNotNull(exactlyOnceOptions, "exactlyOnceOptions cannot be empty"),
+                checkNotNull(queryStatement, "queryStatement cannot be empty"));

Review Comment:
   done



##########
flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/sink/JdbcSinkBuilder.java:
##########
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.sink;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
+import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
+import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
+import org.apache.flink.connector.jdbc.internal.connection.JdbcConnectionProvider;
+import org.apache.flink.connector.jdbc.internal.connection.SimpleJdbcConnectionProvider;
+import org.apache.flink.connector.jdbc.sink.statement.JdbcQueryStatement;
+import org.apache.flink.connector.jdbc.sink.statement.SimpleJdbcQueryStatement;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** Builder to construct {@link JdbcSink}. */
+@PublicEvolving
+public class JdbcSinkBuilder<IN> {
+
+    private JdbcConnectionProvider connectionProvider;
+    private JdbcExecutionOptions executionOptions;
+    private JdbcQueryStatement<IN> queryStatement;
+
+    public JdbcSinkBuilder() {
+        executionOptions = JdbcExecutionOptions.defaults();
+    }
+
+    public JdbcSinkBuilder<IN> withExecutionOptions(JdbcExecutionOptions executionOptions) {
+        this.executionOptions = checkNotNull(executionOptions, "executionOptions cannot be empty");
+        return this;
+    }
+
+    private JdbcSinkBuilder<IN> withConnectionProvider(JdbcConnectionProvider connectionProvider) {
+        this.connectionProvider =
+                checkNotNull(connectionProvider, "connectionProvider cannot be empty");
+        return this;
+    }
+
+    public JdbcSinkBuilder<IN> withConnectionProvider(JdbcConnectionOptions connectionOptions) {
+        return withConnectionProvider(new SimpleJdbcConnectionProvider(connectionOptions));
+    }
+
+    public JdbcSinkBuilder<IN> withQueryStatement(JdbcQueryStatement<IN> queryStatement) {
+        this.queryStatement = queryStatement;
+        return this;
+    }
+
+    public JdbcSinkBuilder<IN> withQueryStatement(
+            String query, JdbcStatementBuilder<IN> statement) {
+        this.queryStatement = new SimpleJdbcQueryStatement<>(query, statement);
+        return this;
+    }
+
+    public JdbcSink<IN> build() {
+        checkNotNull(connectionProvider, "connectionProvider cannot be empty");
+        checkNotNull(executionOptions, "executionOptions cannot be empty");
+        checkNotNull(queryStatement, "queryStatement cannot be empty");

Review Comment:
   done





Re: [PR] [FLINK-25421] Port JDBC Sink to new Unified Sink API (FLIP-143) [flink-connector-jdbc]

Posted by "snuyanzin (via GitHub)" <gi...@apache.org>.
snuyanzin commented on code in PR #2:
URL: https://github.com/apache/flink-connector-jdbc/pull/2#discussion_r1444113533


##########
flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/sink/committer/JdbcCommitable.java:
##########
@@ -0,0 +1,39 @@
+package org.apache.flink.connector.jdbc.sink.committer;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.connector.jdbc.datasource.transactions.xa.XaTransaction;
+
+import javax.annotation.Nullable;
+import javax.transaction.xa.Xid;
+
+import java.io.Serializable;
+import java.util.Optional;
+
+/** */

Review Comment:
   I guess this needs to be filled in



##########
flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/sink/committer/JdbcCommitableSerializer.java:
##########
@@ -0,0 +1,37 @@
+package org.apache.flink.connector.jdbc.sink.committer;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.connector.jdbc.xa.XidSerializer;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.core.memory.DataOutputSerializer;
+
+import javax.transaction.xa.Xid;
+
+import java.io.IOException;
+
+/** */

Review Comment:
   I guess this needs to be filled in





Re: [PR] [FLINK-25421] Port JDBC Sink to new Unified Sink API (FLIP-143) [flink-connector-jdbc]

Posted by "snuyanzin (via GitHub)" <gi...@apache.org>.
snuyanzin commented on code in PR #2:
URL: https://github.com/apache/flink-connector-jdbc/pull/2#discussion_r1444113628


##########
flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/sink/committer/JdbcCommitter.java:
##########
@@ -0,0 +1,62 @@
+package org.apache.flink.connector.jdbc.sink.committer;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.connector.sink2.Committer;
+import org.apache.flink.connector.base.DeliveryGuarantee;
+import org.apache.flink.connector.jdbc.JdbcExactlyOnceOptions;
+import org.apache.flink.connector.jdbc.datasource.connections.JdbcConnectionProvider;
+import org.apache.flink.connector.jdbc.datasource.connections.xa.XaConnectionProvider;
+import org.apache.flink.connector.jdbc.datasource.transactions.xa.XaTransaction;
+import org.apache.flink.connector.jdbc.datasource.transactions.xa.domain.TransactionId;
+import org.apache.flink.connector.jdbc.sink.writer.JdbcWriterState;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import java.io.IOException;
+import java.util.Collection;
+
+/** */

Review Comment:
   I guess this needs to be filled in





Re: [PR] [FLINK-25421] Port JDBC Sink to new Unified Sink API (FLIP-143) [flink-connector-jdbc]

Posted by "eskabetxe (via GitHub)" <gi...@apache.org>.
eskabetxe commented on code in PR #2:
URL: https://github.com/apache/flink-connector-jdbc/pull/2#discussion_r1444672855


##########
flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/sink/writer/JdbcWriterStateSerializer.java:
##########
@@ -0,0 +1,76 @@
+package org.apache.flink.connector.jdbc.sink.writer;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.connector.jdbc.datasource.transactions.xa.domain.TransactionId;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.core.memory.DataOutputSerializer;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/** {@link JdbcWriterState} serializer. */
+@Internal
+public class JdbcWriterStateSerializer implements SimpleVersionedSerializer<JdbcWriterState> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(JdbcWriterStateSerializer.class);
+
+    @Override
+    public int getVersion() {
+        return 2;
+    }
+
+    @Override
+    public byte[] serialize(JdbcWriterState state) throws IOException {
+        final DataOutputSerializer out = new DataOutputSerializer(1);
+        out.writeInt(state.getHanging().size());
+        for (TransactionId tid : state.getHanging()) {
+            byte[] tIdBytes = tid.serialize();
+            out.writeByte(tIdBytes.length);
+            out.write(tIdBytes, 0, tIdBytes.length);
+        }
+        out.writeInt(state.getPrepared().size());
+        for (TransactionId tid : state.getPrepared()) {
+            byte[] tIdBytes = tid.serialize();
+            out.writeByte(tIdBytes.length);
+            out.write(tIdBytes, 0, tIdBytes.length);
+        }
+        return out.getSharedBuffer();
+    }
+
+    @Override
+    public JdbcWriterState deserialize(int version, byte[] serialized) throws IOException {
+        final DataInputDeserializer in = new DataInputDeserializer(serialized);
+
+        if (version == getVersion()) {
+            return deserializeV2(in);
+        }
+
+        LOG.error("Unknown version of state: " + version);

Review Comment:
   done





Re: [PR] [FLINK-25421] Port JDBC Sink to new Unified Sink API (FLIP-143) [flink-connector-jdbc]

Posted by "eskabetxe (via GitHub)" <gi...@apache.org>.
eskabetxe commented on code in PR #2:
URL: https://github.com/apache/flink-connector-jdbc/pull/2#discussion_r1444576236


##########
pom.xml:
##########
@@ -47,7 +47,7 @@ under the License.
     </modules>
 
     <properties>
-        <flink.version>1.16.2</flink.version>
+        <flink.version>1.18.0</flink.version>

Review Comment:
   Yes, we need the code introduced in 1.18 by [FLIP-287](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=240880853)





Re: [PR] [FLINK-25421] Port JDBC Sink to new Unified Sink API (FLIP-143) [flink-connector-jdbc]

Posted by "snuyanzin (via GitHub)" <gi...@apache.org>.
snuyanzin commented on code in PR #2:
URL: https://github.com/apache/flink-connector-jdbc/pull/2#discussion_r1444112306


##########
pom.xml:
##########
@@ -47,7 +47,7 @@ under the License.
     </modules>
 
     <properties>
-        <flink.version>1.16.2</flink.version>
+        <flink.version>1.18.0</flink.version>

Review Comment:
   Are we going to drop support for Flink 1.16 and 1.17 together with this commit?





Re: [PR] [FLINK-25421] Port JDBC Sink to new Unified Sink API (FLIP-143) [flink-connector-jdbc]

Posted by "RocMarshal (via GitHub)" <gi...@apache.org>.
RocMarshal commented on code in PR #2:
URL: https://github.com/apache/flink-connector-jdbc/pull/2#discussion_r1442712519


##########
flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/datasource/statements/JdbcQueryStatement.java:
##########
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.datasource.statements;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import java.io.Serializable;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+
+/**
+ * Sets {@link PreparedStatement} parameters to use in JDBC Sink based on a specific type of record.
+ */
+@PublicEvolving
+public interface JdbcQueryStatement<T> extends Serializable {
+    String query();
+
+    void map(PreparedStatement ps, T data) throws SQLException;

Review Comment:
   - `data` -> `record`
   - The method described by this interface essentially fills the data of a record into a `PreparedStatement`. So how about
        - changing the interface name to something like `JdbcStatementFiller` or `JdbcStatementRender`
        - changing the method name to `fill` or `render`
   ?
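
   For context, a straightforward implementation of the interface as it currently stands could look like this (the `Book` type is a placeholder):

   ```java
   import java.sql.PreparedStatement;
   import java.sql.SQLException;

   public class BookQueryStatement implements JdbcQueryStatement<Book> {
       @Override
       public String query() {
           return "INSERT INTO books (id, title) VALUES (?, ?)";
       }

       @Override
       public void map(PreparedStatement ps, Book data) throws SQLException {
           // Fill one row of the prepared statement from the record.
           ps.setLong(1, data.id);
           ps.setString(2, data.title);
       }
   }
   ```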



##########
flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/datasource/statements/JdbcQueryStatement.java:
##########
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.datasource.statements;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import java.io.Serializable;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+
+/**
+ * Sets {@link PreparedStatement} parameters to use in JDBC Sink based on a specific type of record.

Review Comment:
   How about 
   
   ```
    * Sets {@link PreparedStatement} parameters that's used in JDBC Sink based on a specified type of the record.
   ```
   
   ?



##########
flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/datasource/transactions/xa/domain/TransactionId.java:
##########
@@ -0,0 +1,271 @@
+package org.apache.flink.connector.jdbc.datasource.transactions.xa.domain;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+import org.apache.commons.lang3.builder.ToStringBuilder;
+import org.apache.commons.lang3.builder.ToStringStyle;
+
+import javax.transaction.xa.Xid;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Arrays;
+
+import static org.apache.flink.util.StringUtils.byteToHexString;
+
+/** A simple {@link Xid} implementation. */
+public class TransactionId implements Xid, Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    private static final int FORMAT_ID = 202;
+
+    private final byte[] jobId;
+    private final int subtaskId;
+    private final int numberOfSubtasks;
+    private final long checkpointId;
+    private final int attempts;
+    private final boolean restored;
+
+    private TransactionId(
+            byte[] jobId,
+            int subtaskId,
+            int numberOfSubtasks,
+            long checkpointId,
+            int attempts,
+            boolean restored) {
+        this.jobId = jobId;
+        this.subtaskId = subtaskId;
+        this.numberOfSubtasks = numberOfSubtasks;
+        this.checkpointId = checkpointId;
+        this.attempts = attempts;
+        this.restored = restored;
+    }
+
+    public static TransactionId empty() {
+        return create(new byte[JobID.SIZE], 0, 0);
+    }
+
+    public static TransactionId create(byte[] jobId, int subtaskId, int numberOfSubtasks) {
+        return new TransactionId(jobId, subtaskId, numberOfSubtasks, -1, 0, false);
+    }
+
+    public static TransactionId restore(
+            byte[] jobId, int subtaskId, int numberOfSubtasks, long checkpointId, int attempts) {
+        return new TransactionId(jobId, subtaskId, numberOfSubtasks, checkpointId, attempts, true);
+    }
+
+    public static TransactionId createFromXid(
+            int formatId, byte[] globalTransactionId, byte[] branchQualifier) throws IOException {
+        return fromXid(formatId, globalTransactionId, branchQualifier, false);
+    }
+
+    public static TransactionId restoreFromXid(
+            int formatId, byte[] globalTransactionId, byte[] branchQualifier) throws IOException {
+        return fromXid(formatId, globalTransactionId, branchQualifier, true);
+    }
+
+    public static TransactionId fromXid(
+            int formatId, byte[] globalTransactionId, byte[] branchQualifier, boolean restored)
+            throws IOException {
+        if (FORMAT_ID != formatId) {
+            throw new IOException(String.format("Xid formatId (%s) is not valid", formatId));
+        }
+
+        final DataInputDeserializer gid = new DataInputDeserializer(globalTransactionId);
+        byte[] jobIdBytes = readJobId(gid);
+        int subtaskId = gid.readInt();
+
+        final DataInputDeserializer branch = new DataInputDeserializer(branchQualifier);
+        int numberOfSubtasks = branch.readInt();
+        long checkpoint = branch.readLong();
+
+        return new TransactionId(jobIdBytes, subtaskId, numberOfSubtasks, checkpoint, 0, restored);
+    }
+
+    public static TransactionId deserialize(byte[] bytes) {
+        try {
+            final DataInputDeserializer in = new DataInputDeserializer(bytes);
+            byte[] jobIdBytes = readJobId(in);
+            int subtaskId = in.readInt();
+            int numberOfSubtasks = in.readInt();
+            long checkpoint = in.readLong();
+            int attempts = in.readInt();
+            return restore(jobIdBytes, subtaskId, numberOfSubtasks, checkpoint, attempts);
+        } catch (IOException e) {
+            throw new FlinkRuntimeException(e.getLocalizedMessage());

Review Comment:
   why not use `FlinkRuntimeException(e)` ?



##########
flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/sink/writer/JdbcWriterStateSerializer.java:
##########
@@ -0,0 +1,76 @@
+package org.apache.flink.connector.jdbc.sink.writer;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.connector.jdbc.datasource.transactions.xa.domain.TransactionId;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.core.memory.DataOutputSerializer;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/** {@link JdbcWriterState} serializer. */
+@Internal
+public class JdbcWriterStateSerializer implements SimpleVersionedSerializer<JdbcWriterState> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(JdbcWriterStateSerializer.class);
+
+    @Override
+    public int getVersion() {
+        return 2;
+    }
+
+    @Override
+    public byte[] serialize(JdbcWriterState state) throws IOException {
+        final DataOutputSerializer out = new DataOutputSerializer(1);
+        out.writeInt(state.getHanging().size());
+        for (TransactionId tid : state.getHanging()) {
+            byte[] tIdBytes = tid.serialize();
+            out.writeByte(tIdBytes.length);
+            out.write(tIdBytes, 0, tIdBytes.length);
+        }
+        out.writeInt(state.getPrepared().size());
+        for (TransactionId tid : state.getPrepared()) {
+            byte[] tIdBytes = tid.serialize();
+            out.writeByte(tIdBytes.length);
+            out.write(tIdBytes, 0, tIdBytes.length);
+        }
+        return out.getSharedBuffer();
+    }
+
+    @Override
+    public JdbcWriterState deserialize(int version, byte[] serialized) throws IOException {
+        final DataInputDeserializer in = new DataInputDeserializer(serialized);
+
+        if (version == getVersion()) {
+            return deserializeV2(in);
+        }
+
+        LOG.error("Unknown version of state: " + version);

Review Comment:
   ```suggestion
           LOG.error("Unknown version of state: {}", version);
   ```



##########
flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/sink/writer/JdbcWriter.java:
##########
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.sink.writer;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.api.connector.sink2.StatefulSink;
+import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;
+import org.apache.flink.connector.base.DeliveryGuarantee;
+import org.apache.flink.connector.jdbc.JdbcExactlyOnceOptions;
+import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
+import org.apache.flink.connector.jdbc.datasource.connections.JdbcConnectionProvider;
+import org.apache.flink.connector.jdbc.datasource.connections.xa.XaConnectionProvider;
+import org.apache.flink.connector.jdbc.datasource.statements.JdbcQueryStatement;
+import org.apache.flink.connector.jdbc.datasource.transactions.xa.XaTransaction;
+import org.apache.flink.connector.jdbc.datasource.transactions.xa.domain.TransactionId;
+import org.apache.flink.connector.jdbc.internal.JdbcOutputFormat;
+import org.apache.flink.connector.jdbc.internal.JdbcOutputSerializer;
+import org.apache.flink.connector.jdbc.internal.executor.JdbcBatchStatementExecutor;
+import org.apache.flink.connector.jdbc.sink.committer.JdbcCommitable;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.transaction.xa.Xid;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/** @param <IN> */

Review Comment:
   It seems that the Javadoc comment here is incomplete?



##########
flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/JdbcSink.java:
##########
@@ -107,5 +108,9 @@ public static <T> SinkFunction<T> exactlyOnceSink(
                 exactlyOnceOptions);
     }
 
+    public static <IN> JdbcSinkBuilder<IN> newSink() {

Review Comment:
   how about 
   ```
   public static <IN> JdbcSinkBuilder<IN> newSinkBuilder() {
   
   ```





Re: [PR] [FLINK-25421] Port JDBC Sink to new Unified Sink API (FLIP-143) [flink-connector-jdbc]

Posted by "snuyanzin (via GitHub)" <gi...@apache.org>.
snuyanzin merged PR #2:
URL: https://github.com/apache/flink-connector-jdbc/pull/2




Re: [PR] [FLINK-25421] Port JDBC Sink to new Unified Sink API (FLIP-143) [flink-connector-jdbc]

Posted by "snuyanzin (via GitHub)" <gi...@apache.org>.
snuyanzin commented on code in PR #2:
URL: https://github.com/apache/flink-connector-jdbc/pull/2#discussion_r1528657911


##########
flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/sink/writer/JdbcWriter.java:
##########
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.sink.writer;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.api.connector.sink2.StatefulSink;
+import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;
+import org.apache.flink.connector.base.DeliveryGuarantee;
+import org.apache.flink.connector.jdbc.JdbcExactlyOnceOptions;
+import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
+import org.apache.flink.connector.jdbc.datasource.connections.JdbcConnectionProvider;
+import org.apache.flink.connector.jdbc.datasource.connections.xa.XaConnectionProvider;
+import org.apache.flink.connector.jdbc.datasource.statements.JdbcQueryStatement;
+import org.apache.flink.connector.jdbc.datasource.transactions.xa.XaTransaction;
+import org.apache.flink.connector.jdbc.datasource.transactions.xa.domain.TransactionId;
+import org.apache.flink.connector.jdbc.internal.JdbcOutputFormat;
+import org.apache.flink.connector.jdbc.internal.JdbcOutputSerializer;
+import org.apache.flink.connector.jdbc.internal.executor.JdbcBatchStatementExecutor;
+import org.apache.flink.connector.jdbc.sink.committer.JdbcCommitable;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.transaction.xa.Xid;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Jdbc writer that allow at-least-once (non-XA operation) and exactly-once (XA operation)
+ * semantics.
+ */
+@Internal
+public class JdbcWriter<IN>
+        implements StatefulSink.StatefulSinkWriter<IN, JdbcWriterState>,
+                TwoPhaseCommittingSink.PrecommittingSinkWriter<IN, JdbcCommitable> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(JdbcWriter.class);
+
+    private final DeliveryGuarantee deliveryGuarantee;
+    private final JdbcOutputFormat<IN, IN, JdbcBatchStatementExecutor<IN>> jdbcOutput;
+
+    private final TransactionId transactionId;

Review Comment:
   Do we need to keep this var in state?
   it looks like it is initialized only in the constructor and is used only to init `jdbcTransaction` in the same constructor...
   I was not able to find any other usages





Re: [PR] [FLINK-25421] Port JDBC Sink to new Unified Sink API (FLIP-143) [flink-connector-jdbc]

Posted by "RocMarshal (via GitHub)" <gi...@apache.org>.
RocMarshal commented on code in PR #2:
URL: https://github.com/apache/flink-connector-jdbc/pull/2#discussion_r1529646574


##########
flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/datasource/transactions/xa/domain/TransactionId.java:
##########
@@ -0,0 +1,273 @@
+package org.apache.flink.connector.jdbc.datasource.transactions.xa.domain;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+import org.apache.commons.lang3.builder.ToStringBuilder;
+import org.apache.commons.lang3.builder.ToStringStyle;
+
+import javax.transaction.xa.Xid;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Arrays;
+
+import static org.apache.flink.util.StringUtils.byteToHexString;
+
+/** A simple {@link Xid} implementation. */
+@Internal
+public class TransactionId implements Xid, Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    private static final int FORMAT_ID = 202;
+
+    private final byte[] jobId;
+    private final int subtaskId;
+    private final int numberOfSubtasks;
+    private final long checkpointId;
+    private final int attempts;
+    private final boolean restored;
+
+    private TransactionId(
+            byte[] jobId,
+            int subtaskId,
+            int numberOfSubtasks,
+            long checkpointId,
+            int attempts,
+            boolean restored) {
+        this.jobId = jobId;
+        this.subtaskId = subtaskId;
+        this.numberOfSubtasks = numberOfSubtasks;
+        this.checkpointId = checkpointId;
+        this.attempts = attempts;
+        this.restored = restored;
+    }
+
+    public static TransactionId empty() {
+        return create(new byte[JobID.SIZE], 0, 0);
+    }
+
+    public static TransactionId create(byte[] jobId, int subtaskId, int numberOfSubtasks) {
+        return new TransactionId(jobId, subtaskId, numberOfSubtasks, -1, 0, false);
+    }
+
+    public static TransactionId restore(
+            byte[] jobId, int subtaskId, int numberOfSubtasks, long checkpointId, int attempts) {
+        return new TransactionId(jobId, subtaskId, numberOfSubtasks, checkpointId, attempts, true);
+    }
+
+    public static TransactionId createFromXid(
+            int formatId, byte[] globalTransactionId, byte[] branchQualifier) throws IOException {
+        return fromXid(formatId, globalTransactionId, branchQualifier, false);
+    }
+
+    public static TransactionId restoreFromXid(
+            int formatId, byte[] globalTransactionId, byte[] branchQualifier) throws IOException {
+        return fromXid(formatId, globalTransactionId, branchQualifier, true);
+    }
+
+    public static TransactionId fromXid(
+            int formatId, byte[] globalTransactionId, byte[] branchQualifier, boolean restored)
+            throws IOException {
+        if (FORMAT_ID != formatId) {
+            throw new IOException(String.format("Xid formatId (%s) is not valid", formatId));
+        }
+
+        final DataInputDeserializer gid = new DataInputDeserializer(globalTransactionId);
+        byte[] jobIdBytes = readJobId(gid);
+        int subtaskId = gid.readInt();
+
+        final DataInputDeserializer branch = new DataInputDeserializer(branchQualifier);
+        int numberOfSubtasks = branch.readInt();
+        long checkpoint = branch.readLong();
+
+        return new TransactionId(jobIdBytes, subtaskId, numberOfSubtasks, checkpoint, 0, restored);
+    }
+
+    public static TransactionId deserialize(byte[] bytes) {
+        try {
+            final DataInputDeserializer in = new DataInputDeserializer(bytes);
+            byte[] jobIdBytes = readJobId(in);
+            int subtaskId = in.readInt();
+            int numberOfSubtasks = in.readInt();
+            long checkpoint = in.readLong();
+            int attempts = in.readInt();
+            return restore(jobIdBytes, subtaskId, numberOfSubtasks, checkpoint, attempts);
+        } catch (IOException e) {
+            throw new FlinkRuntimeException(e);
+        }
+    }
+
+    private static byte[] readJobId(DataInputDeserializer in) throws IOException {
+        byte[] jobIdBytes = new byte[JobID.SIZE];
+        in.read(jobIdBytes);
+        return jobIdBytes;
+    }
+
+    public TransactionId copy() {
+        return new TransactionId(
+                jobId, subtaskId, numberOfSubtasks, checkpointId, attempts, restored);
+    }
+
+    public TransactionId withBranch(long checkpointId) {
+        return new TransactionId(
+                jobId, subtaskId, numberOfSubtasks, checkpointId, attempts, restored);
+    }
+
+    public TransactionId withAttemptsIncremented() {
+        return new TransactionId(
+                jobId, subtaskId, numberOfSubtasks, checkpointId, attempts + 1, restored);
+    }
+
+    public long getCheckpointId() {
+        return checkpointId;
+    }
+
+    public boolean getRestored() {
+        return restored;
+    }
+
+    public int getAttempts() {
+        return attempts;
+    }
+
+    public String getXidValue() {
+        return String.format(
+                "%s:%s:%s",
+                getFormatId(),
+                byteToHexString(getGlobalTransactionId()),
+                byteToHexString(getBranchQualifier()));
+    }
+
+    @Override
+    public int getFormatId() {
+        return FORMAT_ID;
+    }
+
+    @Override
+    public byte[] getGlobalTransactionId() {
+        try {
+            // globalTransactionId = job id + task index
+            final DataOutputSerializer out = new DataOutputSerializer(1);
+            out.write(jobId, 0, JobID.SIZE);
+            out.writeInt(subtaskId);
+            return out.getSharedBuffer();
+        } catch (IOException e) {
+            throw new FlinkRuntimeException(e.getLocalizedMessage());
+        }
+    }
+
+    @Override
+    public byte[] getBranchQualifier() {
+        Preconditions.checkArgument(checkpointId >= 0, "No branch was initialized");
+        try {
+            // branchQualifier = numberOfSubtasks + checkpoint id
+            final DataOutputSerializer out = new DataOutputSerializer(1);
+            out.writeInt(numberOfSubtasks);
+            out.writeLong(checkpointId);
+            return out.getSharedBuffer();
+        } catch (IOException e) {
+            throw new FlinkRuntimeException(e.getLocalizedMessage());
+        }
+    }
+
+    public byte[] serialize() {
+        try {
+            final DataOutputSerializer out = new DataOutputSerializer(1);
+            out.write(jobId, 0, JobID.SIZE);
+            out.writeInt(subtaskId);
+            out.writeInt(numberOfSubtasks);
+            out.writeLong(checkpointId);
+            out.writeInt(attempts);
+            return out.getSharedBuffer();
+        } catch (IOException e) {
+            throw new FlinkRuntimeException(e.getLocalizedMessage());

Review Comment:
   ditto



##########
flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/datasource/transactions/xa/domain/TransactionId.java:
##########
@@ -0,0 +1,273 @@
+    @Override
+    public byte[] getBranchQualifier() {
+        Preconditions.checkArgument(checkpointId >= 0, "No branch was initialized");
+        try {
+            // branchQualifier = numberOfSubtasks + checkpoint id
+            final DataOutputSerializer out = new DataOutputSerializer(1);
+            out.writeInt(numberOfSubtasks);
+            out.writeLong(checkpointId);
+            return out.getSharedBuffer();
+        } catch (IOException e) {
+            throw new FlinkRuntimeException(e.getLocalizedMessage());

Review Comment:
   ditto



##########
flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/datasource/transactions/xa/domain/TransactionId.java:
##########
@@ -0,0 +1,273 @@
+    @Override
+    public byte[] getGlobalTransactionId() {
+        try {
+            // globalTransactionId = job id + task index
+            final DataOutputSerializer out = new DataOutputSerializer(1);
+            out.write(jobId, 0, JobID.SIZE);
+            out.writeInt(subtaskId);
+            return out.getSharedBuffer();
+        } catch (IOException e) {
+            throw new FlinkRuntimeException(e.getLocalizedMessage());

Review Comment:
   Just a little puzzle: why not use `throw new FlinkRuntimeException(e);`?
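   
   For illustration, a standalone sketch of the difference (not code from this PR): wrapping the exception itself preserves the full cause chain, while passing only `getLocalizedMessage()` keeps just the text.
   
   ```java
   import org.apache.flink.util.FlinkRuntimeException;
   
   import java.io.IOException;
   
   public class CauseWrappingSketch {
       static void serializeOrFail() {
           try {
               throw new IOException("disk full"); // stand-in for the real serialization work
           } catch (IOException e) {
               // Passing e.getLocalizedMessage() keeps only the message text;
               // wrapping the exception preserves the original stack trace.
               throw new FlinkRuntimeException(e);
           }
       }
   }
   ```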





Re: [PR] [FLINK-25421] Port JDBC Sink to new Unified Sink API (FLIP-143) [flink-connector-jdbc]

Posted by "snuyanzin (via GitHub)" <gi...@apache.org>.
snuyanzin commented on code in PR #2:
URL: https://github.com/apache/flink-connector-jdbc/pull/2#discussion_r1444114075


##########
flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/sink/writer/AlLeastOnceJdbcWriterTest.java:
##########
@@ -0,0 +1,88 @@
+package org.apache.flink.connector.jdbc.sink.writer;
+
+import org.apache.flink.connector.base.DeliveryGuarantee;
+import org.apache.flink.connector.jdbc.JdbcExactlyOnceOptions;
+import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
+import org.apache.flink.connector.jdbc.datasource.connections.JdbcConnectionProvider;
+import org.apache.flink.connector.jdbc.datasource.connections.SimpleJdbcConnectionProvider;
+import org.apache.flink.connector.jdbc.sink.committer.JdbcCommitable;
+import org.apache.flink.connector.jdbc.testutils.tables.templates.BooksTable;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Collection;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** */

Review Comment:
   I guess we need to fill it



##########
flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/sink/writer/BaseJdbcWriterTest.java:
##########
@@ -0,0 +1,210 @@
+package org.apache.flink.connector.jdbc.sink.writer;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.operators.MailboxExecutor;
+import org.apache.flink.api.common.operators.ProcessingTimeService;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.api.connector.sink2.SinkWriter;
+import org.apache.flink.connector.base.DeliveryGuarantee;
+import org.apache.flink.connector.jdbc.JdbcExactlyOnceOptions;
+import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
+import org.apache.flink.connector.jdbc.databases.derby.DerbyTestBase;
+import org.apache.flink.connector.jdbc.datasource.connections.JdbcConnectionProvider;
+import org.apache.flink.connector.jdbc.datasource.statements.JdbcQueryStatement;
+import org.apache.flink.connector.jdbc.datasource.statements.SimpleJdbcQueryStatement;
+import org.apache.flink.connector.jdbc.internal.JdbcOutputSerializer;
+import org.apache.flink.connector.jdbc.sink.committer.JdbcCommitable;
+import org.apache.flink.connector.jdbc.testutils.TableManaged;
+import org.apache.flink.connector.jdbc.testutils.tables.templates.BooksTable;
+import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
+import org.apache.flink.util.StringUtils;
+import org.apache.flink.util.UserCodeClassLoader;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.OptionalLong;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.connector.jdbc.JdbcTestFixture.TEST_DATA;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** */

Review Comment:
   I guess we need to fill it





Re: [PR] [FLINK-25421] Port JDBC Sink to new Unified Sink API (FLIP-143) [flink-connector-jdbc]

Posted by "eskabetxe (via GitHub)" <gi...@apache.org>.
eskabetxe commented on code in PR #2:
URL: https://github.com/apache/flink-connector-jdbc/pull/2#discussion_r1444608746


##########
flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/datasource/transactions/xa/domain/TransactionId.java:
##########
@@ -0,0 +1,271 @@
+    public static TransactionId deserialize(byte[] bytes) {
+        try {
+            final DataInputDeserializer in = new DataInputDeserializer(bytes);
+            byte[] jobIdBytes = readJobId(in);
+            int subtaskId = in.readInt();
+            int numberOfSubtasks = in.readInt();
+            long checkpoint = in.readLong();
+            int attempts = in.readInt();
+            return restore(jobIdBytes, subtaskId, numberOfSubtasks, checkpoint, attempts);
+        } catch (IOException e) {
+            throw new FlinkRuntimeException(e.getLocalizedMessage());

Review Comment:
   done





Re: [PR] [FLINK-25421] Port JDBC Sink to new Unified Sink API (FLIP-143) [flink-connector-jdbc]

Posted by "eskabetxe (via GitHub)" <gi...@apache.org>.
eskabetxe commented on code in PR #2:
URL: https://github.com/apache/flink-connector-jdbc/pull/2#discussion_r1452321352


##########
flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/sink/writer/ExactlyOnceJdbcWriterTest.java:
##########
@@ -0,0 +1,105 @@
+package org.apache.flink.connector.jdbc.sink.writer;
+
+import org.apache.flink.connector.base.DeliveryGuarantee;
+import org.apache.flink.connector.jdbc.JdbcExactlyOnceOptions;
+import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
+import org.apache.flink.connector.jdbc.datasource.connections.JdbcConnectionProvider;
+import org.apache.flink.connector.jdbc.datasource.connections.xa.SimpleXaConnectionProvider;
+import org.apache.flink.connector.jdbc.sink.committer.JdbcCommitable;
+import org.apache.flink.connector.jdbc.testutils.tables.templates.BooksTable;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** */

Review Comment:
   done



##########
flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/sink/writer/BaseJdbcWriterTest.java:
##########
@@ -0,0 +1,210 @@
+/** */

Review Comment:
   done





Re: [PR] [FLINK-25421] Port JDBC Sink to new Unified Sink API (FLIP-143) [flink-connector-jdbc]

Posted by "eskabetxe (via GitHub)" <gi...@apache.org>.
eskabetxe commented on code in PR #2:
URL: https://github.com/apache/flink-connector-jdbc/pull/2#discussion_r1475771264


##########
flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/sink/writer/JdbcWriter.java:
##########
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.sink.writer;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.api.connector.sink2.StatefulSink;
+import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;
+import org.apache.flink.connector.base.DeliveryGuarantee;
+import org.apache.flink.connector.jdbc.JdbcExactlyOnceOptions;
+import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
+import org.apache.flink.connector.jdbc.datasource.connections.JdbcConnectionProvider;
+import org.apache.flink.connector.jdbc.datasource.connections.xa.XaConnectionProvider;
+import org.apache.flink.connector.jdbc.datasource.statements.JdbcQueryStatement;
+import org.apache.flink.connector.jdbc.datasource.transactions.xa.XaTransaction;
+import org.apache.flink.connector.jdbc.datasource.transactions.xa.domain.TransactionId;
+import org.apache.flink.connector.jdbc.internal.JdbcOutputFormat;
+import org.apache.flink.connector.jdbc.internal.JdbcOutputSerializer;
+import org.apache.flink.connector.jdbc.internal.executor.JdbcBatchStatementExecutor;
+import org.apache.flink.connector.jdbc.sink.committer.JdbcCommitable;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.transaction.xa.Xid;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/** @param <IN> */

Review Comment:
   done





Re: [PR] [FLINK-25421] Port JDBC Sink to new Unified Sink API (FLIP-143) [flink-connector-jdbc]

Posted by "snuyanzin (via GitHub)" <gi...@apache.org>.
snuyanzin commented on code in PR #2:
URL: https://github.com/apache/flink-connector-jdbc/pull/2#discussion_r1528634655


##########
flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/xa/JobSubtask.java:
##########
@@ -0,0 +1,50 @@
+package org.apache.flink.connector.jdbc.xa;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.connector.sink2.Sink;
+
+import java.io.Serializable;
+
+/** Job identifier. */
+public class JobSubtask implements Serializable {

Review Comment:
   ```suggestion
   @Internal
   class JobSubtask implements Serializable {
   ```
   
   I guess we need to mark it internal and harden the access modifier, especially to prevent the case where someone modifies the contents of the byte array `jobId`
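   
   A minimal sketch of the hardening idea, assuming defensive copies are acceptable here (class name hypothetical, not the PR's code):
   
   ```java
   import java.io.Serializable;
   import java.util.Arrays;
   
   class JobSubtaskSketch implements Serializable {
       private static final long serialVersionUID = 1L;
   
       private final byte[] jobId;
   
       JobSubtaskSketch(byte[] jobId) {
           this.jobId = Arrays.copyOf(jobId, jobId.length); // copy on the way in
       }
   
       byte[] getJobId() {
           return Arrays.copyOf(jobId, jobId.length); // copy on the way out
       }
   }
   ```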





[GitHub] [flink-connector-jdbc] boring-cyborg[bot] commented on pull request #2: [FLINK-28284] Add JdbcSink with new format

Posted by GitBox <gi...@apache.org>.
boring-cyborg[bot] commented on PR #2:
URL: https://github.com/apache/flink-connector-jdbc/pull/2#issuecomment-1330863470

   Thanks for opening this pull request! Please check out our contributing guidelines. (https://flink.apache.org/contributing/how-to-contribute.html)
   




[GitHub] [flink-connector-jdbc] eskabetxe commented on pull request #2: [FLINK-28284] Add JdbcSink with new format

Posted by GitBox <gi...@apache.org>.
eskabetxe commented on PR #2:
URL: https://github.com/apache/flink-connector-jdbc/pull/2#issuecomment-1351401591

   @MartijnVisser rebased, but this is blocked. @wanglijie95 could give more feedback, but basically we are waiting for a new FLIP to expose ExecutionConfig or isObjectReuseEnabled through InitContext, allowing the use of JdbcOutputFormat in the writer.
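   
   To make the blocker concrete, a hedged sketch of what the writer needs `isObjectReuseEnabled` for (helper name illustrative): with object reuse enabled, a record must be deep-copied before buffering so later reuse of the same instance cannot corrupt the buffered element.
   
   ```java
   import org.apache.flink.api.common.typeutils.TypeSerializer;
   
   class ObjectReuseSketch {
       // Hypothetical helper: deep-copy only when the runtime reuses record
       // instances, so a buffered element cannot be mutated after the fact.
       static <IN> IN prepareForBuffering(
               IN record, boolean objectReuseEnabled, TypeSerializer<IN> serializer) {
           return objectReuseEnabled ? serializer.copy(record) : record;
       }
   }
   ```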
   




[GitHub] [flink-connector-jdbc] MartijnVisser commented on pull request #2: [FLINK-28284] Add JdbcSink with new format

Posted by GitBox <gi...@apache.org>.
MartijnVisser commented on PR #2:
URL: https://github.com/apache/flink-connector-jdbc/pull/2#issuecomment-1351323750

   @eskabetxe Could you rebase? The architecture tests have now been added, and it would be good to check that the new sink properly satisfies them.




[GitHub] [flink-connector-jdbc] eskabetxe commented on pull request #2: [FLINK-25421] Add JdbcSink with new format

Posted by "eskabetxe (via GitHub)" <gi...@apache.org>.
eskabetxe commented on PR #2:
URL: https://github.com/apache/flink-connector-jdbc/pull/2#issuecomment-1692338883

   @wanglijie95 updated the code to use the changes introduced in 1.18; this should be merged after [this PR](https://github.com/apache/flink-connector-jdbc/pull/73) as it depends on those changes




Re: [PR] [FLINK-25421] Port JDBC Sink to new Unified Sink API (FLIP-143) [flink-connector-jdbc]

Posted by "eskabetxe (via GitHub)" <gi...@apache.org>.
eskabetxe commented on code in PR #2:
URL: https://github.com/apache/flink-connector-jdbc/pull/2#discussion_r1441516846


##########
flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/datasource/transactions/xa/XaTransaction.java:
##########
@@ -0,0 +1,295 @@
+package org.apache.flink.connector.jdbc.datasource.transactions.xa;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.connector.jdbc.JdbcExactlyOnceOptions;
+import org.apache.flink.connector.jdbc.datasource.connections.xa.XaConnectionProvider;
+import org.apache.flink.connector.jdbc.datasource.transactions.xa.domain.TransactionId;
+import org.apache.flink.connector.jdbc.datasource.transactions.xa.exceptions.EmptyTransactionXaException;
+import org.apache.flink.connector.jdbc.datasource.transactions.xa.exceptions.TransientXaException;
+import org.apache.flink.connector.jdbc.sink.writer.JdbcWriterState;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.transaction.xa.Xid;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Deque;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+
+/** */
+public class XaTransaction implements Serializable, AutoCloseable {
+
+    private static final Logger LOG = LoggerFactory.getLogger(XaTransaction.class);
+
+    private final XaConnectionProvider xaConnectionProvider;
+    private final JdbcExactlyOnceOptions exactlyOnceOptions;
+
+    // checkpoints and the corresponding transactions waiting for completion notification from JM
+    private transient List<TransactionId> preparedXids = new ArrayList<>();
+    // hanging XIDs - used for cleanup
+    // it's a list to support retries and scaling down
+    // possible transaction states: active, idle, prepared
+    // last element is the current xid
+    private transient Deque<TransactionId> hangingXids = new LinkedList<>();
+    private transient TransactionId currentTid;
+
+    private final TransactionId baseTransaction;
+
+    public XaTransaction(
+            JdbcExactlyOnceOptions exactlyOnceOptions,
+            TransactionId transactionId,
+            XaConnectionProvider xaFacade) {
+        this.xaConnectionProvider = xaFacade;
+        this.exactlyOnceOptions = exactlyOnceOptions;
+        this.baseTransaction = transactionId;
+    }
+
+    public Xid getCurrentXid() {
+        return currentTid;
+    }
+
+    public XaConnectionProvider getConnectionProvider() {
+        return xaConnectionProvider;
+    }
+
+    public JdbcWriterState getState() {
+        return JdbcWriterState.of(preparedXids, hangingXids);
+    }
+
+    public void open(JdbcWriterState state) throws IOException {
+        try {
+            xaConnectionProvider.open();
+            recoverState(state);
+            hangingXids = new LinkedList<>(failOrRollback(hangingXids).getForRetry());
+            commitTx();
+            if (exactlyOnceOptions.isDiscoverAndRollbackOnRecovery()) {
+                // Pending transactions which are not included into the checkpoint might hold locks
+                // and
+                // should be rolled back. However, rolling back ALL transactions can cause data
+                // loss. So
+                // each subtask first commits transactions from its state and then rolls back
+                // discovered
+                // transactions if they belong to it.
+                recoverAndRollback();
+            }
+        } catch (Exception e) {
+            ExceptionUtils.rethrowIOException(e);
+        }
+    }
+
+    @Override
+    public void close() throws Exception {
+        if (currentTid != null && xaConnectionProvider.isOpen()) {
+            try {
+                LOG.debug(
+                        "remove current transaction before closing, xid={}",
+                        currentTid.getXidValue());
+                xaConnectionProvider.failAndRollback(currentTid);
+            } catch (Exception e) {
+                LOG.warn(
+                        "unable to fail/rollback current transaction, xid={}",
+                        currentTid.getXidValue(),
+                        e);
+            }
+        }
+        xaConnectionProvider.close();
+        currentTid = null;
+        hangingXids = null;
+        preparedXids = null;
+    }
+
+    public void recoverState(JdbcWriterState state) {
+        hangingXids = new LinkedList<>(state.getHanging());
+        preparedXids = new ArrayList<>(state.getPrepared());
+        LOG.info(
+                "initialized state: prepared xids: {}, hanging xids: {}",
+                preparedXids.size(),
+                hangingXids.size());
+    }
+
+    public void checkState() {
+        Preconditions.checkState(currentTid != null, "current xid must not be null");
+        Preconditions.checkState(
+                !hangingXids.isEmpty() && hangingXids.peekLast().equals(currentTid),
+                "inconsistent internal state");
+    }
+
+    /** @param checkpointId to associate with the new transaction. */
+    public void createTx(long checkpointId) throws IOException {
+        try {
+            Preconditions.checkState(currentTid == null, "currentXid not null");
+            currentTid = baseTransaction.withBranch(checkpointId);
+            hangingXids.offerLast(currentTid);
+            xaConnectionProvider.start(currentTid);
+        } catch (Exception e) {
+            ExceptionUtils.rethrowIOException(e);
+        }
+    }
+
+    public void prepareTx() throws IOException {
+        checkState();
+        hangingXids.pollLast();
+        try {
+            xaConnectionProvider.endAndPrepare(currentTid);
+            preparedXids.add(currentTid);
+        } catch (EmptyTransactionXaException e) {
+            LOG.info(
+                    "empty XA transaction (skip), xid: {}, checkpoint {}",
+                    currentTid.getXidValue(),
+                    currentTid.getCheckpointId());
+        } catch (Exception e) {
+            ExceptionUtils.rethrowIOException(e);
+        }
+        currentTid = null;
+    }
+
+    public void commitTx() {
+        List<TransactionId> toCommit = preparedXids;
+        preparedXids = new ArrayList<>();
+        preparedXids.addAll(commitXids(toCommit));
+    }
+
+    public void commitTxUntil(long checkpointId) {
+        Tuple2<List<TransactionId>, List<TransactionId>> splittedXids =
+                split(preparedXids, checkpointId);
+
+        if (splittedXids.f0.isEmpty()) {
+            LOG.warn("nothing to commit up to checkpoint: {}", checkpointId);
+        } else {
+            preparedXids = splittedXids.f1;
+            preparedXids.addAll(commitXids(splittedXids.f0));
+        }
+    }
+
+    public List<TransactionId> commitXids(List<TransactionId> xids) {
+        return commit(
+                        xids,
+                        exactlyOnceOptions.isAllowOutOfOrderCommits(),
+                        exactlyOnceOptions.getMaxCommitAttempts())
+                .getForRetry();
+    }
+
+    private Tuple2<List<TransactionId>, List<TransactionId>> split(
+            List<TransactionId> list, long checkpointId) {
+        return split(list, checkpointId, true);
+    }
+
+    private Tuple2<List<TransactionId>, List<TransactionId>> split(
+            List<TransactionId> list, long checkpointId, boolean checkpointIntoLo) {
+
+        List<TransactionId> lo = new ArrayList<>(list.size() / 2);
+        List<TransactionId> hi = new ArrayList<>(list.size() / 2);
+        list.forEach(
+                i -> {
+                    if (i.getCheckpointId() < checkpointId
+                            || (i.getCheckpointId() == checkpointId && checkpointIntoLo)) {
+                        lo.add(i);
+                    } else {
+                        hi.add(i);
+                    }
+                });
+        return new Tuple2<>(lo, hi);
+    }
+
+    private XaTransactionResult<TransactionId> commit(
+            List<TransactionId> xids, boolean allowOutOfOrderCommits, int maxCommitAttempts) {
+        XaTransactionResult<TransactionId> result = new XaTransactionResult<>();
+        int origSize = xids.size();
+        LOG.debug("commit {} transactions", origSize);
+        for (Iterator<TransactionId> i = xids.iterator();
+                i.hasNext() && (result.hasNoFailures() || allowOutOfOrderCommits); ) {
+            TransactionId x = i.next();
+            i.remove();
+            try {
+                xaConnectionProvider.commit(x, x.getRestored());
+                result.succeeded(x);
+            } catch (TransientXaException e) {
+                result.failedTransiently(x.withAttemptsIncremented(), e);
+            } catch (Exception e) {
+                result.failed(x, e);
+            }
+        }
+        result.getForRetry().addAll(xids);
+        result.throwIfAnyFailed("commit");
+        throwIfAnyReachedMaxAttempts(result, maxCommitAttempts);
+        result.getTransientFailure()
+                .ifPresent(
+                        f ->
+                                LOG.warn(
+                                        "failed to commit {} transactions out of {} (keep them to retry later)",
+                                        result.getForRetry().size(),
+                                        origSize,
+                                        f));
+        return result;
+    }
+
+    private XaTransactionResult<TransactionId> failOrRollback(Collection<TransactionId> xids) {
+        XaTransactionResult<TransactionId> result = new XaTransactionResult<>();
+        if (xids.isEmpty()) {
+            return result;
+        }
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("rolling back {} transactions: {}", xids.size(), xids);
+        }
+        for (TransactionId x : xids) {
+            try {
+                xaConnectionProvider.failAndRollback(x);
+                result.succeeded(x);
+            } catch (TransientXaException e) {
+                LOG.info("unable to fail/rollback transaction, xid={}: {}", x, e.getMessage());
+                result.failedTransiently(x, e);
+            } catch (Exception e) {
+                LOG.warn("unable to fail/rollback transaction, xid={}: {}", x, e.getMessage());
+                result.failed(x, e);
+            }
+        }
+        if (!result.getForRetry().isEmpty()) {
+            LOG.info("failed to roll back {} transactions", result.getForRetry().size());
+        }
+        return result;
+    }
+
+    private void recoverAndRollback() {
+        Collection<Xid> recovered = xaConnectionProvider.recover();
+        if (recovered.isEmpty()) {
+            return;
+        }
+        LOG.warn("rollback {} recovered transactions", recovered.size());
+        for (Xid xid : recovered) {
+            if (baseTransaction.belongsTo(xid)) {
+                try {
+                    xaConnectionProvider.rollback(xid);
+                } catch (Exception e) {
+                    LOG.info("unable to rollback recovered transaction, xid={}", xid, e);
+                }
+            }
+        }
+    }
+
+    private void throwIfAnyReachedMaxAttempts(
+            XaTransactionResult<TransactionId> result, int maxAttempts) {
+        List<TransactionId> reached = null;
+        for (TransactionId x : result.getForRetry()) {
+            if (x.getAttempts() >= maxAttempts) {
+                if (reached == null) {
+                    reached = new ArrayList<>();
+                }
+                reached.add(x);
+            }
+        }
+        if (reached != null) {

Review Comment:
   I would need to change it to checkArgument(reached == null, "error message"), and that would throw an IllegalArgumentException.
   
   It's not really a bad argument, so I would prefer to keep it the way it is.
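   
   For comparison, a hedged sketch of the two variants under discussion (simplified names, not the PR's exact code):
   
   ```java
   import org.apache.flink.util.FlinkRuntimeException;
   
   class MaxAttemptsSketch {
       static void failIfExhausted(int attempts, int maxAttempts) {
           // The checkArgument variant,
           //   Preconditions.checkArgument(attempts < maxAttempts, "..."),
           // would throw IllegalArgumentException, which signals a caller error.
           // The explicit throw below signals a runtime failure after retries
           // were exhausted, which matches the actual situation better.
           if (attempts >= maxAttempts) {
               throw new FlinkRuntimeException(
                       "reached max number of commit attempts: " + maxAttempts);
           }
       }
   }
   ```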





Re: [PR] [FLINK-25421] Port JDBC Sink to new Unified Sink API (FLIP-143) [flink-connector-jdbc]

Posted by "snuyanzin (via GitHub)" <gi...@apache.org>.
snuyanzin commented on code in PR #2:
URL: https://github.com/apache/flink-connector-jdbc/pull/2#discussion_r1444113024


##########
flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/sink/writer/JdbcWriterState.java:
##########
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.sink.writer;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.connector.jdbc.datasource.transactions.xa.domain.TransactionId;
+
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+import org.apache.commons.lang3.builder.ToStringBuilder;
+import org.apache.commons.lang3.builder.ToStringStyle;
+
+import javax.annotation.concurrent.ThreadSafe;
+import javax.transaction.xa.Xid;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+
+/** Thread-safe (assuming immutable {@link Xid} implementation). */
+@ThreadSafe
+@Internal
+public class JdbcWriterState implements Serializable {
+    private final Collection<TransactionId> prepared;
+    private final Collection<TransactionId> hanging;
+
+    public static JdbcWriterState empty() {
+        return new JdbcWriterState(Collections.emptyList(), Collections.emptyList());
+    }
+
+    public static JdbcWriterState of(
+            Collection<TransactionId> prepared, Collection<TransactionId> hanging) {
+        return new JdbcWriterState(
+                Collections.unmodifiableList(new ArrayList<>(prepared)),
+                Collections.unmodifiableList(new ArrayList<>(hanging)));
+    }
+
+    protected JdbcWriterState(
+            Collection<TransactionId> prepared, Collection<TransactionId> hanging) {
+        this.prepared = prepared;
+        this.hanging = hanging;
+    }
+
+    /**
+     * @return immutable collection of prepared XA transactions to {@link
+     *     javax.transaction.xa.XAResource#commit commit}.
+     */
+    public Collection<TransactionId> getPrepared() {
+        return prepared;
+    }
+
+    /**
+     * @return immutable collection of XA transactions to {@link
+     *     javax.transaction.xa.XAResource#rollback rollback} (if they were prepared) or {@link
+     *     javax.transaction.xa.XAResource#end end} (if they were only started).
+     */
+    public Collection<TransactionId> getHanging() {
+        return hanging;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+
+        JdbcWriterState that = (JdbcWriterState) o;
+        return new EqualsBuilder()
+                .append(prepared, that.prepared)
+                .append(hanging, that.hanging)
+                .isEquals();

Review Comment:
   Is there no way to just use `equals`?
   These are collections only, so a plain equality check, e.g. `Objects.equals`, should work...
   I'm confused by the creation of a new object each time `equals` is called.
   Or did I miss anything?
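   
   For reference, a minimal sketch of the allocation-free alternative (field names mirror the quoted class, but this is illustrative only):
   
   ```java
   import java.util.Collection;
   import java.util.Objects;
   
   class WriterStateSketch {
       private final Collection<?> prepared;
       private final Collection<?> hanging;
   
       WriterStateSketch(Collection<?> prepared, Collection<?> hanging) {
           this.prepared = prepared;
           this.hanging = hanging;
       }
   
       @Override
       public boolean equals(Object o) {
           if (this == o) {
               return true;
           }
           if (o == null || getClass() != o.getClass()) {
               return false;
           }
           WriterStateSketch that = (WriterStateSketch) o;
           // Objects.equals delegates to the collections' own equals,
           // so no intermediate builder object is allocated per call.
           return Objects.equals(prepared, that.prepared)
                   && Objects.equals(hanging, that.hanging);
       }
   
       @Override
       public int hashCode() {
           return Objects.hash(prepared, hanging);
       }
   }
   ```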





Re: [PR] [FLINK-25421] Port JDBC Sink to new Unified Sink API (FLIP-143) [flink-connector-jdbc]

Posted by "wanglijie95 (via GitHub)" <gi...@apache.org>.
wanglijie95 commented on PR #2:
URL: https://github.com/apache/flink-connector-jdbc/pull/2#issuecomment-2016528614

   @snuyanzin @RocMarshal @eskabetxe Sorry for the late reply. I'm busy at the moment and don't have time to review this PR in detail.
   
   Only one minor comment: it would be nice to update the relevant documentation (`jdbc.md`) so that users know how to use the new JdbcSink (based on Sink V2). Of course, that can be a separate issue/PR.
   
   Thanks for your efforts, please go ahead :)
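   
   As a starting point for such docs, a rough sketch; `JdbcExecutionOptions` is the existing public API, while the enclosing sink builder calls are placeholders until the final API is settled:
   
   ```java
   import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
   
   class DocSketch {
       static JdbcExecutionOptions executionOptions() {
           // JdbcExecutionOptions is the existing public API; the surrounding
           // sink builder call (e.g. withExecutionOptions(...)) would follow the
           // JdbcSinkBuilder quoted earlier in this thread.
           return JdbcExecutionOptions.builder()
                   .withBatchSize(100)
                   .withBatchIntervalMs(200)
                   .withMaxRetries(3)
                   .build();
       }
   }
   ```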




Re: [PR] [FLINK-25421] Add JdbcSink with new format [flink-connector-jdbc]

Posted by "MartijnVisser (via GitHub)" <gi...@apache.org>.
MartijnVisser commented on PR #2:
URL: https://github.com/apache/flink-connector-jdbc/pull/2#issuecomment-1875367844

   @wanglijie95 Do you have bandwidth to review this PR?




Re: [PR] [FLINK-25421] Port JDBC Sink to new Unified Sink API (FLIP-143) [flink-connector-jdbc]

Posted by "eskabetxe (via GitHub)" <gi...@apache.org>.
eskabetxe commented on code in PR #2:
URL: https://github.com/apache/flink-connector-jdbc/pull/2#discussion_r1441520406


##########
flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/sink/JdbcSinkBuilder.java:
##########
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.sink;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
+import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
+import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
+import org.apache.flink.connector.jdbc.internal.connection.JdbcConnectionProvider;
+import org.apache.flink.connector.jdbc.internal.connection.SimpleJdbcConnectionProvider;
+import org.apache.flink.connector.jdbc.sink.statement.JdbcQueryStatement;
+import org.apache.flink.connector.jdbc.sink.statement.SimpleJdbcQueryStatement;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** Builder to construct {@link JdbcSink}. */
+@PublicEvolving
+public class JdbcSinkBuilder<IN> {
+
+    private JdbcConnectionProvider connectionProvider;
+    private JdbcExecutionOptions executionOptions;
+    private JdbcQueryStatement<IN> queryStatement;
+
+    public JdbcSinkBuilder() {
+        executionOptions = JdbcExecutionOptions.defaults();
+    }
+
+    public JdbcSinkBuilder<IN> withExecutionOptions(JdbcExecutionOptions executionOptions) {
+        this.executionOptions = checkNotNull(executionOptions, "executionOptions cannot be empty");
+        return this;
+    }
+
+    private JdbcSinkBuilder<IN> withConnectionProvider(JdbcConnectionProvider connectionProvider) {
+        this.connectionProvider =
+                checkNotNull(connectionProvider, "connectionProvider cannot be empty");

Review Comment:
   done



##########
flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/sink/JdbcSinkBuilder.java:
##########
@@ -0,0 +1,76 @@
+    public JdbcSinkBuilder<IN> withExecutionOptions(JdbcExecutionOptions executionOptions) {
+        this.executionOptions = checkNotNull(executionOptions, "executionOptions cannot be empty");

Review Comment:
   done





Re: [PR] [FLINK-25421] Port JDBC Sink to new Unified Sink API (FLIP-143) [flink-connector-jdbc]

Posted by "snuyanzin (via GitHub)" <gi...@apache.org>.
snuyanzin commented on code in PR #2:
URL: https://github.com/apache/flink-connector-jdbc/pull/2#discussion_r1528627627


##########
flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/xa/JdbcXaSinkFunctionState.java:
##########
@@ -31,7 +31,7 @@
 /** Thread-safe (assuming immutable {@link Xid} implementation). */
 @ThreadSafe
 @Internal
-class JdbcXaSinkFunctionState {
+public class JdbcXaSinkFunctionState {

Review Comment:
   Do we need to change the modifier here?
   It seems to work fine as package-private, or did I miss anything?





Re: [PR] [FLINK-25421] Port JDBC Sink to new Unified Sink API (FLIP-143) [flink-connector-jdbc]

Posted by "eskabetxe (via GitHub)" <gi...@apache.org>.
eskabetxe commented on code in PR #2:
URL: https://github.com/apache/flink-connector-jdbc/pull/2#discussion_r1528689986


##########
flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/sink/writer/JdbcWriter.java:
##########
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.sink.writer;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.api.connector.sink2.StatefulSink;
+import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;
+import org.apache.flink.connector.base.DeliveryGuarantee;
+import org.apache.flink.connector.jdbc.JdbcExactlyOnceOptions;
+import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
+import org.apache.flink.connector.jdbc.datasource.connections.JdbcConnectionProvider;
+import org.apache.flink.connector.jdbc.datasource.connections.xa.XaConnectionProvider;
+import org.apache.flink.connector.jdbc.datasource.statements.JdbcQueryStatement;
+import org.apache.flink.connector.jdbc.datasource.transactions.xa.XaTransaction;
+import org.apache.flink.connector.jdbc.datasource.transactions.xa.domain.TransactionId;
+import org.apache.flink.connector.jdbc.internal.JdbcOutputFormat;
+import org.apache.flink.connector.jdbc.internal.JdbcOutputSerializer;
+import org.apache.flink.connector.jdbc.internal.executor.JdbcBatchStatementExecutor;
+import org.apache.flink.connector.jdbc.sink.committer.JdbcCommitable;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.transaction.xa.Xid;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Jdbc writer that allows at-least-once (non-XA operation) and exactly-once (XA operation)
+ * semantics.
+ */
+@Internal
+public class JdbcWriter<IN>
+        implements StatefulSink.StatefulSinkWriter<IN, JdbcWriterState>,
+                TwoPhaseCommittingSink.PrecommittingSinkWriter<IN, JdbcCommitable> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(JdbcWriter.class);
+
+    private final DeliveryGuarantee deliveryGuarantee;
+    private final JdbcOutputFormat<IN, IN, JdbcBatchStatementExecutor<IN>> jdbcOutput;
+
+    private final TransactionId transactionId;

Review Comment:
   Correct, `transactionId` is not needed; I also moved its creation inside the exactly-once `if` block.
   
   Changed.





Re: [PR] [FLINK-25421] Port JDBC Sink to new Unified Sink API (FLIP-143) [flink-connector-jdbc]

Posted by "eskabetxe (via GitHub)" <gi...@apache.org>.
eskabetxe commented on code in PR #2:
URL: https://github.com/apache/flink-connector-jdbc/pull/2#discussion_r1529961727


##########
flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/datasource/transactions/xa/domain/TransactionId.java:
##########
@@ -0,0 +1,273 @@
+package org.apache.flink.connector.jdbc.datasource.transactions.xa.domain;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+import org.apache.commons.lang3.builder.ToStringBuilder;
+import org.apache.commons.lang3.builder.ToStringStyle;
+
+import javax.transaction.xa.Xid;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Arrays;
+
+import static org.apache.flink.util.StringUtils.byteToHexString;
+
+/** A simple {@link Xid} implementation. */
+@Internal
+public class TransactionId implements Xid, Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    private static final int FORMAT_ID = 202;
+
+    private final byte[] jobId;
+    private final int subtaskId;
+    private final int numberOfSubtasks;
+    private final long checkpointId;
+    private final int attempts;
+    private final boolean restored;
+
+    private TransactionId(
+            byte[] jobId,
+            int subtaskId,
+            int numberOfSubtasks,
+            long checkpointId,
+            int attempts,
+            boolean restored) {
+        this.jobId = jobId;
+        this.subtaskId = subtaskId;
+        this.numberOfSubtasks = numberOfSubtasks;
+        this.checkpointId = checkpointId;
+        this.attempts = attempts;
+        this.restored = restored;
+    }
+
+    public static TransactionId empty() {
+        return create(new byte[JobID.SIZE], 0, 0);
+    }
+
+    public static TransactionId create(byte[] jobId, int subtaskId, int numberOfSubtasks) {
+        return new TransactionId(jobId, subtaskId, numberOfSubtasks, -1, 0, false);
+    }
+
+    public static TransactionId restore(
+            byte[] jobId, int subtaskId, int numberOfSubtasks, long checkpointId, int attempts) {
+        return new TransactionId(jobId, subtaskId, numberOfSubtasks, checkpointId, attempts, true);
+    }
+
+    public static TransactionId createFromXid(
+            int formatId, byte[] globalTransactionId, byte[] branchQualifier) throws IOException {
+        return fromXid(formatId, globalTransactionId, branchQualifier, false);
+    }
+
+    public static TransactionId restoreFromXid(
+            int formatId, byte[] globalTransactionId, byte[] branchQualifier) throws IOException {
+        return fromXid(formatId, globalTransactionId, branchQualifier, true);
+    }
+
+    public static TransactionId fromXid(
+            int formatId, byte[] globalTransactionId, byte[] branchQualifier, boolean restored)
+            throws IOException {
+        if (FORMAT_ID != formatId) {
+            throw new IOException(String.format("Xid formatId (%s) is not valid", formatId));
+        }
+
+        final DataInputDeserializer gid = new DataInputDeserializer(globalTransactionId);
+        byte[] jobIdBytes = readJobId(gid);
+        int subtaskId = gid.readInt();
+
+        final DataInputDeserializer branch = new DataInputDeserializer(branchQualifier);
+        int numberOfSubtasks = branch.readInt();
+        long checkpoint = branch.readLong();
+
+        return new TransactionId(jobIdBytes, subtaskId, numberOfSubtasks, checkpoint, 0, restored);
+    }
+
+    public static TransactionId deserialize(byte[] bytes) {
+        try {
+            final DataInputDeserializer in = new DataInputDeserializer(bytes);
+            byte[] jobIdBytes = readJobId(in);
+            int subtaskId = in.readInt();
+            int numberOfSubtasks = in.readInt();
+            long checkpoint = in.readLong();
+            int attempts = in.readInt();
+            return restore(jobIdBytes, subtaskId, numberOfSubtasks, checkpoint, attempts);
+        } catch (IOException e) {
+            throw new FlinkRuntimeException(e);
+        }
+    }
+
+    private static byte[] readJobId(DataInputDeserializer in) throws IOException {
+        byte[] jobIdBytes = new byte[JobID.SIZE];
+        in.read(jobIdBytes);
+        return jobIdBytes;
+    }
+
+    public TransactionId copy() {
+        return new TransactionId(
+                jobId, subtaskId, numberOfSubtasks, checkpointId, attempts, restored);
+    }
+
+    public TransactionId withBranch(long checkpointId) {
+        return new TransactionId(
+                jobId, subtaskId, numberOfSubtasks, checkpointId, attempts, restored);
+    }
+
+    public TransactionId withAttemptsIncremented() {
+        return new TransactionId(
+                jobId, subtaskId, numberOfSubtasks, checkpointId, attempts + 1, restored);
+    }
+
+    public long getCheckpointId() {
+        return checkpointId;
+    }
+
+    public boolean getRestored() {
+        return restored;
+    }
+
+    public int getAttempts() {
+        return attempts;
+    }
+
+    public String getXidValue() {
+        return String.format(
+                "%s:%s:%s",
+                getFormatId(),
+                byteToHexString(getGlobalTransactionId()),
+                byteToHexString(getBranchQualifier()));
+    }
+
+    @Override
+    public int getFormatId() {
+        return FORMAT_ID;
+    }
+
+    @Override
+    public byte[] getGlobalTransactionId() {
+        try {
+            // globalTransactionId = job id + task index
+            final DataOutputSerializer out = new DataOutputSerializer(1);
+            out.write(jobId, 0, JobID.SIZE);
+            out.writeInt(subtaskId);
+            return out.getSharedBuffer();
+        } catch (IOException e) {
+            throw new FlinkRuntimeException(e.getLocalizedMessage());

Review Comment:
   changed.



##########
flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/datasource/transactions/xa/domain/TransactionId.java:
##########
@@ -0,0 +1,273 @@
+    @Override
+    public byte[] getBranchQualifier() {
+        Preconditions.checkArgument(checkpointId >= 0, "No branch was initialized");
+        try {
+            // branchQualifier = numberOfSubtasks + checkpoint id
+            final DataOutputSerializer out = new DataOutputSerializer(1);
+            out.writeInt(numberOfSubtasks);
+            out.writeLong(checkpointId);
+            return out.getSharedBuffer();
+        } catch (IOException e) {
+            throw new FlinkRuntimeException(e.getLocalizedMessage());

Review Comment:
   changed.



##########
flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/datasource/transactions/xa/domain/TransactionId.java:
##########
@@ -0,0 +1,273 @@
+    public byte[] serialize() {
+        try {
+            final DataOutputSerializer out = new DataOutputSerializer(1);
+            out.write(jobId, 0, JobID.SIZE);
+            out.writeInt(subtaskId);
+            out.writeInt(numberOfSubtasks);
+            out.writeLong(checkpointId);
+            out.writeInt(attempts);
+            return out.getSharedBuffer();
+        } catch (IOException e) {
+            throw new FlinkRuntimeException(e.getLocalizedMessage());

Review Comment:
   changed.





Re: [PR] [FLINK-25421] Port JDBC Sink to new Unified Sink API (FLIP-143) [flink-connector-jdbc]

Posted by "eskabetxe (via GitHub)" <gi...@apache.org>.
eskabetxe commented on code in PR #2:
URL: https://github.com/apache/flink-connector-jdbc/pull/2#discussion_r1452316648


##########
flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/sink/committer/JdbcCommitable.java:
##########
@@ -0,0 +1,39 @@
+package org.apache.flink.connector.jdbc.sink.committer;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.connector.jdbc.datasource.transactions.xa.XaTransaction;
+
+import javax.annotation.Nullable;
+import javax.transaction.xa.Xid;
+
+import java.io.Serializable;
+import java.util.Optional;
+
+/** */

Review Comment:
   done





Re: [PR] [FLINK-25421] Port JDBC Sink to new Unified Sink API (FLIP-143) [flink-connector-jdbc]

Posted by "snuyanzin (via GitHub)" <gi...@apache.org>.
snuyanzin commented on code in PR #2:
URL: https://github.com/apache/flink-connector-jdbc/pull/2#discussion_r1475725109


##########
flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/datasource/connections/xa/PoolingXaConnectionProvider.java:
##########
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.datasource.connections.xa;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.connector.jdbc.datasource.transactions.xa.exceptions.TransientXaException;
+import org.apache.flink.util.function.ThrowingConsumer;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import javax.sql.XADataSource;
+import javax.transaction.xa.Xid;
+
+import java.io.Serializable;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.Collection;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.function.Supplier;
+
+import static org.apache.flink.util.ExceptionUtils.rethrow;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * A "pooling" implementation of {@link XaConnectionProvider}. Some databases implement XA such that
+ * one connection is limited to a single transaction. As a workaround, this implementation creates a
+ * new XA resource after each xa_start call is made (and associates it with the xid to commit
+ * later).
+ */
+@Internal
+public class PoolingXaConnectionProvider implements XaConnectionProvider {
+    private static final long serialVersionUID = 1L;
+
+    /** */

Review Comment:
   do we still need that comment?
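   
   As a side note, the pool-per-transaction idea that the javadoc above describes can be sketched roughly like this, using only the standard javax.sql / javax.transaction.xa APIs (class and field names here are made up for illustration; this is not the connector's actual code):
   
       import javax.sql.XAConnection;
       import javax.sql.XADataSource;
       import javax.transaction.xa.XAResource;
       import javax.transaction.xa.Xid;
   
       import java.util.HashMap;
       import java.util.Map;
   
       /** Sketch: one fresh XA connection per xa_start, remembered by its xid. */
       class XidScopedResources {
           private final XADataSource dataSource;
           private final Map<Xid, XAConnection> byXid = new HashMap<>();
   
           XidScopedResources(XADataSource dataSource) {
               this.dataSource = dataSource;
           }
   
           /** Start each transaction on a new connection and associate it with the xid. */
           void start(Xid xid) throws Exception {
               XAConnection connection = dataSource.getXAConnection();
               connection.getXAResource().start(xid, XAResource.TMNOFLAGS);
               byXid.put(xid, connection);
           }
   
           /** Commit later through the same connection that started the transaction. */
           void commit(Xid xid, boolean onePhase) throws Exception {
               XAConnection connection = byXid.remove(xid);
               XAResource resource = connection.getXAResource();
               resource.end(xid, XAResource.TMSUCCESS);
               resource.commit(xid, onePhase);
               connection.close();
           }
       }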





Re: [PR] [FLINK-25421] Port JDBC Sink to new Unified Sink API (FLIP-143) [flink-connector-jdbc]

Posted by "eskabetxe (via GitHub)" <gi...@apache.org>.
eskabetxe commented on code in PR #2:
URL: https://github.com/apache/flink-connector-jdbc/pull/2#discussion_r1475772527


##########
flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/xa/JobSubtask.java:
##########
@@ -0,0 +1,50 @@
+package org.apache.flink.connector.jdbc.xa;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.connector.sink2.Sink;
+
+import java.io.Serializable;
+
+/** */

Review Comment:
   done





Re: [PR] [FLINK-25421] Port JDBC Sink to new Unified Sink API (FLIP-143) [flink-connector-jdbc]

Posted by "snuyanzin (via GitHub)" <gi...@apache.org>.
snuyanzin commented on code in PR #2:
URL: https://github.com/apache/flink-connector-jdbc/pull/2#discussion_r1475815795


##########
.github/workflows/weekly.yml:
##########
@@ -27,25 +27,13 @@ jobs:
     strategy:
       matrix:
         flink_branches: [{
-          flink: 1.16-SNAPSHOT,
-          branch: main
-        }, {
-          flink: 1.17-SNAPSHOT,
-          branch: main
-        }, {
           flink: 1.18-SNAPSHOT,
           jdk: '8, 11, 17',
           branch: main
         }, {
           flink: 1.19-SNAPSHOT,
           jdk: '8, 11, 17, 21',
           branch: main
-        }, {
-          flink: 1.16.2,
-          branch: v3.1
-        }, {
-          flink: 1.17.1,
-          branch: v3.1
         }, {

Review Comment:
   IIRC, ideally we should support all currently supported Flink versions (1.17, 1.18, 1.19, and 1.16, though 1.16 will probably no longer be supported after the 1.19 release).
   I don't know another way to have all of them supported without branching (whether it's 4.0.0 or 3.2.0 doesn't really matter).





[GitHub] [flink-connector-jdbc] wanglijie95 commented on pull request #2: [FLINK-28284] Add JdbcSink with new format

Posted by GitBox <gi...@apache.org>.
wanglijie95 commented on PR #2:
URL: https://github.com/apache/flink-connector-jdbc/pull/2#issuecomment-1359578023

   @MartijnVisser @eskabetxe Yes, I think it (exposing ExecutionConfig or isObjectReuseEnabled) is a blocker to the migration. The JDBC sink needs to decide whether to buffer copies or the original records based on object reuse: when object reuse is enabled, we should buffer copies (because the contents of the objects may be changed before the flush); otherwise we should buffer the original records.
   
   @eskabetxe It would be great if you are willing to drive the FLIP (exposing ExecutionConfig or isObjectReuseEnabled) and the discussion (I'm currently too busy with other 1.17 features to do this). Otherwise, I will start this FLIP after the 1.17 code freeze, which means this change would need to go into the next version.
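   
   A minimal sketch of that buffering decision (names are illustrative, not the actual sink code; TypeSerializer.copy stands in for whatever deep-copy mechanism the sink would use):
   
       import org.apache.flink.api.common.typeutils.TypeSerializer;
   
       import java.util.ArrayList;
       import java.util.List;
   
       /** Sketch: buffer deep copies only when object reuse is enabled. */
       class ReuseAwareBuffer<IN> {
           private final TypeSerializer<IN> serializer;
           private final boolean objectReuseEnabled;
           private final List<IN> buffer = new ArrayList<>();
   
           ReuseAwareBuffer(TypeSerializer<IN> serializer, boolean objectReuseEnabled) {
               this.serializer = serializer;
               this.objectReuseEnabled = objectReuseEnabled;
           }
   
           void add(IN record) {
               // With object reuse the runtime may mutate `record` after this call
               // returns, so a copy must be buffered instead of the original instance.
               buffer.add(objectReuseEnabled ? serializer.copy(record) : record);
           }
       }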




Re: [PR] [FLINK-25421] Port JDBC Sink to new Unified Sink API (FLIP-143) [flink-connector-jdbc]

Posted by "eskabetxe (via GitHub)" <gi...@apache.org>.
eskabetxe commented on PR #2:
URL: https://github.com/apache/flink-connector-jdbc/pull/2#issuecomment-1876772728

   @RocMarshal thanks for the review. The 3 commits are there to allow reviewing each semantic separately (I think it's easier); we could squash them all into one after the reviews are finished.
   




Re: [PR] [FLINK-25421] Port JDBC Sink to new Unified Sink API (FLIP-143) [flink-connector-jdbc]

Posted by "snuyanzin (via GitHub)" <gi...@apache.org>.
snuyanzin commented on code in PR #2:
URL: https://github.com/apache/flink-connector-jdbc/pull/2#discussion_r1444113959


##########
flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/xa/JobSubtask.java:
##########
@@ -0,0 +1,50 @@
+package org.apache.flink.connector.jdbc.xa;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.connector.sink2.Sink;
+
+import java.io.Serializable;
+
+/** */

Review Comment:
   I guess we need to fill it in.





Re: [PR] [FLINK-25421] Port JDBC Sink to new Unified Sink API (FLIP-143) [flink-connector-jdbc]

Posted by "eskabetxe (via GitHub)" <gi...@apache.org>.
eskabetxe commented on code in PR #2:
URL: https://github.com/apache/flink-connector-jdbc/pull/2#discussion_r1444686739


##########
flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/sink/writer/JdbcWriterState.java:
##########
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.sink.writer;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.connector.jdbc.datasource.transactions.xa.domain.TransactionId;
+
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+import org.apache.commons.lang3.builder.ToStringBuilder;
+import org.apache.commons.lang3.builder.ToStringStyle;
+
+import javax.annotation.concurrent.ThreadSafe;
+import javax.transaction.xa.Xid;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+
+/** Thread-safe (assuming immutable {@link Xid} implementation). */
+@ThreadSafe
+@Internal
+public class JdbcWriterState implements Serializable {
+    private final Collection<TransactionId> prepared;
+    private final Collection<TransactionId> hanging;
+
+    public static JdbcWriterState empty() {
+        return new JdbcWriterState(Collections.emptyList(), Collections.emptyList());
+    }
+
+    public static JdbcWriterState of(
+            Collection<TransactionId> prepared, Collection<TransactionId> hanging) {
+        return new JdbcWriterState(
+                Collections.unmodifiableList(new ArrayList<>(prepared)),
+                Collections.unmodifiableList(new ArrayList<>(hanging)));
+    }
+
+    protected JdbcWriterState(
+            Collection<TransactionId> prepared, Collection<TransactionId> hanging) {
+        this.prepared = prepared;
+        this.hanging = hanging;
+    }
+
+    /**
+     * @return immutable collection of prepared XA transactions to {@link
+     *     javax.transaction.xa.XAResource#commit commit}.
+     */
+    public Collection<TransactionId> getPrepared() {
+        return prepared;
+    }
+
+    /**
+     * @return immutable collection of XA transactions to {@link
+     *     javax.transaction.xa.XAResource#rollback rollback} (if they were prepared) or {@link
+     *     javax.transaction.xa.XAResource#end end} (if they were only started).
+     */
+    public Collection<TransactionId> getHanging() {
+        return hanging;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+
+        JdbcWriterState that = (JdbcWriterState) o;
+        return new EqualsBuilder()
+                .append(prepared, that.prepared)
+                .append(hanging, that.hanging)
+                .isEquals();

Review Comment:
   It is auto-generated from an Apache commons-lang3 template.
   
   I could change it if the default is done another way.
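   
   For comparison, the plain-JDK (e.g., IDE-generated) default would look roughly like this sketch:
   
       @Override
       public boolean equals(Object o) {
           if (this == o) {
               return true;
           }
           if (o == null || getClass() != o.getClass()) {
               return false;
           }
           JdbcWriterState that = (JdbcWriterState) o;
           // java.util.Objects instead of the commons-lang3 builders
           return java.util.Objects.equals(prepared, that.prepared)
                   && java.util.Objects.equals(hanging, that.hanging);
       }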





Re: [PR] [FLINK-25421] Port JDBC Sink to new Unified Sink API (FLIP-143) [flink-connector-jdbc]

Posted by "eskabetxe (via GitHub)" <gi...@apache.org>.
eskabetxe commented on code in PR #2:
URL: https://github.com/apache/flink-connector-jdbc/pull/2#discussion_r1475842140


##########
.github/workflows/weekly.yml:
##########
@@ -27,25 +27,13 @@ jobs:
     strategy:
       matrix:
         flink_branches: [{
-          flink: 1.16-SNAPSHOT,
-          branch: main
-        }, {
-          flink: 1.17-SNAPSHOT,
-          branch: main
-        }, {
           flink: 1.18-SNAPSHOT,
           jdk: '8, 11, 17',
           branch: main
         }, {
           flink: 1.19-SNAPSHOT,
           jdk: '8, 11, 17, 21',
           branch: main
-        }, {
-          flink: 1.16.2,
-          branch: v3.1
-        }, {
-          flink: 1.17.1,
-          branch: v3.1
         }, {

Review Comment:
   We could release 3.2 (3.1.x) from the current main code, giving support for 1.17 and 1.18 (1.18 is still not supported by any release). I don't know why a 1.18-compatible version hasn't been released yet; any blocker?
   
   And then merge this and release again (minor or major). Would this work?








Re: [PR] [FLINK-25421] Port JDBC Sink to new Unified Sink API (FLIP-143) [flink-connector-jdbc]

Posted by "eskabetxe (via GitHub)" <gi...@apache.org>.
eskabetxe commented on code in PR #2:
URL: https://github.com/apache/flink-connector-jdbc/pull/2#discussion_r1528691110


##########
flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/sink/writer/JdbcWriterStateSerializer.java:
##########
@@ -0,0 +1,76 @@
+package org.apache.flink.connector.jdbc.sink.writer;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.connector.jdbc.datasource.transactions.xa.domain.TransactionId;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.core.memory.DataOutputSerializer;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/** {@link JdbcWriterState} serializer. */
+@Internal
+public class JdbcWriterStateSerializer implements SimpleVersionedSerializer<JdbcWriterState> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(JdbcWriterStateSerializer.class);
+
+    @Override
+    public int getVersion() {
+        return 2;
+    }
+
+    @Override
+    public byte[] serialize(JdbcWriterState state) throws IOException {
+        final DataOutputSerializer out = new DataOutputSerializer(1);
+        out.writeInt(state.getHanging().size());
+        for (TransactionId tid : state.getHanging()) {
+            byte[] tIdBytes = tid.serialize();
+            out.writeByte(tIdBytes.length);
+            out.write(tIdBytes, 0, tIdBytes.length);
+        }
+        out.writeInt(state.getPrepared().size());
+        for (TransactionId tid : state.getPrepared()) {
+            byte[] tIdBytes = tid.serialize();
+            out.writeByte(tIdBytes.length);
+            out.write(tIdBytes, 0, tIdBytes.length);
+        }
+        return out.getSharedBuffer();
+    }
+
+    @Override
+    public JdbcWriterState deserialize(int version, byte[] serialized) throws IOException {
+        final DataInputDeserializer in = new DataInputDeserializer(serialized);

Review Comment:
   done



##########
flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/xa/JobSubtask.java:
##########
@@ -0,0 +1,50 @@
+package org.apache.flink.connector.jdbc.xa;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.connector.sink2.Sink;
+
+import java.io.Serializable;
+
+/** Job identifier. */
+public class JobSubtask implements Serializable {

Review Comment:
   done





Re: [PR] [FLINK-25421] Port JDBC Sink to new Unified Sink API (FLIP-143) [flink-connector-jdbc]

Posted by "eskabetxe (via GitHub)" <gi...@apache.org>.
eskabetxe commented on code in PR #2:
URL: https://github.com/apache/flink-connector-jdbc/pull/2#discussion_r1528693061


##########
flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/xa/JdbcXaSinkFunctionState.java:
##########
@@ -31,7 +31,7 @@
 /** Thread-safe (assuming immutable {@link Xid} implementation). */
 @ThreadSafe
 @Internal
-class JdbcXaSinkFunctionState {
+public class JdbcXaSinkFunctionState {

Review Comment:
   It was from a first attempt to reuse this class, but that proved difficult.
   Reverted.





Re: [PR] [FLINK-25421] Port JDBC Sink to new Unified Sink API (FLIP-143) [flink-connector-jdbc]

Posted by "snuyanzin (via GitHub)" <gi...@apache.org>.
snuyanzin commented on PR #2:
URL: https://github.com/apache/flink-connector-jdbc/pull/2#issuecomment-2014972105

   Thanks for taking a look @RocMarshal.
   
   If there are no objections, I'm going to merge it in the coming days.




Re: [PR] [FLINK-25421] Port JDBC Sink to new Unified Sink API (FLIP-143) [flink-connector-jdbc]

Posted by "snuyanzin (via GitHub)" <gi...@apache.org>.
snuyanzin commented on PR #2:
URL: https://github.com/apache/flink-connector-jdbc/pull/2#issuecomment-2020439861

   Thanks for the hard work @eskabetxe, and thanks everyone for the review.
   
   @eskabetxe can you also please provide a PR to update `jdbc.md`?
   




[GitHub] [flink-connector-jdbc] eskabetxe commented on pull request #2: [FLINK-28284] Add JdbcSink with new format

Posted by GitBox <gi...@apache.org>.
eskabetxe commented on PR #2:
URL: https://github.com/apache/flink-connector-jdbc/pull/2#issuecomment-1330865353

   @wanglijie95 re-routed the JDBC PR here.




Re: [PR] [FLINK-25421] Port JDBC Sink to new Unified Sink API (FLIP-143) [flink-connector-jdbc]

Posted by "snuyanzin (via GitHub)" <gi...@apache.org>.
snuyanzin commented on code in PR #2:
URL: https://github.com/apache/flink-connector-jdbc/pull/2#discussion_r1528644716


##########
flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/sink/writer/JdbcWriterStateSerializer.java:
##########
@@ -0,0 +1,76 @@
+package org.apache.flink.connector.jdbc.sink.writer;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.connector.jdbc.datasource.transactions.xa.domain.TransactionId;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.core.memory.DataOutputSerializer;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/** {@link JdbcWriterState} serializer. */
+@Internal
+public class JdbcWriterStateSerializer implements SimpleVersionedSerializer<JdbcWriterState> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(JdbcWriterStateSerializer.class);
+
+    @Override
+    public int getVersion() {
+        return 2;
+    }
+
+    @Override
+    public byte[] serialize(JdbcWriterState state) throws IOException {
+        final DataOutputSerializer out = new DataOutputSerializer(1);
+        out.writeInt(state.getHanging().size());
+        for (TransactionId tid : state.getHanging()) {
+            byte[] tIdBytes = tid.serialize();
+            out.writeByte(tIdBytes.length);
+            out.write(tIdBytes, 0, tIdBytes.length);
+        }
+        out.writeInt(state.getPrepared().size());
+        for (TransactionId tid : state.getPrepared()) {
+            byte[] tIdBytes = tid.serialize();
+            out.writeByte(tIdBytes.length);
+            out.write(tIdBytes, 0, tIdBytes.length);
+        }
+        return out.getSharedBuffer();
+    }
+
+    @Override
+    public JdbcWriterState deserialize(int version, byte[] serialized) throws IOException {
+        final DataInputDeserializer in = new DataInputDeserializer(serialized);

Review Comment:
   It looks like we could move this variable initialization into the `if` block to avoid a useless init when `version != getVersion()`.
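   
   I.e., a sketch of the suggested shape (with the existing v2 parsing moved into a hypothetical `deserializeV2` helper):
   
       @Override
       public JdbcWriterState deserialize(int version, byte[] serialized) throws IOException {
           if (version == getVersion()) {
               // Pay for the deserializer only when the version actually matches.
               final DataInputDeserializer in = new DataInputDeserializer(serialized);
               return deserializeV2(in);
           }
           throw new IOException("Unknown version: " + version);
       }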





Re: [PR] [FLINK-25421] Add JdbcSink with new format [flink-connector-jdbc]

Posted by "RocMarshal (via GitHub)" <gi...@apache.org>.
RocMarshal commented on code in PR #2:
URL: https://github.com/apache/flink-connector-jdbc/pull/2#discussion_r1441443240


##########
flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/sink/JdbcSinkBuilder.java:
##########
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.sink;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
+import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
+import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
+import org.apache.flink.connector.jdbc.internal.connection.JdbcConnectionProvider;
+import org.apache.flink.connector.jdbc.internal.connection.SimpleJdbcConnectionProvider;
+import org.apache.flink.connector.jdbc.sink.statement.JdbcQueryStatement;
+import org.apache.flink.connector.jdbc.sink.statement.SimpleJdbcQueryStatement;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** Builder to construct {@link JdbcSink}. */
+@PublicEvolving
+public class JdbcSinkBuilder<IN> {
+
+    private JdbcConnectionProvider connectionProvider;
+    private JdbcExecutionOptions executionOptions;
+    private JdbcQueryStatement<IN> queryStatement;
+
+    public JdbcSinkBuilder() {
+        executionOptions = JdbcExecutionOptions.defaults();
+    }
+
+    public JdbcSinkBuilder<IN> withExecutionOptions(JdbcExecutionOptions executionOptions) {
+        this.executionOptions = checkNotNull(executionOptions, "executionOptions cannot be empty");
+        return this;
+    }
+
+    private JdbcSinkBuilder<IN> withConnectionProvider(JdbcConnectionProvider connectionProvider) {
+        this.connectionProvider =
+                checkNotNull(connectionProvider, "connectionProvider cannot be empty");

Review Comment:
   ditto



##########
flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/sink/JdbcSinkBuilder.java:
##########
@@ -0,0 +1,76 @@
+    public JdbcSinkBuilder<IN> withExecutionOptions(JdbcExecutionOptions executionOptions) {
+        this.executionOptions = checkNotNull(executionOptions, "executionOptions cannot be empty");

Review Comment:
   Could the 'empty' in the hint message be 'null'? That would correspond more rigorously to the verification logic.
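   
   I.e., something like:
   
       this.executionOptions = checkNotNull(executionOptions, "executionOptions cannot be null");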



##########
flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/sink/JdbcSinkBuilder.java:
##########
@@ -0,0 +1,76 @@
+        return this;
+    }
+
+    public JdbcSinkBuilder<IN> withConnectionProvider(JdbcConnectionOptions connectionOptions) {
+        return withConnectionProvider(new SimpleJdbcConnectionProvider(connectionOptions));
+    }
+
+    public JdbcSinkBuilder<IN> withQueryStatement(JdbcQueryStatement<IN> queryStatement) {
+        this.queryStatement = queryStatement;
+        return this;
+    }
+
+    public JdbcSinkBuilder<IN> withQueryStatement(
+            String query, JdbcStatementBuilder<IN> statement) {
+        this.queryStatement = new SimpleJdbcQueryStatement<>(query, statement);
+        return this;
+    }
+
+    public JdbcSink<IN> build() {
+        checkNotNull(connectionProvider, "connectionProvider cannot be empty");
+        checkNotNull(executionOptions, "executionOptions cannot be empty");
+        checkNotNull(queryStatement, "queryStatement cannot be empty");

Review Comment:
   ditto



##########
flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/datasource/transactions/xa/XaTransaction.java:
##########
@@ -0,0 +1,295 @@
+package org.apache.flink.connector.jdbc.datasource.transactions.xa;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.connector.jdbc.JdbcExactlyOnceOptions;
+import org.apache.flink.connector.jdbc.datasource.connections.xa.XaConnectionProvider;
+import org.apache.flink.connector.jdbc.datasource.transactions.xa.domain.TransactionId;
+import org.apache.flink.connector.jdbc.datasource.transactions.xa.exceptions.EmptyTransactionXaException;
+import org.apache.flink.connector.jdbc.datasource.transactions.xa.exceptions.TransientXaException;
+import org.apache.flink.connector.jdbc.sink.writer.JdbcWriterState;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.transaction.xa.Xid;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Deque;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+
+/** Coordinates XA transactions for the JDBC sink: creating, preparing, committing and rolling them back across checkpoints. */
+public class XaTransaction implements Serializable, AutoCloseable {
+
+    private static final Logger LOG = LoggerFactory.getLogger(XaTransaction.class);
+
+    private final XaConnectionProvider xaConnectionProvider;
+    private final JdbcExactlyOnceOptions exactlyOnceOptions;
+
+    // checkpoints and the corresponding transactions waiting for completion notification from JM
+    private transient List<TransactionId> preparedXids = new ArrayList<>();
+    // hanging XIDs - used for cleanup
+    // it's a list to support retries and scaling down
+    // possible transaction states: active, idle, prepared
+    // last element is the current xid
+    private transient Deque<TransactionId> hangingXids = new LinkedList<>();
+    private transient TransactionId currentTid;
+
+    private final TransactionId baseTransaction;
+
+    public XaTransaction(
+            JdbcExactlyOnceOptions exactlyOnceOptions,
+            TransactionId transactionId,
+            XaConnectionProvider xaFacade) {
+        this.xaConnectionProvider = xaFacade;
+        this.exactlyOnceOptions = exactlyOnceOptions;
+        this.baseTransaction = transactionId;
+    }
+
+    public Xid getCurrentXid() {
+        return currentTid;
+    }
+
+    public XaConnectionProvider getConnectionProvider() {
+        return xaConnectionProvider;
+    }
+
+    public JdbcWriterState getState() {
+        return JdbcWriterState.of(preparedXids, hangingXids);
+    }
+
+    public void open(JdbcWriterState state) throws IOException {
+        try {
+            xaConnectionProvider.open();
+            recoverState(state);
+            hangingXids = new LinkedList<>(failOrRollback(hangingXids).getForRetry());
+            commitTx();
+            if (exactlyOnceOptions.isDiscoverAndRollbackOnRecovery()) {
+                // Pending transactions which are not included into the checkpoint might hold
+                // locks and should be rolled back. However, rolling back ALL transactions can
+                // cause data loss. So each subtask first commits transactions from its state
+                // and then rolls back discovered transactions if they belong to it.
+                recoverAndRollback();
+            }
+        } catch (Exception e) {
+            ExceptionUtils.rethrowIOException(e);
+        }
+    }
+
+    @Override
+    public void close() throws Exception {
+        if (currentTid != null && xaConnectionProvider.isOpen()) {
+            try {
+                LOG.debug(
+                        "remove current transaction before closing, xid={}",
+                        currentTid.getXidValue());
+                xaConnectionProvider.failAndRollback(currentTid);
+            } catch (Exception e) {
+                LOG.warn(
+                        "unable to fail/rollback current transaction, xid={}",
+                        currentTid.getXidValue(),
+                        e);
+            }
+        }
+        xaConnectionProvider.close();
+        currentTid = null;
+        hangingXids = null;
+        preparedXids = null;
+    }
+
+    public void recoverState(JdbcWriterState state) {
+        hangingXids = new LinkedList<>(state.getHanging());
+        preparedXids = new ArrayList<>(state.getPrepared());
+        LOG.info(
+                "initialized state: prepared xids: {}, hanging xids: {}",
+                preparedXids.size(),
+                hangingXids.size());
+    }
+
+    public void checkState() {
+        Preconditions.checkState(currentTid != null, "current xid must not be null");
+        Preconditions.checkState(
+                !hangingXids.isEmpty() && hangingXids.peekLast().equals(currentTid),
+                "inconsistent internal state");
+    }
+
+    /** @param checkpointId to associate with the new transaction. */
+    public void createTx(long checkpointId) throws IOException {
+        try {
+            Preconditions.checkState(currentTid == null, "currentXid not null");
+            currentTid = baseTransaction.withBranch(checkpointId);
+            hangingXids.offerLast(currentTid);
+            xaConnectionProvider.start(currentTid);
+        } catch (Exception e) {
+            ExceptionUtils.rethrowIOException(e);
+        }
+    }
+
+    public void prepareTx() throws IOException {
+        checkState();
+        hangingXids.pollLast();
+        try {
+            xaConnectionProvider.endAndPrepare(currentTid);
+            preparedXids.add(currentTid);
+        } catch (EmptyTransactionXaException e) {
+            LOG.info(
+                    "empty XA transaction (skip), xid: {}, checkpoint {}",
+                    currentTid.getXidValue(),
+                    currentTid.getCheckpointId());
+        } catch (Exception e) {
+            ExceptionUtils.rethrowIOException(e);
+        }
+        currentTid = null;
+    }
+
+    public void commitTx() {
+        List<TransactionId> toCommit = preparedXids;
+        preparedXids = new ArrayList<>();
+        preparedXids.addAll(commitXids(toCommit));
+    }
+
+    public void commitTxUntil(long checkpointId) {
+        Tuple2<List<TransactionId>, List<TransactionId>> splittedXids =
+                split(preparedXids, checkpointId);
+
+        if (splittedXids.f0.isEmpty()) {
+            LOG.warn("nothing to commit up to checkpoint: {}", checkpointId);
+        } else {
+            preparedXids = splittedXids.f1;
+            preparedXids.addAll(commitXids(splittedXids.f0));
+        }
+    }
+
+    public List<TransactionId> commitXids(List<TransactionId> xids) {
+        return commit(
+                        xids,
+                        exactlyOnceOptions.isAllowOutOfOrderCommits(),
+                        exactlyOnceOptions.getMaxCommitAttempts())
+                .getForRetry();
+    }
+
+    private Tuple2<List<TransactionId>, List<TransactionId>> split(
+            List<TransactionId> list, long checkpointId) {
+        return split(list, checkpointId, true);
+    }
+
+    private Tuple2<List<TransactionId>, List<TransactionId>> split(
+            List<TransactionId> list, long checkpointId, boolean checkpointIntoLo) {
+
+        List<TransactionId> lo = new ArrayList<>(list.size() / 2);
+        List<TransactionId> hi = new ArrayList<>(list.size() / 2);
+        list.forEach(
+                i -> {
+                    if (i.getCheckpointId() < checkpointId
+                            || (i.getCheckpointId() == checkpointId && checkpointIntoLo)) {
+                        lo.add(i);
+                    } else {
+                        hi.add(i);
+                    }
+                });
+        return new Tuple2<>(lo, hi);
+    }
+
+    private XaTransactionResult<TransactionId> commit(
+            List<TransactionId> xids, boolean allowOutOfOrderCommits, int maxCommitAttempts) {
+        XaTransactionResult<TransactionId> result = new XaTransactionResult<>();
+        int origSize = xids.size();
+        LOG.debug("commit {} transactions", origSize);
+        for (Iterator<TransactionId> i = xids.iterator();
+                i.hasNext() && (result.hasNoFailures() || allowOutOfOrderCommits); ) {
+            TransactionId x = i.next();
+            i.remove();
+            try {
+                xaConnectionProvider.commit(x, x.getRestored());
+                result.succeeded(x);
+            } catch (TransientXaException e) {
+                result.failedTransiently(x.withAttemptsIncremented(), e);
+            } catch (Exception e) {
+                result.failed(x, e);
+            }
+        }
+        result.getForRetry().addAll(xids);
+        result.throwIfAnyFailed("commit");
+        throwIfAnyReachedMaxAttempts(result, maxCommitAttempts);
+        result.getTransientFailure()
+                .ifPresent(
+                        f ->
+                                LOG.warn(
+                                        "failed to commit {} transactions out of {} (keep them to retry later)",
+                                        result.getForRetry().size(),
+                                        origSize,
+                                        f));
+        return result;
+    }
+
+    private XaTransactionResult<TransactionId> failOrRollback(Collection<TransactionId> xids) {
+        XaTransactionResult<TransactionId> result = new XaTransactionResult<>();
+        if (xids.isEmpty()) {
+            return result;
+        }
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("rolling back {} transactions: {}", xids.size(), xids);
+        }
+        for (TransactionId x : xids) {
+            try {
+                xaConnectionProvider.failAndRollback(x);
+                result.succeeded(x);
+            } catch (TransientXaException e) {
+                LOG.info("unable to fail/rollback transaction, xid={}: {}", x, e.getMessage());
+                result.failedTransiently(x, e);
+            } catch (Exception e) {
+                LOG.warn("unable to fail/rollback transaction, xid={}: {}", x, e.getMessage());
+                result.failed(x, e);
+            }
+        }
+        if (!result.getForRetry().isEmpty()) {
+            LOG.info("failed to roll back {} transactions", result.getForRetry().size());
+        }
+        return result;
+    }
+
+    private void recoverAndRollback() {
+        Collection<Xid> recovered = xaConnectionProvider.recover();
+        if (recovered.isEmpty()) {
+            return;
+        }
+        LOG.warn("rollback {} recovered transactions", recovered.size());
+        for (Xid xid : recovered) {
+            if (baseTransaction.belongsTo(xid)) {
+                try {
+                    xaConnectionProvider.rollback(xid);
+                } catch (Exception e) {
+                    LOG.info("unable to rollback recovered transaction, xid={}", xid, e);
+                }
+            }
+        }
+    }
+
+    private void throwIfAnyReachedMaxAttempts(
+            XaTransactionResult<TransactionId> result, int maxAttempts) {
+        List<TransactionId> reached = null;
+        for (TransactionId x : result.getForRetry()) {
+            if (x.getAttempts() >= maxAttempts) {
+                if (reached == null) {
+                    reached = new ArrayList<>();
+                }
+                reached.add(x);
+            }
+        }
+        if (reached != null) {

Review Comment:
   Maybe the `Preconditions` utils would be the better option here.
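   
   A minimal sketch of what that could look like, assuming Flink's `org.apache.flink.util.Preconditions` and the types from the diff above:
   
       private void throwIfAnyReachedMaxAttempts(
               XaTransactionResult<TransactionId> result, int maxAttempts) {
           // collect the transactions that hit the configured retry limit
           List<TransactionId> reached =
                   result.getForRetry().stream()
                           .filter(x -> x.getAttempts() >= maxAttempts)
                           .collect(java.util.stream.Collectors.toList());
           // checkState throws IllegalStateException with the formatted message
           Preconditions.checkState(
                   reached.isEmpty(),
                   "reached max number of commit attempts (%s) for transactions: %s",
                   maxAttempts,
                   reached);
       }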



##########
flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/sink/JdbcSinkBuilder.java:
##########
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.sink;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.connector.base.DeliveryGuarantee;
+import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
+import org.apache.flink.connector.jdbc.JdbcExactlyOnceOptions;
+import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
+import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
+import org.apache.flink.connector.jdbc.datasource.connections.JdbcConnectionProvider;
+import org.apache.flink.connector.jdbc.datasource.connections.SimpleJdbcConnectionProvider;
+import org.apache.flink.connector.jdbc.datasource.connections.xa.PoolingXaConnectionProvider;
+import org.apache.flink.connector.jdbc.datasource.connections.xa.SimpleXaConnectionProvider;
+import org.apache.flink.connector.jdbc.datasource.connections.xa.XaConnectionProvider;
+import org.apache.flink.connector.jdbc.datasource.statements.JdbcQueryStatement;
+import org.apache.flink.connector.jdbc.datasource.statements.SimpleJdbcQueryStatement;
+import org.apache.flink.util.function.SerializableSupplier;
+
+import javax.sql.XADataSource;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** Builder to construct {@link JdbcSink}. */
+@PublicEvolving
+public class JdbcSinkBuilder<IN> {
+
+    private JdbcExecutionOptions executionOptions;
+    private JdbcQueryStatement<IN> queryStatement;
+
+    public JdbcSinkBuilder() {
+        this.executionOptions = JdbcExecutionOptions.defaults();
+    }
+
+    public JdbcSinkBuilder<IN> withExecutionOptions(JdbcExecutionOptions executionOptions) {
+        this.executionOptions = checkNotNull(executionOptions, "executionOptions cannot be empty");
+        return this;
+    }
+
+    public JdbcSinkBuilder<IN> withQueryStatement(JdbcQueryStatement<IN> queryStatement) {
+        this.queryStatement = queryStatement;
+        return this;
+    }
+
+    public JdbcSinkBuilder<IN> withQueryStatement(
+            String query, JdbcStatementBuilder<IN> statement) {
+        this.queryStatement = new SimpleJdbcQueryStatement<>(query, statement);
+        return this;
+    }
+
+    public JdbcSink<IN> buildAtLeastOnce(JdbcConnectionOptions connectionOptions) {
+        checkNotNull(connectionOptions, "connectionOptions cannot be empty");
+
+        return buildAtLeastOnce(new SimpleJdbcConnectionProvider(connectionOptions));
+    }
+
+    public JdbcSink<IN> buildAtLeastOnce(JdbcConnectionProvider connectionProvider) {
+        checkNotNull(connectionProvider, "connectionProvider cannot be empty");
+
+        return build(
+                DeliveryGuarantee.AT_LEAST_ONCE,
+                JdbcExactlyOnceOptions.defaults(),
+                checkNotNull(connectionProvider, "connectionProvider cannot be empty"));
+    }
+
+    public JdbcSink<IN> buildExactlyOnce(
+            JdbcExactlyOnceOptions exactlyOnceOptions,
+            SerializableSupplier<XADataSource> dataSourceSupplier) {
+
+        checkNotNull(exactlyOnceOptions, "exactlyOnceOptions cannot be empty");
+        checkNotNull(dataSourceSupplier, "dataSourceSupplier cannot be empty");
+        XaConnectionProvider connectionProvider =
+                exactlyOnceOptions.isTransactionPerConnection()
+                        ? PoolingXaConnectionProvider.from(
+                                dataSourceSupplier, exactlyOnceOptions.getTimeoutSec())
+                        : SimpleXaConnectionProvider.from(
+                                dataSourceSupplier, exactlyOnceOptions.getTimeoutSec());
+        return buildExactlyOnce(exactlyOnceOptions, connectionProvider);
+    }
+
+    public JdbcSink<IN> buildExactlyOnce(
+            JdbcExactlyOnceOptions exactlyOnceOptions, XaConnectionProvider connectionProvider) {
+
+        return build(
+                DeliveryGuarantee.EXACTLY_ONCE,
+                checkNotNull(exactlyOnceOptions, "exactlyOnceOptions cannot be empty"),
+                checkNotNull(connectionProvider, "connectionProvider cannot be empty"));
+    }
+
+    private JdbcSink<IN> build(
+            DeliveryGuarantee deliveryGuarantee,
+            JdbcExactlyOnceOptions exactlyOnceOptions,
+            JdbcConnectionProvider connectionProvider) {
+
+        return new JdbcSink<>(
+                checkNotNull(deliveryGuarantee, "deliveryGuarantee cannot be empty"),
+                checkNotNull(connectionProvider, "connectionProvider cannot be empty"),
+                checkNotNull(executionOptions, "executionOptions cannot be empty"),
+                checkNotNull(exactlyOnceOptions, "exactlyOnceOptions cannot be empty"),
+                checkNotNull(queryStatement, "queryStatement cannot be empty"));

Review Comment:
   ditto.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] [FLINK-25421] Port JDBC Sink to new Unified Sink API (FLIP-143) [flink-connector-jdbc]

Posted by "snuyanzin (via GitHub)" <gi...@apache.org>.
snuyanzin commented on code in PR #2:
URL: https://github.com/apache/flink-connector-jdbc/pull/2#discussion_r1475723420


##########
.github/workflows/weekly.yml:
##########
@@ -27,25 +27,13 @@ jobs:
     strategy:
       matrix:
         flink_branches: [{
-          flink: 1.16-SNAPSHOT,
-          branch: main
-        }, {
-          flink: 1.17-SNAPSHOT,
-          branch: main
-        }, {
           flink: 1.18-SNAPSHOT,
           jdk: '8, 11, 17',
           branch: main
         }, {
           flink: 1.19-SNAPSHOT,
           jdk: '8, 11, 17, 21',
           branch: main
-        }, {
-          flink: 1.16.2,
-          branch: v3.1
-        }, {
-          flink: 1.17.1,
-          branch: v3.1
         }, {

Review Comment:
   Should we remove CI for v3.1?
   I guess this change should not be merged into v3.1, so v3.1 should continue to work with, and be buildable against, 1.16 and 1.17.
   Or did I miss anything?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] [FLINK-25421] Port JDBC Sink to new Unified Sink API (FLIP-143) [flink-connector-jdbc]

Posted by "snuyanzin (via GitHub)" <gi...@apache.org>.
snuyanzin commented on code in PR #2:
URL: https://github.com/apache/flink-connector-jdbc/pull/2#discussion_r1475718751


##########
pom.xml:
##########
@@ -47,7 +47,7 @@ under the License.
     </modules>
 
     <properties>
-        <flink.version>1.16.2</flink.version>
+        <flink.version>1.18.0</flink.version>

Review Comment:
   I think in that case we probably need to bump the connector version as well, to something like 4.0.0.
   WDYT @MartijnVisser ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] [FLINK-25421] Port JDBC Sink to new Unified Sink API (FLIP-143) [flink-connector-jdbc]

Posted by "eskabetxe (via GitHub)" <gi...@apache.org>.
eskabetxe commented on code in PR #2:
URL: https://github.com/apache/flink-connector-jdbc/pull/2#discussion_r1475842140


##########
.github/workflows/weekly.yml:
##########
@@ -27,25 +27,13 @@ jobs:
     strategy:
       matrix:
         flink_branches: [{
-          flink: 1.16-SNAPSHOT,
-          branch: main
-        }, {
-          flink: 1.17-SNAPSHOT,
-          branch: main
-        }, {
           flink: 1.18-SNAPSHOT,
           jdk: '8, 11, 17',
           branch: main
         }, {
           flink: 1.19-SNAPSHOT,
           jdk: '8, 11, 17, 21',
           branch: main
-        }, {
-          flink: 1.16.2,
-          branch: v3.1
-        }, {
-          flink: 1.17.1,
-          branch: v3.1
         }, {

Review Comment:
   We could release 3.2 (or 3.1.x) with the current main code, giving support for 1.16, 1.17 and 1.18 (1.18 is still not supported by any release). I don't know why we still haven't released a 1.18-compatible version; is there any blocker?
   
   We could then merge this and release again (as a minor or major version)..
   Would this work?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] [FLINK-25421] Port JDBC Sink to new Unified Sink API (FLIP-143) [flink-connector-jdbc]

Posted by "snuyanzin (via GitHub)" <gi...@apache.org>.
snuyanzin commented on code in PR #2:
URL: https://github.com/apache/flink-connector-jdbc/pull/2#discussion_r1475718751


##########
pom.xml:
##########
@@ -47,7 +47,7 @@ under the License.
     </modules>
 
     <properties>
-        <flink.version>1.16.2</flink.version>
+        <flink.version>1.18.0</flink.version>

Review Comment:
   I think in that case we probably need to bump the connector version as well, to something like 4.0.0,
   or at least make a release of what we currently have in the main branch.
   WDYT @MartijnVisser ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] [FLINK-25421] Port JDBC Sink to new Unified Sink API (FLIP-143) [flink-connector-jdbc]

Posted by "eskabetxe (via GitHub)" <gi...@apache.org>.
eskabetxe commented on PR #2:
URL: https://github.com/apache/flink-connector-jdbc/pull/2#issuecomment-1881065785

   @RocMarshal, @snuyanzin thanks for the review..
   I addressed all the code changes; I'm working on the Javadocs, which are largely missing.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] [FLINK-25421] Port JDBC Sink to new Unified Sink API (FLIP-143) [flink-connector-jdbc]

Posted by "eskabetxe (via GitHub)" <gi...@apache.org>.
eskabetxe commented on code in PR #2:
URL: https://github.com/apache/flink-connector-jdbc/pull/2#discussion_r1444594096


##########
flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/datasource/statements/JdbcQueryStatement.java:
##########
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.datasource.statements;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import java.io.Serializable;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+
+/**
+ * Sets {@link PreparedStatement} parameters to use in JDBC Sink based on a specific type of record.
+ */
+@PublicEvolving
+public interface JdbcQueryStatement<T> extends Serializable {
+    String query();
+
+    void map(PreparedStatement ps, T data) throws SQLException;

Review Comment:
   This is to keep the query and the logic that sets values on the PreparedStatement in the same place..
   A name like JdbcStatementFiller, for example, would put the focus on the second method only..
   
   I agree we can rename `map` to something else.
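   
   For illustration, a minimal sketch of an implementation (the `Book` type and its getters are hypothetical, introduced only for this example):
   
       public class BookQueryStatement implements JdbcQueryStatement<Book> {
           @Override
           public String query() {
               return "INSERT INTO books (id, title) VALUES (?, ?)";
           }
   
           @Override
           public void map(PreparedStatement ps, Book data) throws SQLException {
               // the query and its parameter mapping live side by side in one class
               ps.setInt(1, data.getId());
               ps.setString(2, data.getTitle());
           }
       }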



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] [FLINK-25421] Port JDBC Sink to new Unified Sink API (FLIP-143) [flink-connector-jdbc]

Posted by "eskabetxe (via GitHub)" <gi...@apache.org>.
eskabetxe commented on code in PR #2:
URL: https://github.com/apache/flink-connector-jdbc/pull/2#discussion_r1452316874


##########
flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/sink/committer/JdbcCommitableSerializer.java:
##########
@@ -0,0 +1,37 @@
+package org.apache.flink.connector.jdbc.sink.committer;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.connector.jdbc.xa.XidSerializer;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.core.memory.DataOutputSerializer;
+
+import javax.transaction.xa.Xid;
+
+import java.io.IOException;
+
+/** */

Review Comment:
   done



##########
flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/sink/committer/JdbcCommitter.java:
##########
@@ -0,0 +1,62 @@
+package org.apache.flink.connector.jdbc.sink.committer;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.connector.sink2.Committer;
+import org.apache.flink.connector.base.DeliveryGuarantee;
+import org.apache.flink.connector.jdbc.JdbcExactlyOnceOptions;
+import org.apache.flink.connector.jdbc.datasource.connections.JdbcConnectionProvider;
+import org.apache.flink.connector.jdbc.datasource.connections.xa.XaConnectionProvider;
+import org.apache.flink.connector.jdbc.datasource.transactions.xa.XaTransaction;
+import org.apache.flink.connector.jdbc.datasource.transactions.xa.domain.TransactionId;
+import org.apache.flink.connector.jdbc.sink.writer.JdbcWriterState;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import java.io.IOException;
+import java.util.Collection;
+
+/** */

Review Comment:
   done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] [FLINK-25421] Port JDBC Sink to new Unified Sink API (FLIP-143) [flink-connector-jdbc]

Posted by "eskabetxe (via GitHub)" <gi...@apache.org>.
eskabetxe commented on PR #2:
URL: https://github.com/apache/flink-connector-jdbc/pull/2#issuecomment-1963784832

   @snuyanzin @wanglijie95 the tests are passing now..
   could you take a look when you have some time?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org