Posted to commits@inlong.apache.org by GitBox <gi...@apache.org> on 2022/07/25 12:53:36 UTC

[GitHub] [inlong] Oneal65 opened a new pull request, #5200: [INLONG-5100][Sort] Add reporting metrics for JDBC

Oneal65 opened a new pull request, #5200:
URL: https://github.com/apache/inlong/pull/5200

   - Fixes #5100 
   
   ### Motivation
   
   Add metric reporting to the JDBC sink connector in InLong Sort.
   
   ### Modifications
   
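   Add a `MetricData` helper class and, when `inlong.metric` is configured, register `numRecordsOut`, `numBytesOut`, `numRecordsOutPerSecond`, `numBytesOutPerSecond`, `dirtyRecords`, and `dirtyBytes` in `JdbcBatchingOutputFormat`.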
   
   ### Verifying this change
   
   *(Please pick either of the following options)*
   
   - [ ] This change is a trivial rework/code cleanup without any test coverage.
   
   - [ ] This change is already covered by existing tests, such as:
     *(please describe tests)*
   
   - [ ] This change added tests and can be verified as follows:
   
     *(example:)*
     - *Added integration tests for end-to-end deployment with large payloads (10MB)*
     - *Extended integration test for recovery after broker failure*
   
   ### Documentation
   
     - Does this pull request introduce a new feature? (yes / no)
     - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)
     - If a feature is not applicable for documentation, explain why?
     - If a feature is not documented yet in this PR, please create a follow-up issue for adding the documentation
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [inlong] gong commented on a diff in pull request #5200: [INLONG-5100][Sort] Add reporting metrics for JDBC

Posted by GitBox <gi...@apache.org>.
gong commented on code in PR #5200:
URL: https://github.com/apache/inlong/pull/5200#discussion_r936446935


##########
inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcBatchingOutputFormat.java:
##########
@@ -122,7 +122,7 @@ public void open(int taskNumber, int numTasks) throws IOException {
         this.runtimeContext = getRuntimeContext();
         metricData = new MetricData(runtimeContext.getMetricGroup());
         if (inLongMetric != null && !inLongMetric.isEmpty()) {

Review Comment:
   Use `SinkMetricData` from connector-base after rebasing the code.





[GitHub] [inlong] Oneal65 commented on a diff in pull request #5200: [INLONG-5100][Sort] Add reporting metrics for JDBC

Posted by GitBox <gi...@apache.org>.
Oneal65 commented on code in PR #5200:
URL: https://github.com/apache/inlong/pull/5200#discussion_r936330964


##########
inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDialects.java:
##########
@@ -44,6 +45,7 @@ public final class JdbcDialects {
         DIALECTS.add(new TDSQLPostgresDialect());
         DIALECTS.add(new SqlServerDialect());
         DIALECTS.add(new OracleDialect());
+        DIALECTS.add(new PostgresDialect());

Review Comment:
   DONE





[GitHub] [inlong] thesumery commented on a diff in pull request #5200: [INLONG-5100][Sort] Add reporting metrics for JDBC

Posted by GitBox <gi...@apache.org>.
thesumery commented on code in PR #5200:
URL: https://github.com/apache/inlong/pull/5200#discussion_r930560547


##########
inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/GenericJdbcSinkFunction.java:
##########
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.jdbc.internal;

Review Comment:
   Please add some comments describing what you have changed in this class.





[GitHub] [inlong] Oneal65 commented on a diff in pull request #5200: [INLONG-5100][Sort] Add reporting metrics for JDBC

Posted by GitBox <gi...@apache.org>.
Oneal65 commented on code in PR #5200:
URL: https://github.com/apache/inlong/pull/5200#discussion_r929838720


##########
inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/metric/MetricData.java:
##########
@@ -0,0 +1,111 @@
+/*
+ *   Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+
+package org.apache.inlong.sort.jdbc.metric;
+
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Meter;
+import org.apache.flink.metrics.MeterView;
+import org.apache.flink.metrics.MetricGroup;
+
+/**
+ * A collection class for handling metrics
+ */
+public class MetricData {

Review Comment:
   It will be done in another PR.





[GitHub] [inlong] Oneal65 commented on a diff in pull request #5200: [INLONG-5100][Sort] Add reporting metrics for JDBC

Posted by GitBox <gi...@apache.org>.
Oneal65 commented on code in PR #5200:
URL: https://github.com/apache/inlong/pull/5200#discussion_r930617716


##########
inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/GenericJdbcSinkFunction.java:
##########
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.jdbc.internal;

Review Comment:
   OK👌





[GitHub] [inlong] gong commented on a diff in pull request #5200: [INLONG-5100][Sort] Add reporting metrics for JDBC

Posted by GitBox <gi...@apache.org>.
gong commented on code in PR #5200:
URL: https://github.com/apache/inlong/pull/5200#discussion_r930564312


##########
inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcBatchingOutputFormat.java:
##########
@@ -0,0 +1,463 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.jdbc.internal;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
+import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
+import org.apache.flink.connector.jdbc.internal.AbstractJdbcOutputFormat;
+import org.apache.flink.connector.jdbc.internal.connection.JdbcConnectionProvider;
+import org.apache.flink.connector.jdbc.internal.connection.SimpleJdbcConnectionProvider;
+import org.apache.flink.connector.jdbc.internal.executor.JdbcBatchStatementExecutor;
+import org.apache.flink.connector.jdbc.internal.options.JdbcDmlOptions;
+import org.apache.flink.connector.jdbc.internal.options.JdbcOptions;
+import org.apache.flink.connector.jdbc.statement.FieldNamedPreparedStatementImpl;
+import org.apache.flink.connector.jdbc.utils.JdbcUtils;
+import org.apache.flink.runtime.util.ExecutorThreadFactory;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+import org.apache.inlong.sort.jdbc.metric.MetricData;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+import java.io.IOException;
+import java.io.Serializable;
+import java.nio.charset.StandardCharsets;
+import java.sql.SQLException;
+import java.util.HashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+
+import static org.apache.flink.connector.jdbc.utils.JdbcUtils.setRecordToStatement;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A JDBC outputFormat that supports batching records before writing records to database.
+ */
+public class JdbcBatchingOutputFormat<
+        In, JdbcIn, JdbcExec extends JdbcBatchStatementExecutor<JdbcIn>>
+        extends AbstractJdbcOutputFormat<In> {
+
+    private static final long serialVersionUID = 1L;
+    private static final Logger LOG = LoggerFactory.getLogger(JdbcBatchingOutputFormat.class);
+    private final JdbcExecutionOptions executionOptions;
+    private final StatementExecutorFactory<JdbcExec> statementExecutorFactory;
+    private final RecordExtractor<In, JdbcIn> jdbcRecordExtractor;
+    private final String inLongMetric;
+    private transient JdbcExec jdbcStatementExecutor;
+    private transient int batchCount = 0;
+    private transient volatile boolean closed = false;
+    private transient ScheduledExecutorService scheduler;
+    private transient ScheduledFuture<?> scheduledFuture;
+    private transient volatile Exception flushException;
+    private transient RuntimeContext runtimeContext;
+
+    private MetricData metricData;
+    private Long dataSize = 0L;
+    private Long rowSize = 0L;
+
+    public JdbcBatchingOutputFormat(
+            @Nonnull JdbcConnectionProvider connectionProvider,
+            @Nonnull JdbcExecutionOptions executionOptions,
+            @Nonnull StatementExecutorFactory<JdbcExec> statementExecutorFactory,
+            @Nonnull RecordExtractor<In, JdbcIn> recordExtractor,
+            String inLongMetric) {
+        super(connectionProvider);
+        this.executionOptions = checkNotNull(executionOptions);
+        this.statementExecutorFactory = checkNotNull(statementExecutorFactory);
+        this.jdbcRecordExtractor = checkNotNull(recordExtractor);
+        this.inLongMetric = inLongMetric;
+    }
+
+    public static Builder builder() {
+        return new Builder();
+    }
+
+    static JdbcBatchStatementExecutor<Row> createSimpleRowExecutor(
+            String sql, int[] fieldTypes, boolean objectReuse) {
+        return JdbcBatchStatementExecutor.simple(
+                sql,
+                createRowJdbcStatementBuilder(fieldTypes),
+                objectReuse ? Row::copy : Function.identity());
+    }
+
+    /**
+     * Creates a {@link JdbcStatementBuilder} for {@link Row} using the provided SQL types array.
+     * Uses {@link JdbcUtils#setRecordToStatement}
+     */
+    static JdbcStatementBuilder<Row> createRowJdbcStatementBuilder(int[] types) {
+        return (st, record) -> setRecordToStatement(st, types, record);
+    }
+
+    /**
+     * Connects to the target database and initializes the prepared statement.
+     *
+     * @param taskNumber The number of the parallel instance.
+     */
+    @Override
+    public void open(int taskNumber, int numTasks) throws IOException {
+        super.open(taskNumber, numTasks);
+        this.runtimeContext = getRuntimeContext();
+        metricData = new MetricData(runtimeContext.getMetricGroup());
+        if (inLongMetric != null && !inLongMetric.isEmpty()) {
+            String[] inLongMetricArray = inLongMetric.split("_");
+            String groupId = inLongMetricArray[0];
+            String streamId = inLongMetricArray[1];
+            String nodeId = inLongMetricArray[2];
+            metricData.registerMetricsForDirtyBytes(groupId, streamId, nodeId, "dirtyBytes");
+            metricData.registerMetricsForDirtyRecords(groupId, streamId, nodeId, "dirtyRecords");
+            metricData.registerMetricsForNumBytesOut(groupId, streamId, nodeId, "numBytesOut");
+            metricData.registerMetricsForNumRecordsOut(groupId, streamId, nodeId, "numRecordsOut");
+            metricData.registerMetricsForNumBytesOutPerSecond(groupId, streamId, nodeId, "numBytesOutPerSecond");
+            metricData.registerMetricsForNumRecordsOutPerSecond(groupId, streamId, nodeId,
+                    "numRecordsOutPerSecond");
+        }
+        jdbcStatementExecutor = createAndOpenStatementExecutor(statementExecutorFactory);
+        if (executionOptions.getBatchIntervalMs() != 0 && executionOptions.getBatchSize() != 1) {
+            this.scheduler =
+                    Executors.newScheduledThreadPool(
+                            1, new ExecutorThreadFactory("jdbc-upsert-output-format"));
+            this.scheduledFuture =
+                    this.scheduler.scheduleWithFixedDelay(
+                            () -> {
+                                synchronized (JdbcBatchingOutputFormat.this) {
+                                    if (!closed) {
+                                        try {
+                                            flush();
+                                            if (metricData.getNumRecordsOut() != null) {
+                                                metricData.getNumRecordsOut().inc(rowSize);
+                                            }
+                                            if (metricData.getNumRecordsOut() != null) {

Review Comment:
   The second `metricData.getNumRecordsOut() != null` check should be `metricData.getNumBytesOut() != null`.
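A minimal sketch of the corrected flush-time reporting, assuming simplified stand-ins for Flink's `Counter` and the PR's `MetricData` (`SimpleCounter`, `MetricDataSketch`, and `reportFlush` are hypothetical names, not the PR's code). Each counter is guarded by its own null check, so a counter that was never registered is skipped instead of causing a `NullPointerException`:

```java
public class FlushMetricsSketch {

    /** Hypothetical stand-in for org.apache.flink.metrics.Counter. */
    static final class SimpleCounter {
        private long count;

        void inc(long n) {
            count += n;
        }

        long getCount() {
            return count;
        }
    }

    /** Hypothetical stand-in for the PR's MetricData; counters stay null until registered. */
    static final class MetricDataSketch {
        SimpleCounter numRecordsOut;
        SimpleCounter numBytesOut;

        SimpleCounter getNumRecordsOut() {
            return numRecordsOut;
        }

        SimpleCounter getNumBytesOut() {
            return numBytesOut;
        }
    }

    /** Reports one flushed batch; each counter is checked before it is incremented. */
    static void reportFlush(MetricDataSketch metricData, long rowSize, long dataSize) {
        if (metricData.getNumRecordsOut() != null) {
            metricData.getNumRecordsOut().inc(rowSize);
        }
        // Guard the same counter that is incremented on the next line.
        if (metricData.getNumBytesOut() != null) {
            metricData.getNumBytesOut().inc(dataSize);
        }
    }
}
```

With the original condition, a job where only the records counter is registered could throw when the bytes counter is incremented, and a job where only the bytes counter is registered would silently skip byte reporting.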





[GitHub] [inlong] Oneal65 commented on pull request #5200: [INLONG-5100][Sort] Add reporting metrics for JDBC

Posted by GitBox <gi...@apache.org>.
Oneal65 commented on PR #5200:
URL: https://github.com/apache/inlong/pull/5200#issuecomment-1203601287

   > Change the delimiter of `inlong.metric` from `_` to `&`. Because groupId and streamId can contain `_`, splitting on `_` produces the wrong fields.
   
   @gong DONE





[GitHub] [inlong] EMsnap merged pull request #5200: [INLONG-5100][Sort] Add reporting metrics for JDBC

Posted by GitBox <gi...@apache.org>.
EMsnap merged PR #5200:
URL: https://github.com/apache/inlong/pull/5200




[GitHub] [inlong] Oneal65 commented on a diff in pull request #5200: [INLONG-5100][Sort] Add reporting metrics for JDBC

Posted by GitBox <gi...@apache.org>.
Oneal65 commented on code in PR #5200:
URL: https://github.com/apache/inlong/pull/5200#discussion_r936534677


##########
inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcBatchingOutputFormat.java:
##########
@@ -122,7 +122,7 @@ public void open(int taskNumber, int numTasks) throws IOException {
         this.runtimeContext = getRuntimeContext();
         metricData = new MetricData(runtimeContext.getMetricGroup());
         if (inLongMetric != null && !inLongMetric.isEmpty()) {

Review Comment:
   All metrics-related PRs will be refactored together in a single PR later.





[GitHub] [inlong] gong commented on pull request #5200: [INLONG-5100][Sort] Add reporting metrics for JDBC

Posted by GitBox <gi...@apache.org>.
gong commented on PR #5200:
URL: https://github.com/apache/inlong/pull/5200#issuecomment-1196553710

   The `PostgresLoadNode` connector is JDBC-based, so metrics will not work correctly for PostgreSQL. We should add a dialect for PostgreSQL and select it through a `dialectImpl` parameter. The original function selects the dialect by JDBC URL, but TDSQLPostgres and PostgreSQL use the same URL format. @Oneal65




[GitHub] [inlong] gong commented on a diff in pull request #5200: [INLONG-5100][Sort] Add reporting metrics for JDBC

Posted by GitBox <gi...@apache.org>.
gong commented on code in PR #5200:
URL: https://github.com/apache/inlong/pull/5200#discussion_r933129942


##########
inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDialects.java:
##########
@@ -44,6 +45,7 @@ public final class JdbcDialects {
         DIALECTS.add(new TDSQLPostgresDialect());
         DIALECTS.add(new SqlServerDialect());
         DIALECTS.add(new OracleDialect());
+        DIALECTS.add(new PostgresDialect());

Review Comment:
   Remove `PostgresDialect()`, because PostgreSQL uses `dialectImpl` to register a custom dialect.
   See JdbcDynamicTableFactory.java:
   ```java
   final Optional<JdbcDialect> dialect = dialectImplOptional.map(JdbcDialects::register)
                   .orElseGet(() -> JdbcDialects.get(jdbcUrl));
   ```
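The fallback in the factory snippet can be illustrated with a self-contained sketch. All names here are hypothetical stand-ins: `Dialect` replaces `JdbcDialect`, and a name-keyed map replaces whatever loading mechanism the real `JdbcDialects.register` uses. It shows why URL matching alone cannot separate PostgreSQL from TDSQL-PostgreSQL: both accept `jdbc:postgresql:` URLs, so the first registered dialect wins unless `dialectImpl` is set.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Supplier;

public class DialectResolutionSketch {

    /** Hypothetical, trimmed-down dialect contract. */
    interface Dialect {
        String name();

        boolean canHandle(String url);
    }

    static final class TdsqlPostgresDialect implements Dialect {
        public String name() { return "TDSQLPostgres"; }

        public boolean canHandle(String url) { return url.startsWith("jdbc:postgresql:"); }
    }

    static final class PostgresDialect implements Dialect {
        public String name() { return "Postgres"; }

        public boolean canHandle(String url) { return url.startsWith("jdbc:postgresql:"); }
    }

    /** Built-in dialects matched by URL in order; PostgresDialect is deliberately absent. */
    static final List<Dialect> DIALECTS = List.of(new TdsqlPostgresDialect());

    /** Hypothetical registry of dialects that can be chosen explicitly via dialectImpl. */
    static final Map<String, Supplier<Dialect>> BY_NAME = new LinkedHashMap<>();

    static {
        BY_NAME.put("PostgresDialect", PostgresDialect::new);
    }

    static Optional<Dialect> register(String dialectImpl) {
        return Optional.ofNullable(BY_NAME.get(dialectImpl)).map(Supplier::get);
    }

    static Optional<Dialect> get(String url) {
        return DIALECTS.stream().filter(d -> d.canHandle(url)).findFirst();
    }

    /** Same shape as the factory code: an explicit dialectImpl wins, otherwise match by URL. */
    static Optional<Dialect> resolve(Optional<String> dialectImplOptional, String jdbcUrl) {
        return dialectImplOptional.map(DialectResolutionSketch::register)
                .orElseGet(() -> get(jdbcUrl));
    }
}
```

One consequence of this `map(...).orElseGet(...)` shape, at least in the sketch: an unknown `dialectImpl` yields an empty result rather than falling back to URL matching, because `orElseGet` only runs when `dialectImplOptional` itself is empty.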





[GitHub] [inlong] EMsnap commented on a diff in pull request #5200: [INLONG-5100][Sort] Add reporting metrics for JDBC

Posted by GitBox <gi...@apache.org>.
EMsnap commented on code in PR #5200:
URL: https://github.com/apache/inlong/pull/5200#discussion_r929589041


##########
inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/metric/MetricData.java:
##########
@@ -0,0 +1,111 @@
+/*
+ *   Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+
+package org.apache.inlong.sort.jdbc.metric;
+
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Meter;
+import org.apache.flink.metrics.MeterView;
+import org.apache.flink.metrics.MetricGroup;
+
+/**
+ * A collection class for handling metrics
+ */
+public class MetricData {

Review Comment:
   Can the metric data be extracted into a common module so that other connectors can reuse it?
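One possible shape for a connector-agnostic holder, sketched under assumptions: `SharedSinkMetrics` and its methods are hypothetical and are not the connector-base `SinkMetricData` class. Only the counter names come from this PR; the per-second meters are omitted. Each connector would build it once from its labels and call one method per flush, so registration and bookkeeping live in one place:

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical connector-agnostic sink metrics holder; not an existing InLong API. */
public final class SharedSinkMetrics {
    // Counter names as registered by the JDBC connector in this PR.
    static final String NUM_RECORDS_OUT = "numRecordsOut";
    static final String NUM_BYTES_OUT = "numBytesOut";
    static final String DIRTY_RECORDS = "dirtyRecords";
    static final String DIRTY_BYTES = "dirtyBytes";

    private final String groupId;
    private final String streamId;
    private final String nodeId;
    // Filled once in the constructor and never structurally modified afterwards.
    private final Map<String, AtomicLong> counters = new LinkedHashMap<>();

    public SharedSinkMetrics(String groupId, String streamId, String nodeId) {
        this.groupId = groupId;
        this.streamId = streamId;
        this.nodeId = nodeId;
        for (String name : new String[] {NUM_RECORDS_OUT, NUM_BYTES_OUT, DIRTY_RECORDS, DIRTY_BYTES}) {
            counters.put(name, new AtomicLong());
        }
    }

    /** Called by any sink connector after a successful flush. */
    public void reportOutput(long records, long bytes) {
        counters.get(NUM_RECORDS_OUT).addAndGet(records);
        counters.get(NUM_BYTES_OUT).addAndGet(bytes);
    }

    /** Called by any sink connector when records are rejected as dirty data. */
    public void reportDirty(long records, long bytes) {
        counters.get(DIRTY_RECORDS).addAndGet(records);
        counters.get(DIRTY_BYTES).addAndGet(bytes);
    }

    /** Returns the current value of a counter, or 0 for an unknown name. */
    public long count(String name) {
        AtomicLong counter = counters.get(name);
        return counter == null ? 0L : counter.get();
    }

    /** Labels that a reporter could attach to every metric. */
    public Map<String, String> labels() {
        return Map.of("groupId", groupId, "streamId", streamId, "nodeId", nodeId);
    }
}
```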





[GitHub] [inlong] gong commented on pull request #5200: [INLONG-5100][Sort] Add reporting metrics for JDBC

Posted by GitBox <gi...@apache.org>.
gong commented on PR #5200:
URL: https://github.com/apache/inlong/pull/5200#issuecomment-1200391057

   Change the delimiter of `inlong.metric` from `_` to `&`. Because groupId and streamId can contain `_`, splitting on `_` produces the wrong fields.
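A minimal sketch of parsing `inlong.metric` with the new delimiter, assuming the value has the form `groupId&streamId&nodeId` (the class `InlongMetricLabels` is hypothetical). With `_` as the delimiter, an ID such as `group_a` is split into several fragments, so index-based access like `inLongMetricArray[0..2]` in the original `open()` picks up the wrong pieces; the sketch also rejects values that do not contain exactly three non-empty fields:

```java
import java.util.regex.Pattern;

/** Hypothetical parser for an inlong.metric value of the form groupId&streamId&nodeId. */
public final class InlongMetricLabels {
    private static final String DELIMITER = "&";

    final String groupId;
    final String streamId;
    final String nodeId;

    private InlongMetricLabels(String groupId, String streamId, String nodeId) {
        this.groupId = groupId;
        this.streamId = streamId;
        this.nodeId = nodeId;
    }

    static InlongMetricLabels parse(String inlongMetric) {
        // String.split takes a regex; quoting keeps the split literal even if the delimiter changes.
        // A limit of -1 keeps trailing empty fields so that "a&b&" is rejected below.
        String[] parts = inlongMetric.split(Pattern.quote(DELIMITER), -1);
        if (parts.length != 3) {
            throw new IllegalArgumentException("Expected groupId&streamId&nodeId but got: " + inlongMetric);
        }
        for (String part : parts) {
            if (part.isEmpty()) {
                throw new IllegalArgumentException("Empty field in inlong.metric: " + inlongMetric);
            }
        }
        return new InlongMetricLabels(parts[0], parts[1], parts[2]);
    }
}
```

For example, `parse("group_a&stream_b&node_1")` yields the three intended IDs, whereas `"group_a_stream_b_node_1".split("_")` produces six fragments.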




[GitHub] [inlong] thesumery commented on a diff in pull request #5200: [INLONG-5100][Sort] Add reporting metrics for JDBC

Posted by GitBox <gi...@apache.org>.
thesumery commented on code in PR #5200:
URL: https://github.com/apache/inlong/pull/5200#discussion_r939786689


##########
inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/GenericJdbcSinkFunction.java:
##########
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.jdbc.internal;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.jdbc.internal.AbstractJdbcOutputFormat;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nonnull;
+import java.io.IOException;
+
+/**
+ * A generic SinkFunction for JDBC.
+ * add metrics reporter in JdbcBatchingOutputFormat
+ */

Review Comment:
   Please add some comments explaining what you copied and what you changed.



##########
inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcBatchingOutputFormat.java:
##########
@@ -0,0 +1,464 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.jdbc.internal;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
+import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
+import org.apache.flink.connector.jdbc.internal.AbstractJdbcOutputFormat;
+import org.apache.flink.connector.jdbc.internal.connection.JdbcConnectionProvider;
+import org.apache.flink.connector.jdbc.internal.connection.SimpleJdbcConnectionProvider;
+import org.apache.flink.connector.jdbc.internal.executor.JdbcBatchStatementExecutor;
+import org.apache.flink.connector.jdbc.internal.options.JdbcDmlOptions;
+import org.apache.flink.connector.jdbc.internal.options.JdbcOptions;
+import org.apache.flink.connector.jdbc.statement.FieldNamedPreparedStatementImpl;
+import org.apache.flink.connector.jdbc.utils.JdbcUtils;
+import org.apache.flink.runtime.util.ExecutorThreadFactory;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+import org.apache.inlong.sort.jdbc.metric.MetricData;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+import java.io.IOException;
+import java.io.Serializable;
+import java.nio.charset.StandardCharsets;
+import java.sql.SQLException;
+import java.util.HashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+
+import static org.apache.flink.connector.jdbc.utils.JdbcUtils.setRecordToStatement;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A JDBC outputFormat that supports batching records before writing records to database.

Review Comment:
   Please add some comments explaining what you copied and what you changed.





[GitHub] [inlong] thesumery commented on a diff in pull request #5200: [INLONG-5100][Sort] Add reporting metrics for JDBC

Posted by GitBox <gi...@apache.org>.
thesumery commented on code in PR #5200:
URL: https://github.com/apache/inlong/pull/5200#discussion_r939786577


##########
inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/TableJdbcUpsertOutputFormat.java:
##########
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.jdbc.internal;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
+import org.apache.flink.connector.jdbc.internal.connection.JdbcConnectionProvider;
+import org.apache.flink.connector.jdbc.internal.executor.InsertOrUpdateJdbcExecutor;
+import org.apache.flink.connector.jdbc.internal.executor.JdbcBatchStatementExecutor;
+import org.apache.flink.connector.jdbc.internal.options.JdbcDmlOptions;
+import org.apache.flink.connector.jdbc.statement.FieldNamedPreparedStatementImpl;
+import org.apache.flink.types.Row;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.sql.SQLException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.function.Function;
+
+import static org.apache.flink.connector.jdbc.utils.JdbcUtils.getPrimaryKey;
+import static org.apache.flink.connector.jdbc.utils.JdbcUtils.setRecordToStatement;
+import static org.apache.flink.util.Preconditions.checkArgument;
+

Review Comment:
   Please add a comment explaining what you copied and what you changed.



[GitHub] [inlong] gong commented on a diff in pull request #5200: [INLONG-5100][Sort] Add reporting metrics for JDBC

Posted by GitBox <gi...@apache.org>.
gong commented on code in PR #5200:
URL: https://github.com/apache/inlong/pull/5200#discussion_r930564393


##########
inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcBatchingOutputFormat.java:
##########
@@ -0,0 +1,463 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.jdbc.internal;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
+import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
+import org.apache.flink.connector.jdbc.internal.AbstractJdbcOutputFormat;
+import org.apache.flink.connector.jdbc.internal.connection.JdbcConnectionProvider;
+import org.apache.flink.connector.jdbc.internal.connection.SimpleJdbcConnectionProvider;
+import org.apache.flink.connector.jdbc.internal.executor.JdbcBatchStatementExecutor;
+import org.apache.flink.connector.jdbc.internal.options.JdbcDmlOptions;
+import org.apache.flink.connector.jdbc.internal.options.JdbcOptions;
+import org.apache.flink.connector.jdbc.statement.FieldNamedPreparedStatementImpl;
+import org.apache.flink.connector.jdbc.utils.JdbcUtils;
+import org.apache.flink.runtime.util.ExecutorThreadFactory;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+import org.apache.inlong.sort.jdbc.metric.MetricData;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+import java.io.IOException;
+import java.io.Serializable;
+import java.nio.charset.StandardCharsets;
+import java.sql.SQLException;
+import java.util.HashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+
+import static org.apache.flink.connector.jdbc.utils.JdbcUtils.setRecordToStatement;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A JDBC outputFormat that supports batching records before writing records to database.
+ */
+public class JdbcBatchingOutputFormat<
+        In, JdbcIn, JdbcExec extends JdbcBatchStatementExecutor<JdbcIn>>
+        extends AbstractJdbcOutputFormat<In> {
+
+    private static final long serialVersionUID = 1L;
+    private static final Logger LOG = LoggerFactory.getLogger(JdbcBatchingOutputFormat.class);
+    private final JdbcExecutionOptions executionOptions;
+    private final StatementExecutorFactory<JdbcExec> statementExecutorFactory;
+    private final RecordExtractor<In, JdbcIn> jdbcRecordExtractor;
+    private final String inLongMetric;
+    private transient JdbcExec jdbcStatementExecutor;
+    private transient int batchCount = 0;
+    private transient volatile boolean closed = false;
+    private transient ScheduledExecutorService scheduler;
+    private transient ScheduledFuture<?> scheduledFuture;
+    private transient volatile Exception flushException;
+    private transient RuntimeContext runtimeContext;
+
+    private MetricData metricData;
+    private Long dataSize = 0L;
+    private Long rowSize = 0L;
+
+    public JdbcBatchingOutputFormat(
+            @Nonnull JdbcConnectionProvider connectionProvider,
+            @Nonnull JdbcExecutionOptions executionOptions,
+            @Nonnull StatementExecutorFactory<JdbcExec> statementExecutorFactory,
+            @Nonnull RecordExtractor<In, JdbcIn> recordExtractor,
+            String inLongMetric) {
+        super(connectionProvider);
+        this.executionOptions = checkNotNull(executionOptions);
+        this.statementExecutorFactory = checkNotNull(statementExecutorFactory);
+        this.jdbcRecordExtractor = checkNotNull(recordExtractor);
+        this.inLongMetric = inLongMetric;
+    }
+
+    public static Builder builder() {
+        return new Builder();
+    }
+
+    static JdbcBatchStatementExecutor<Row> createSimpleRowExecutor(
+            String sql, int[] fieldTypes, boolean objectReuse) {
+        return JdbcBatchStatementExecutor.simple(
+                sql,
+                createRowJdbcStatementBuilder(fieldTypes),
+                objectReuse ? Row::copy : Function.identity());
+    }
+
+    /**
+     * Creates a {@link JdbcStatementBuilder} for {@link Row} using the provided SQL types array.
+     * Uses {@link JdbcUtils#setRecordToStatement}
+     */
+    static JdbcStatementBuilder<Row> createRowJdbcStatementBuilder(int[] types) {
+        return (st, record) -> setRecordToStatement(st, types, record);
+    }
+
+    /**
+     * Connects to the target database and initializes the prepared statement.
+     *
+     * @param taskNumber The number of the parallel instance.
+     */
+    @Override
+    public void open(int taskNumber, int numTasks) throws IOException {
+        super.open(taskNumber, numTasks);
+        this.runtimeContext = getRuntimeContext();
+        metricData = new MetricData(runtimeContext.getMetricGroup());
+        if (inLongMetric != null && !inLongMetric.isEmpty()) {
+            String[] inLongMetricArray = inLongMetric.split("_");
+            String groupId = inLongMetricArray[0];
+            String streamId = inLongMetricArray[1];
+            String nodeId = inLongMetricArray[2];
+            metricData.registerMetricsForDirtyBytes(groupId, streamId, nodeId, "dirtyBytes");
+            metricData.registerMetricsForDirtyRecords(groupId, streamId, nodeId, "dirtyRecords");
+            metricData.registerMetricsForNumBytesOut(groupId, streamId, nodeId, "numBytesOut");
+            metricData.registerMetricsForNumRecordsOut(groupId, streamId, nodeId, "numRecordsOut");
+            metricData.registerMetricsForNumBytesOutPerSecond(groupId, streamId, nodeId, "numBytesOutPerSecond");
+            metricData.registerMetricsForNumRecordsOutPerSecond(groupId, streamId, nodeId,
+                    "numRecordsOutPerSecond");
+        }
+        jdbcStatementExecutor = createAndOpenStatementExecutor(statementExecutorFactory);
+        if (executionOptions.getBatchIntervalMs() != 0 && executionOptions.getBatchSize() != 1) {
+            this.scheduler =
+                    Executors.newScheduledThreadPool(
+                            1, new ExecutorThreadFactory("jdbc-upsert-output-format"));
+            this.scheduledFuture =
+                    this.scheduler.scheduleWithFixedDelay(
+                            () -> {
+                                synchronized (JdbcBatchingOutputFormat.this) {
+                                    if (!closed) {
+                                        try {
+                                            flush();
+                                            if (metricData.getNumRecordsOut() != null) {
+                                                metricData.getNumRecordsOut().inc(rowSize);
+                                            }
+                                            if (metricData.getNumRecordsOut() != null) {
+                                                metricData.getNumBytesOut()
+                                                        .inc(dataSize);
+                                            }
+                                            resetStateAfterFlush();
+                                        } catch (Exception e) {
+                                            if (metricData.getDirtyRecords() != null) {
+                                                metricData.getDirtyRecords().inc(rowSize);
+                                            }
+                                            if (metricData.getDirtyBytes() != null) {
+                                                metricData.getDirtyBytes().inc(dataSize);
+                                            }
+                                            resetStateAfterFlush();
+                                            flushException = e;
+                                        }
+                                    }
+                                }
+                            },
+                            executionOptions.getBatchIntervalMs(),
+                            executionOptions.getBatchIntervalMs(),
+                            TimeUnit.MILLISECONDS);
+        }
+    }
+
+    private JdbcExec createAndOpenStatementExecutor(
+            StatementExecutorFactory<JdbcExec> statementExecutorFactory) throws IOException {
+        JdbcExec exec = statementExecutorFactory.apply(getRuntimeContext());
+        try {
+            exec.prepareStatements(connectionProvider.getConnection());
+        } catch (SQLException e) {
+            throw new IOException("unable to open JDBC writer", e);
+        }
+        return exec;
+    }
+
+    private void checkFlushException() {
+        if (flushException != null) {
+            throw new RuntimeException("Writing records to JDBC failed.", flushException);
+        }
+    }
+
+    @Override
+    public final synchronized void writeRecord(In record) throws IOException {
+        checkFlushException();
+
+        rowSize++;
+        dataSize = dataSize + record.toString().getBytes(StandardCharsets.UTF_8).length;
+        try {
+            addToBatch(record, jdbcRecordExtractor.apply(record));
+            batchCount++;
+            if (executionOptions.getBatchSize() > 0
+                    && batchCount >= executionOptions.getBatchSize()) {
+                flush();
+                if (metricData.getNumRecordsOut() != null) {
+                    metricData.getNumRecordsOut().inc(rowSize);
+                }
+                if (metricData.getNumRecordsOut() != null) {

Review Comment:
   `metricData.getNumRecordsOut() != null` should be `metricData.getNumBytesOut() != null`
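
The guard gong flags checks `getNumRecordsOut()` but then increments `getNumBytesOut()`. If the bytes counter were ever unregistered while the records counter is registered, the call would throw a `NullPointerException`. The same guarded increments are repeated in the scheduled flush and in `writeRecord`. Below is a minimal sketch of one way to centralize that accounting. It assumes a hypothetical `FlushMetrics` helper, with `java.util.concurrent.atomic.LongAdder` standing in for Flink's `Counter`. It is not the PR's `MetricData` API.

```java
import java.util.concurrent.atomic.LongAdder;

/**
 * Hypothetical helper (not part of the PR): buffers per-batch record and byte
 * counts, then moves them into either the "out" or the "dirty" counters.
 * LongAdder stands in for Flink's Counter; a null field means "not registered".
 */
class FlushMetrics {
    LongAdder numRecordsOut;
    LongAdder numBytesOut;
    LongAdder dirtyRecords;
    LongAdder dirtyBytes;

    // Counterparts of rowSize/dataSize in the PR: totals since the last flush.
    private long pendingRecords = 0L;
    private long pendingBytes = 0L;

    /** Call once per buffered record, with its estimated size in bytes. */
    void onRecordBuffered(long bytes) {
        pendingRecords++;
        pendingBytes += bytes;
    }

    /** Call after flush() succeeds. */
    void onFlushSuccess() {
        inc(numRecordsOut, pendingRecords);
        inc(numBytesOut, pendingBytes);
        reset();
    }

    /** Call when flush() throws: the pending batch is counted as dirty. */
    void onFlushFailure() {
        inc(dirtyRecords, pendingRecords);
        inc(dirtyBytes, pendingBytes);
        reset();
    }

    // Null-checks the same counter it increments, so a records/bytes
    // mix-up like the one in the review cannot happen here.
    private static void inc(LongAdder counter, long n) {
        if (counter != null) {
            counter.add(n);
        }
    }

    private void reset() {
        pendingRecords = 0L;
        pendingBytes = 0L;
    }
}
```

With such a helper, both flush paths would call `onFlushSuccess()` or `onFlushFailure()` instead of repeating four guarded increments each.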



[GitHub] [inlong] Oneal65 commented on a diff in pull request #5200: [INLONG-5100][Sort] Add reporting metrics for JDBC

Posted by GitBox <gi...@apache.org>.
Oneal65 commented on code in PR #5200:
URL: https://github.com/apache/inlong/pull/5200#discussion_r930617963


##########
inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcBatchingOutputFormat.java:
##########
@@ -0,0 +1,463 @@
+                                        try {
+                                            flush();
+                                            if (metricData.getNumRecordsOut() != null) {
+                                                metricData.getNumRecordsOut().inc(rowSize);
+                                            }
+                                            if (metricData.getNumRecordsOut() != null) {

Review Comment:
   OK
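
In the `open()` method quoted above, `inLongMetric` is split on `_`, and elements 0, 1 and 2 are read directly as the group, stream and node IDs. A label with fewer than three parts would therefore fail with an `ArrayIndexOutOfBoundsException`, and an ID that itself contains `_` would be mis-split. The sketch below isolates that parsing step. `InLongMetricLabel` is a hypothetical name. The strict three-part check is an added assumption: the quoted code silently ignores any extra parts.

```java
/**
 * Hypothetical helper (not part of the PR) mirroring the split in open():
 * a label such as "groupId_streamId_nodeId" is split on '_' into three IDs.
 */
final class InLongMetricLabel {
    final String groupId;
    final String streamId;
    final String nodeId;

    private InLongMetricLabel(String groupId, String streamId, String nodeId) {
        this.groupId = groupId;
        this.streamId = streamId;
        this.nodeId = nodeId;
    }

    /** Returns null when no label is configured, like the null/empty check in open(). */
    static InLongMetricLabel parse(String inLongMetric) {
        if (inLongMetric == null || inLongMetric.isEmpty()) {
            return null;
        }
        String[] parts = inLongMetric.split("_");
        // Added check (an assumption, not in the PR): fail with a clear message
        // instead of an ArrayIndexOutOfBoundsException on a malformed label.
        if (parts.length != 3) {
            throw new IllegalArgumentException(
                    "expected groupId_streamId_nodeId but got: " + inLongMetric);
        }
        return new InLongMetricLabel(parts[0], parts[1], parts[2]);
    }
}
```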



##########
inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcBatchingOutputFormat.java:
##########
@@ -0,0 +1,463 @@
+            if (executionOptions.getBatchSize() > 0
+                    && batchCount >= executionOptions.getBatchSize()) {
+                flush();
+                if (metricData.getNumRecordsOut() != null) {
+                    metricData.getNumRecordsOut().inc(rowSize);
+                }
+                if (metricData.getNumRecordsOut() != null) {

Review Comment:
   OK



[GitHub] [inlong] Oneal65 commented on pull request #5200: [INLONG-5100][Sort] Add reporting metrics for JDBC

Posted by GitBox <gi...@apache.org>.
Oneal65 commented on PR #5200:
URL: https://github.com/apache/inlong/pull/5200#issuecomment-1198828074

   > The `PostgresLoadNode` connector is JDBC-based, so this change would make the metrics invalid for PostgreSQL. We should add a dialect for PostgreSQL and obtain the dialect through a new `dialectImpl` parameter. The original function derives the dialect from the JDBC URL, but TDSQLPostgres and PostgreSQL use the same URL. @Oneal65
   
   @gong DONE
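
The review's point is that URL-based lookup is ambiguous when two dialects accept the same URL scheme. The sketch below prefers an explicit `dialectImpl` and falls back to URL matching only when it is absent. `SimpleDialect`, `DialectLoader`, `PostgresDialect` and `TdsqlPostgresDialect` are hypothetical illustrations, not InLong's or Flink's actual dialect classes. Treating `dialectImpl` as a fully qualified class name loaded by reflection is also an assumption.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical minimal dialect contract for this sketch (not Flink's JdbcDialect). */
interface SimpleDialect {
    String name();

    boolean canHandle(String url);
}

class PostgresDialect implements SimpleDialect {
    public String name() {
        return "PostgreSQL";
    }

    public boolean canHandle(String url) {
        return url.startsWith("jdbc:postgresql:");
    }
}

class TdsqlPostgresDialect implements SimpleDialect {
    public String name() {
        return "TDSQLPostgres";
    }

    // Same URL scheme as PostgreSQL, so URL matching alone cannot tell them apart.
    public boolean canHandle(String url) {
        return url.startsWith("jdbc:postgresql:");
    }
}

class DialectLoader {
    // With URL-only lookup, registration order silently decides the winner.
    private static final List<SimpleDialect> BY_URL =
            Arrays.asList(new PostgresDialect(), new TdsqlPostgresDialect());

    /** Prefers an explicit dialectImpl class name; falls back to URL matching. */
    static SimpleDialect load(String url, String dialectImpl) {
        if (dialectImpl != null && !dialectImpl.isEmpty()) {
            try {
                Object dialect = Class.forName(dialectImpl).getDeclaredConstructor().newInstance();
                return (SimpleDialect) dialect;
            } catch (ReflectiveOperationException e) {
                throw new IllegalArgumentException("cannot instantiate dialect " + dialectImpl, e);
            }
        }
        for (SimpleDialect dialect : BY_URL) {
            if (dialect.canHandle(url)) {
                return dialect;
            }
        }
        throw new IllegalArgumentException("no dialect found for URL " + url);
    }
}
```

Because the dialect is created by reflection, each dialect class in this sketch needs an accessible no-argument constructor.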

