Posted to commits@inlong.apache.org by "Yizhou-Yang (via GitHub)" <gi...@apache.org> on 2023/03/13 07:16:40 UTC

[GitHub] [inlong] Yizhou-Yang opened a new pull request, #7580: [INLONG-7249][Sort] JDBC accurate dirty data archive and metric calculation

Yizhou-Yang opened a new pull request, #7580:
URL: https://github.com/apache/inlong/pull/7580

   ### Prepare a Pull Request
   
   - Fixes #7249 
   
   ### Motivation
   JDBC needs to archive dirty data. This PR explores accurate dirty data archiving for JDBC by modifying the executors. The modifications are well tested and stable in most cases; in the remaining cases the code logs the warning "jdbc enhance failed for class:{}" and the normal write path is not affected.
   
   ### Modification
   Support accurate dirty data archiving by using reflection to replace certain kinds of executors, and replace TableSimpleStatementExecutor with one that also records metrics.
   
   ### To-do and to-improve
   1) There are 8 implementations of flink-cdc-executors in total.
   Currently supported sink executor types:
   TableBufferedStatementExecutor
   TableBufferReducedStatementExecutor
   TableSimpleStatementExecutor
   
   Unsupported executor types (dirty data is not archived, but nothing breaks):
   KeyedBatchStatementExecutor
   NoOPStatementExecutor
   TableInsertOrUpdateStatementExecutor
   
   No need to support:
   InsertOrUpdateJdbcExecutor (TableInsertOrUpdateStatementExecutor is enough)
   SimpleBatchStatementExecutor (TableSimpleStatementExecutor is enough)
   
   2) This PR uses Java reflection that depends on flink-cdc-connectors. The design could be improved by bringing the executors into Inlong-jdbc-connector and modifying the builder to select the executor class directly, instead of relying on reflection. Such a refactoring could also increase throughput.
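
The reflection approach described above can be illustrated with a minimal, self-contained sketch. Every class here (`RowExecutor`, `SimpleExecutor`, `MetricExecutor`, `BufferedExecutor`, `ExecutorEnhancer`) is a hypothetical stand-in, not the Flink or InLong class; only the idea is taken from the PR: read a private inner-executor field, wrap it in a metric-recording executor, write it back, and fall back to a warning if reflection fails.

```java
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.List;

// Hypothetical demo: none of these types are the real Flink/InLong classes.
class ExecutorEnhancer {
    /** Swaps the private inner executor for a counting decorator; returns null on failure. */
    static MetricExecutor enhance(BufferedExecutor outer) {
        try {
            Field field = BufferedExecutor.class.getDeclaredField("statementExecutor");
            field.setAccessible(true);
            MetricExecutor enhanced = new MetricExecutor((RowExecutor) field.get(outer));
            field.set(outer, enhanced);
            return enhanced;
        } catch (ReflectiveOperationException e) {
            // Same spirit as the PR: warn and keep the original executor in place.
            System.err.println("enhance failed for class: " + outer.getClass() + ", " + e);
            return null;
        }
    }

    public static void main(String[] args) {
        BufferedExecutor outer = new BufferedExecutor();
        MetricExecutor metrics = enhance(outer);
        outer.addToBatch("row-1");
        outer.addToBatch("row-2");
        System.out.println(metrics.recordCount);
    }
}

interface RowExecutor {
    void addToBatch(String record);
}

class SimpleExecutor implements RowExecutor {
    final List<String> batch = new ArrayList<>();

    @Override
    public void addToBatch(String record) {
        batch.add(record);
    }
}

// Decorator that counts records before delegating to the wrapped executor.
class MetricExecutor implements RowExecutor {
    private final RowExecutor delegate;
    long recordCount = 0;

    MetricExecutor(RowExecutor delegate) {
        this.delegate = delegate;
    }

    @Override
    public void addToBatch(String record) {
        recordCount++;
        delegate.addToBatch(record);
    }
}

// Wrapper that keeps its inner executor in a private field with no setter.
class BufferedExecutor implements RowExecutor {
    private RowExecutor statementExecutor = new SimpleExecutor();

    @Override
    public void addToBatch(String record) {
        statementExecutor.addToBatch(record);
    }
}
```

The sketch also shows why the to-do item proposes a builder-based design: the field name is a plain string, so a rename in the upstream class would only surface at runtime as a `NoSuchFieldException`.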
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [inlong] gong commented on pull request #7580: [INLONG-7249][Sort] JDBC accurate dirty data archive and metric calculation

Posted by "gong (via GitHub)" <gi...@apache.org>.
gong commented on PR #7580:
URL: https://github.com/apache/inlong/pull/7580#issuecomment-1495323813

   @Yizhou-Yang please resolve the conflicts.




[GitHub] [inlong] gong commented on a diff in pull request #7580: [INLONG-7249][Sort] JDBC accurate dirty data archive and metric calculation

Posted by "gong (via GitHub)" <gi...@apache.org>.
gong commented on code in PR #7580:
URL: https://github.com/apache/inlong/pull/7580#discussion_r1156775850


##########
inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcMultiBatchingOutputFormat.java:
##########
@@ -656,10 +656,8 @@ protected void attemptFlush() throws IOException {
             } catch (Exception e) {
                 tableException = e;
                 LOG.warn("Flush all data for tableIdentifier:{} get err:", tableIdentifier, e);
-                if (dirtySinkHelper.getDirtySink() == null) {
-                    getAndSetPkFromErrMsg(tableIdentifier, e.getMessage());
-                    updateOneExecutor(true, tableIdentifier);
-                }
+                getAndSetPkFromErrMsg(tableIdentifier, e.getMessage());
+                updateOneExecutor(true, tableIdentifier);

Review Comment:
   The arguments to getAndSetPkFromErrMsg are in the wrong order. It should be getAndSetPkFromErrMsg(e.getMessage(), tableIdentifier). You also need to rebase onto master when you resolve the conflicts.





[GitHub] [inlong] healchow commented on a diff in pull request #7580: [INLONG-7249][Sort] JDBC accurate dirty data archive and metric calculation

Posted by "healchow (via GitHub)" <gi...@apache.org>.
healchow commented on code in PR #7580:
URL: https://github.com/apache/inlong/pull/7580#discussion_r1161170773


##########
inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcMultiBatchingOutputFormat.java:
##########
@@ -392,6 +456,58 @@ public final synchronized void writeRecord(In row) throws IOException {
         }
     }
 
+    private void fillDirtyData(JdbcExec exec, String tableIdentifier) {
+        String[] identifiers = tableIdentifier.split("\\.");
+        String database;
+        String table;
+        String schema = null;
+        if (identifiers.length == 2) {
+            database = identifiers[0];
+            table = identifiers[1];
+        } else {
+            database = identifiers[0];
+            schema = identifiers[1];
+            table = identifiers[2];
+        }
+        TableMetricStatementExecutor executor = null;
+        try {
+            Field subExecutor;
+            if (exec instanceof TableMetricStatementExecutor) {
+                executor = (TableMetricStatementExecutor) exec;
+            } else if (exec instanceof TableBufferReducedStatementExecutor) {
+                subExecutor = TableBufferReducedStatementExecutor.class.getDeclaredField("upsertExecutor");
+                subExecutor.setAccessible(true);
+                executor = (TableMetricStatementExecutor) subExecutor.get(exec);
+            } else if (exec instanceof TableBufferedStatementExecutor) {
+                subExecutor = TableBufferedStatementExecutor.class.getDeclaredField("statementExecutor");
+                subExecutor.setAccessible(true);
+                executor = (TableMetricStatementExecutor) subExecutor.get(exec);
+            }
+        } catch (Exception e) {
+            LOG.error("parse executor failed" + e);

Review Comment:
   ```suggestion
               LOG.error("parse executor failed: ", e);
   ```
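
The difference behind this suggestion can be shown with a small sketch. It uses `java.util.logging` purely to stay dependency-free; that is a substitution, since the `{}` placeholders elsewhere in the diff suggest the project uses an SLF4J-style logger. The class and method names (`ThrowableLoggingDemo`, `logOnce`, `CapturingHandler`) are hypothetical. Concatenating the exception into the message keeps only its `toString()`, while passing it as a separate argument attaches the throwable, and with it the stack trace, to the log record.

```java
import java.util.logging.Handler;
import java.util.logging.Level;
import java.util.logging.LogRecord;
import java.util.logging.Logger;

// Hypothetical demo class; java.util.logging stands in for the project's logger.
class ThrowableLoggingDemo {
    /** Handler that remembers the last record so it can be inspected afterwards. */
    static class CapturingHandler extends Handler {
        LogRecord last;

        @Override
        public void publish(LogRecord record) {
            last = record;
        }

        @Override
        public void flush() {
        }

        @Override
        public void close() {
        }
    }

    /** Logs one error, either passing the exception as an argument or concatenating it. */
    static CapturingHandler logOnce(boolean passThrowable) {
        Logger log = Logger.getAnonymousLogger();
        log.setUseParentHandlers(false); // keep the demo quiet on the console
        CapturingHandler handler = new CapturingHandler();
        log.addHandler(handler);

        Exception e = new IllegalStateException("no such field");
        if (passThrowable) {
            log.log(Level.SEVERE, "parse executor failed: ", e);
        } else {
            log.log(Level.SEVERE, "parse executor failed" + e);
        }
        return handler;
    }

    public static void main(String[] args) {
        System.out.println(logOnce(true).last.getThrown() != null);  // throwable attached
        System.out.println(logOnce(false).last.getThrown() != null); // only text survives
        System.out.println(logOnce(false).last.getMessage());
    }
}
```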





[GitHub] [inlong] Yizhou-Yang commented on a diff in pull request #7580: [INLONG-7249][Sort] JDBC accurate dirty data archive and metric calculation

Posted by "Yizhou-Yang (via GitHub)" <gi...@apache.org>.
Yizhou-Yang commented on code in PR #7580:
URL: https://github.com/apache/inlong/pull/7580#discussion_r1139751607


##########
inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcBatchingOutputFormat.java:
##########
@@ -184,7 +191,7 @@ public void open(int taskNumber, int numTasks) throws IOException {
                                     if (!closed) {
                                         try {
                                             flush();
-                                            if (sinkMetricData != null) {
+                                            if (sinkMetricData != null && dirtySink == null) {

Review Comment:
   This report is only needed when TableMetricStatementExecutor is not initialized, since TableMetricStatementExecutor does its own precise metric reporting.





[GitHub] [inlong] Yizhou-Yang closed pull request #7580: [INLONG-7249][Sort] JDBC accurate dirty data archive and metric calculation

Posted by "Yizhou-Yang (via GitHub)" <gi...@apache.org>.
Yizhou-Yang closed pull request #7580: [INLONG-7249][Sort] JDBC accurate dirty data archive and metric calculation
URL: https://github.com/apache/inlong/pull/7580




[GitHub] [inlong] Yizhou-Yang commented on pull request #7580: [INLONG-7249][Sort] JDBC accurate dirty data archive and metric calculation

Posted by "Yizhou-Yang (via GitHub)" <gi...@apache.org>.
Yizhou-Yang commented on PR #7580:
URL: https://github.com/apache/inlong/pull/7580#issuecomment-1480874564

   @gong  @yunqingmoswu  Thanks for the review. I've addressed your comments. 




[GitHub] [inlong] gong commented on a diff in pull request #7580: [INLONG-7249][Sort] JDBC accurate dirty data archive and metric calculation

Posted by "gong (via GitHub)" <gi...@apache.org>.
gong commented on code in PR #7580:
URL: https://github.com/apache/inlong/pull/7580#discussion_r1133662495


##########
inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcBatchingOutputFormat.java:
##########
@@ -277,7 +294,7 @@ public final synchronized void writeRecord(In record) {
             if (executionOptions.getBatchSize() > 0
                     && batchCount >= executionOptions.getBatchSize()) {
                 flush();
-                if (sinkMetricData != null) {
+                if (sinkMetricData != null && dirtySink == null) {
                     sinkMetricData.invoke(rowSize, dataSize);

Review Comment:
   Why add dirtySink == null?





[GitHub] [inlong] Yizhou-Yang commented on a diff in pull request #7580: [INLONG-7249][Sort] JDBC accurate dirty data archive and metric calculation

Posted by "Yizhou-Yang (via GitHub)" <gi...@apache.org>.
Yizhou-Yang commented on code in PR #7580:
URL: https://github.com/apache/inlong/pull/7580#discussion_r1133669697


##########
inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcBatchingOutputFormat.java:
##########
@@ -277,7 +294,7 @@ public final synchronized void writeRecord(In record) {
             if (executionOptions.getBatchSize() > 0
                     && batchCount >= executionOptions.getBatchSize()) {
                 flush();
-                if (sinkMetricData != null) {
+                if (sinkMetricData != null && dirtySink == null) {
                     sinkMetricData.invoke(rowSize, dataSize);

Review Comment:
   If the dirty sink is not null, TableMetricStatementExecutor already adds the metrics once, provided it was initialized successfully. Without this check the metrics would be counted twice.
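
A minimal sketch of the double-counting concern, using hypothetical names (`MetricGuardDemo`, `SinkMetrics`, `reportedRows`) rather than the real InLong classes. It assumes, as the comment above describes, that the executor-level reporter is active exactly when a dirty sink is configured, and that the output format should then skip its own report.

```java
// Hypothetical demo of the guard; not the actual InLong output format.
class MetricGuardDemo {
    /** Stand-in for the sink metric holder: accumulates reported row counts. */
    static class SinkMetrics {
        long rows;

        void invoke(long rowCount) {
            rows += rowCount;
        }
    }

    /**
     * Simulates one flush of batchRows rows and returns the total rows reported.
     *
     * @param metricExecutorActive whether the executor reports metrics itself
     * @param guardEnabled         whether the output format skips its own report in that case
     */
    static long reportedRows(int batchRows, boolean metricExecutorActive, boolean guardEnabled) {
        SinkMetrics metrics = new SinkMetrics();
        if (metricExecutorActive) {
            metrics.invoke(batchRows); // executor-level report
        }
        boolean outputFormatReports = !guardEnabled || !metricExecutorActive;
        if (outputFormatReports) {
            metrics.invoke(batchRows); // output-format-level report
        }
        return metrics.rows;
    }

    public static void main(String[] args) {
        System.out.println(reportedRows(10, true, false)); // 20: counted twice without the guard
        System.out.println(reportedRows(10, true, true));  // 10: guard skips the second report
        System.out.println(reportedRows(10, false, true)); // 10: no metric executor, format reports
    }
}
```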





[GitHub] [inlong] Yizhou-Yang commented on a diff in pull request #7580: [INLONG-7249][Sort] JDBC accurate dirty data archive and metric calculation

Posted by "Yizhou-Yang (via GitHub)" <gi...@apache.org>.
Yizhou-Yang commented on code in PR #7580:
URL: https://github.com/apache/inlong/pull/7580#discussion_r1134912466


##########
inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcBatchingOutputFormat.java:
##########
@@ -277,7 +294,7 @@ public final synchronized void writeRecord(In record) {
             if (executionOptions.getBatchSize() > 0
                     && batchCount >= executionOptions.getBatchSize()) {
                 flush();
-                if (sinkMetricData != null) {
+                if (sinkMetricData != null && dirtySink == null) {
                     sinkMetricData.invoke(rowSize, dataSize);

Review Comment:
   yes





[GitHub] [inlong] Yizhou-Yang commented on a diff in pull request #7580: [INLONG-7249][Sort] JDBC accurate dirty data archive and metric calculation

Posted by "Yizhou-Yang (via GitHub)" <gi...@apache.org>.
Yizhou-Yang commented on code in PR #7580:
URL: https://github.com/apache/inlong/pull/7580#discussion_r1139797928


##########
inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcBatchingOutputFormat.java:
##########
@@ -277,7 +294,7 @@ public final synchronized void writeRecord(In record) {
             if (executionOptions.getBatchSize() > 0
                     && batchCount >= executionOptions.getBatchSize()) {
                 flush();
-                if (sinkMetricData != null) {
+                if (sinkMetricData != null && dirtySink == null) {
                     sinkMetricData.invoke(rowSize, dataSize);

Review Comment:
   Fixed. update_before now adds the metric and returns, so the metrics are normal now.





[GitHub] [inlong] dockerzhang merged pull request #7580: [INLONG-7249][Sort] JDBC accurate dirty data archive and metric calculation

Posted by "dockerzhang (via GitHub)" <gi...@apache.org>.
dockerzhang merged PR #7580:
URL: https://github.com/apache/inlong/pull/7580




[GitHub] [inlong] healchow commented on a diff in pull request #7580: [INLONG-7249][Sort] JDBC accurate dirty data archive and metric calculation

Posted by "healchow (via GitHub)" <gi...@apache.org>.
healchow commented on code in PR #7580:
URL: https://github.com/apache/inlong/pull/7580#discussion_r1161170783


##########
inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcMultiBatchingOutputFormat.java:
##########
@@ -392,6 +456,58 @@ public final synchronized void writeRecord(In row) throws IOException {
         }
     }
 
+    private void fillDirtyData(JdbcExec exec, String tableIdentifier) {
+        String[] identifiers = tableIdentifier.split("\\.");
+        String database;
+        String table;
+        String schema = null;
+        if (identifiers.length == 2) {
+            database = identifiers[0];
+            table = identifiers[1];
+        } else {
+            database = identifiers[0];
+            schema = identifiers[1];
+            table = identifiers[2];
+        }
+        TableMetricStatementExecutor executor = null;
+        try {
+            Field subExecutor;
+            if (exec instanceof TableMetricStatementExecutor) {
+                executor = (TableMetricStatementExecutor) exec;
+            } else if (exec instanceof TableBufferReducedStatementExecutor) {
+                subExecutor = TableBufferReducedStatementExecutor.class.getDeclaredField("upsertExecutor");
+                subExecutor.setAccessible(true);
+                executor = (TableMetricStatementExecutor) subExecutor.get(exec);
+            } else if (exec instanceof TableBufferedStatementExecutor) {
+                subExecutor = TableBufferedStatementExecutor.class.getDeclaredField("statementExecutor");
+                subExecutor.setAccessible(true);
+                executor = (TableMetricStatementExecutor) subExecutor.get(exec);
+            }
+        } catch (Exception e) {
+            LOG.error("parse executor failed" + e);
+        }
+
+        try {
+            DirtyOptions dirtyOptions = dirtySinkHelper.getDirtyOptions();
+            String dirtyLabel = DirtySinkHelper.regexReplace(dirtyOptions.getLabels(), DirtyType.BATCH_LOAD_ERROR, null,
+                    database, table, schema);
+            String dirtyLogTag =
+                    DirtySinkHelper.regexReplace(dirtyOptions.getLogTag(), DirtyType.BATCH_LOAD_ERROR, null,
+                            database, table, schema);
+            String dirtyIdentifier =
+                    DirtySinkHelper.regexReplace(dirtyOptions.getIdentifier(), DirtyType.BATCH_LOAD_ERROR,
+                            null, database, table, schema);
+
+            if (executor != null) {
+                executor.setDirtyMetaData(dirtyLabel, dirtyLogTag, dirtyIdentifier);
+            } else {
+                LOG.error("null executor");
+            }
+        } catch (Exception e) {
+            LOG.error("filling dirty metadata failed" + e);

Review Comment:
   ```suggestion
               LOG.error("filling dirty metadata failed: ", e);
   ```





[GitHub] [inlong] Yizhou-Yang commented on a diff in pull request #7580: [INLONG-7249][Sort] JDBC accurate dirty data archive and metric calculation

Posted by "Yizhou-Yang (via GitHub)" <gi...@apache.org>.
Yizhou-Yang commented on code in PR #7580:
URL: https://github.com/apache/inlong/pull/7580#discussion_r1134912466


##########
inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcBatchingOutputFormat.java:
##########
@@ -277,7 +294,7 @@ public final synchronized void writeRecord(In record) {
             if (executionOptions.getBatchSize() > 0
                     && batchCount >= executionOptions.getBatchSize()) {
                 flush();
-                if (sinkMetricData != null) {
+                if (sinkMetricData != null && dirtySink == null) {
                     sinkMetricData.invoke(rowSize, dataSize);

Review Comment:
   Yes, TableMetricStatementExecutor will calculate the metrics if it is initialized successfully.





[GitHub] [inlong] Yizhou-Yang commented on a diff in pull request #7580: [INLONG-7249][Sort] JDBC accurate dirty data archive and metric calculation

Posted by "Yizhou-Yang (via GitHub)" <gi...@apache.org>.
Yizhou-Yang commented on code in PR #7580:
URL: https://github.com/apache/inlong/pull/7580#discussion_r1139798744


##########
inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcBatchingOutputFormat.java:
##########
@@ -365,6 +381,41 @@ public synchronized void flush() throws IOException {
         }
     }
 
+    private JdbcExec enhanceExecutor(JdbcExec exec) throws NoSuchFieldException, IllegalAccessException {
+        if (dirtySink == null) {
+            return null;
+        }
+        final DirtySinkHelper dirtySinkHelper = new DirtySinkHelper<>(dirtyOptions, dirtySink);
+        // enhance the actual executor to tablemetricstatementexecutor
+        Field ExecutorType;
+        if (exec instanceof TableBufferReducedStatementExecutor) {
+            ExecutorType = TableBufferReducedStatementExecutor.class.getDeclaredField("upsertExecutor");
+        } else if (exec instanceof TableBufferedStatementExecutor) {
+            ExecutorType = TableBufferedStatementExecutor.class.getDeclaredField("statementExecutor");
+        } else {
+            throw new RuntimeException("table enhance failed, can't enhance " + exec.getClass());
+        }
+        ExecutorType.setAccessible(true);
+        TableSimpleStatementExecutor executor = (TableSimpleStatementExecutor) ExecutorType.get(exec);
+        Field statementFactory = TableSimpleStatementExecutor.class.getDeclaredField("stmtFactory");
+        Field rowConverter = TableSimpleStatementExecutor.class.getDeclaredField("converter");
+        statementFactory.setAccessible(true);
+        rowConverter.setAccessible(true);
+        final StatementFactory stmtFactory = (StatementFactory) statementFactory.get(executor);
+        final JdbcRowConverter converter = (JdbcRowConverter) rowConverter.get(executor);
+        TableMetricStatementExecutor newExecutor =
+                new TableMetricStatementExecutor(stmtFactory, converter, dirtySinkHelper, sinkMetricData);
+        if (exec instanceof TableBufferedStatementExecutor) {
+            Field transform = TableBufferedStatementExecutor.class.getDeclaredField("valueTransform");
+            transform.setAccessible(true);
+            Function<RowData, RowData> valueTransform = (Function<RowData, RowData>) transform.get(exec);
+            newExecutor.setValueTransform(valueTransform);
+            return (JdbcExec) newExecutor;
+        }
+        ExecutorType.set(exec, newExecutor);

Review Comment:
   added





[GitHub] [inlong] Yizhou-Yang commented on a diff in pull request #7580: [INLONG-7249][Sort] JDBC accurate dirty data archive and metric calculation

Posted by "Yizhou-Yang (via GitHub)" <gi...@apache.org>.
Yizhou-Yang commented on code in PR #7580:
URL: https://github.com/apache/inlong/pull/7580#discussion_r1139929658


##########
inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcBatchingOutputFormat.java:
##########
@@ -184,7 +191,7 @@ public void open(int taskNumber, int numTasks) throws IOException {
                                     if (!closed) {
                                         try {
                                             flush();
-                                            if (sinkMetricData != null) {
+                                            if (sinkMetricData != null && dirtySink == null) {

Review Comment:
   added





[GitHub] [inlong] yunqingmoswu commented on a diff in pull request #7580: [INLONG-7249][Sort] JDBC accurate dirty data archive and metric calculation

Posted by "yunqingmoswu (via GitHub)" <gi...@apache.org>.
yunqingmoswu commented on code in PR #7580:
URL: https://github.com/apache/inlong/pull/7580#discussion_r1136622431


##########
inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcBatchingOutputFormat.java:
##########
@@ -184,7 +191,7 @@ public void open(int taskNumber, int numTasks) throws IOException {
                                     if (!closed) {
                                         try {
                                             flush();
-                                            if (sinkMetricData != null) {
+                                            if (sinkMetricData != null && dirtySink == null) {

Review Comment:
   Why does metric data reporting depend on 'dirtySink == null'?



##########
inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcBatchingOutputFormat.java:
##########
@@ -365,6 +381,41 @@ public synchronized void flush() throws IOException {
         }
     }
 
+    private JdbcExec enhanceExecutor(JdbcExec exec) throws NoSuchFieldException, IllegalAccessException {
+        if (dirtySink == null) {
+            return null;
+        }
+        final DirtySinkHelper dirtySinkHelper = new DirtySinkHelper<>(dirtyOptions, dirtySink);
+        // enhance the actual executor to tablemetricstatementexecutor
+        Field ExecutorType;
+        if (exec instanceof TableBufferReducedStatementExecutor) {
+            ExecutorType = TableBufferReducedStatementExecutor.class.getDeclaredField("upsertExecutor");
+        } else if (exec instanceof TableBufferedStatementExecutor) {
+            ExecutorType = TableBufferedStatementExecutor.class.getDeclaredField("statementExecutor");
+        } else {
+            throw new RuntimeException("table enhance failed, can't enhance " + exec.getClass());
+        }
+        ExecutorType.setAccessible(true);
+        TableSimpleStatementExecutor executor = (TableSimpleStatementExecutor) ExecutorType.get(exec);
+        Field statementFactory = TableSimpleStatementExecutor.class.getDeclaredField("stmtFactory");
+        Field rowConverter = TableSimpleStatementExecutor.class.getDeclaredField("converter");
+        statementFactory.setAccessible(true);
+        rowConverter.setAccessible(true);
+        final StatementFactory stmtFactory = (StatementFactory) statementFactory.get(executor);
+        final JdbcRowConverter converter = (JdbcRowConverter) rowConverter.get(executor);
+        TableMetricStatementExecutor newExecutor =
+                new TableMetricStatementExecutor(stmtFactory, converter, dirtySinkHelper, sinkMetricData);
+        if (exec instanceof TableBufferedStatementExecutor) {
+            Field transform = TableBufferedStatementExecutor.class.getDeclaredField("valueTransform");
+            transform.setAccessible(true);
+            Function<RowData, RowData> valueTransform = (Function<RowData, RowData>) transform.get(exec);
+            newExecutor.setValueTransform(valueTransform);
+            return (JdbcExec) newExecutor;
+        }
+        ExecutorType.set(exec, newExecutor);

Review Comment:
   Please add some description for it



##########
inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcMultiBatchingOutputFormat.java:
##########
@@ -302,6 +323,41 @@ private JdbcExec getOrCreateStatementExecutor(
         return jdbcExec;
     }
 
+    private JdbcExec enhanceExecutor(JdbcExec exec) throws NoSuchFieldException, IllegalAccessException {
+        if (dirtySinkHelper.getDirtySink() == null) {
+            return null;
+        }
+        // enhance the actual executor to tablemetricstatementexecutor
+        Field subExecutor;
+        if (exec instanceof TableBufferReducedStatementExecutor) {
+            subExecutor = TableBufferReducedStatementExecutor.class.getDeclaredField("upsertExecutor");
+        } else if (exec instanceof TableBufferedStatementExecutor) {
+            subExecutor = TableBufferedStatementExecutor.class.getDeclaredField("statementExecutor");
+        } else {
+            throw new RuntimeException("table enhance failed, can't enhance " + exec.getClass());
+        }
+        subExecutor.setAccessible(true);
+        TableSimpleStatementExecutor executor = (TableSimpleStatementExecutor) subExecutor.get(exec);
+        Field statementFactory = TableSimpleStatementExecutor.class.getDeclaredField("stmtFactory");
+        Field rowConverter = TableSimpleStatementExecutor.class.getDeclaredField("converter");
+        statementFactory.setAccessible(true);
+        rowConverter.setAccessible(true);
+        final StatementFactory stmtFactory = (StatementFactory) statementFactory.get(executor);
+        final JdbcRowConverter converter = (JdbcRowConverter) rowConverter.get(executor);
+        TableMetricStatementExecutor newExecutor =
+                new TableMetricStatementExecutor(stmtFactory, converter, dirtySinkHelper, sinkMetricData);
+        newExecutor.setMultipleSink(true);
+        if (exec instanceof TableBufferedStatementExecutor) {
+            subExecutor = TableBufferedStatementExecutor.class.getDeclaredField("valueTransform");
+            subExecutor.setAccessible(true);
+            Function<RowData, RowData> valueTransform = (Function<RowData, RowData>) subExecutor.get(exec);
+            newExecutor.setValueTransform(valueTransform);
+            return (JdbcExec) newExecutor;
+        }
+        subExecutor.set(exec, newExecutor);

Review Comment:
   Please add description for it



##########
inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcMultiBatchingOutputFormat.java:
##########
@@ -332,29 +388,35 @@ private void checkFlushException() {
     @Override
     public final synchronized void writeRecord(In row) throws IOException {
         checkFlushException();
-        JsonDynamicSchemaFormat jsonDynamicSchemaFormat =
-                (JsonDynamicSchemaFormat) DynamicSchemaFormatFactory.getFormat(sinkMultipleFormat);
+        jsonDynamicSchemaFormat = (JsonDynamicSchemaFormat) DynamicSchemaFormatFactory.getFormat(sinkMultipleFormat);
+
         if (row instanceof RowData) {
             RowData rowData = (RowData) row;
             JsonNode rootNode = jsonDynamicSchemaFormat.deserialize(rowData.getBinary(0));
-            String tableIdentifier = null;
+            String tableIdentifier;
+            String database;
+            String table;
+            String schema = null;
             try {
                 if (StringUtils.isBlank(schemaPattern)) {
-                    tableIdentifier = StringUtils.join(
-                            jsonDynamicSchemaFormat.parse(rootNode, databasePattern), ".",
-                            jsonDynamicSchemaFormat.parse(rootNode, tablePattern));
+                    database = jsonDynamicSchemaFormat.parse(rootNode, databasePattern);
+                    table = jsonDynamicSchemaFormat.parse(rootNode, tablePattern);
+                    tableIdentifier = StringUtils.join(database, ".", table);
                 } else {
+                    database = jsonDynamicSchemaFormat.parse(rootNode, databasePattern);
+                    table = jsonDynamicSchemaFormat.parse(rootNode, tablePattern);
+                    schema = jsonDynamicSchemaFormat.parse(rootNode, schemaPattern);
                     tableIdentifier = StringUtils.join(
-                            jsonDynamicSchemaFormat.parse(rootNode, databasePattern), ".",

Review Comment:
   What is the benefit of this change?



##########
inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcBatchingOutputFormat.java:
##########
@@ -365,6 +381,41 @@ public synchronized void flush() throws IOException {
         }
     }
 
+    private JdbcExec enhanceExecutor(JdbcExec exec) throws NoSuchFieldException, IllegalAccessException {
+        if (dirtySink == null) {
+            return null;
+        }
+        final DirtySinkHelper dirtySinkHelper = new DirtySinkHelper<>(dirtyOptions, dirtySink);
+        // enhance the actual executor to tablemetricstatementexecutor
+        Field ExecutorType;

Review Comment:
   ExecutorType -> executorType



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [inlong] healchow commented on a diff in pull request #7580: [INLONG-7249][Sort] JDBC accurate dirty data archive and metric calculation

Posted by "healchow (via GitHub)" <gi...@apache.org>.
healchow commented on code in PR #7580:
URL: https://github.com/apache/inlong/pull/7580#discussion_r1161170793


##########
inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcMultiBatchingOutputFormat.java:
##########
@@ -392,6 +456,58 @@ public final synchronized void writeRecord(In row) throws IOException {
         }
     }
 
+    private void fillDirtyData(JdbcExec exec, String tableIdentifier) {
+        String[] identifiers = tableIdentifier.split("\\.");
+        String database;
+        String table;
+        String schema = null;
+        if (identifiers.length == 2) {
+            database = identifiers[0];
+            table = identifiers[1];
+        } else {
+            database = identifiers[0];
+            schema = identifiers[1];
+            table = identifiers[2];
+        }
+        TableMetricStatementExecutor executor = null;
+        try {
+            Field subExecutor;
+            if (exec instanceof TableMetricStatementExecutor) {
+                executor = (TableMetricStatementExecutor) exec;
+            } else if (exec instanceof TableBufferReducedStatementExecutor) {
+                subExecutor = TableBufferReducedStatementExecutor.class.getDeclaredField("upsertExecutor");
+                subExecutor.setAccessible(true);
+                executor = (TableMetricStatementExecutor) subExecutor.get(exec);
+            } else if (exec instanceof TableBufferedStatementExecutor) {
+                subExecutor = TableBufferedStatementExecutor.class.getDeclaredField("statementExecutor");
+                subExecutor.setAccessible(true);
+                executor = (TableMetricStatementExecutor) subExecutor.get(exec);
+            }
+        } catch (Exception e) {
+            LOG.error("parse executor failed" + e);
+        }
+
+        try {
+            DirtyOptions dirtyOptions = dirtySinkHelper.getDirtyOptions();
+            String dirtyLabel = DirtySinkHelper.regexReplace(dirtyOptions.getLabels(), DirtyType.BATCH_LOAD_ERROR, null,
+                    database, table, schema);
+            String dirtyLogTag =
+                    DirtySinkHelper.regexReplace(dirtyOptions.getLogTag(), DirtyType.BATCH_LOAD_ERROR, null,
+                            database, table, schema);
+            String dirtyIdentifier =
+                    DirtySinkHelper.regexReplace(dirtyOptions.getIdentifier(), DirtyType.BATCH_LOAD_ERROR,
+                            null, database, table, schema);
+
+            if (executor != null) {
+                executor.setDirtyMetaData(dirtyLabel, dirtyLogTag, dirtyIdentifier);
+            } else {
+                LOG.error("null executor");

Review Comment:
   Please add more info for this `null executor`.





[GitHub] [inlong] dockerzhang commented on a diff in pull request #7580: [INLONG-7249][Sort] JDBC accurate dirty data archive and metric calculation

Posted by "dockerzhang (via GitHub)" <gi...@apache.org>.
dockerzhang commented on code in PR #7580:
URL: https://github.com/apache/inlong/pull/7580#discussion_r1161640685


##########
inlong-sort/sort-connectors/base/src/test/java/org/apache/inlong/sort/base/dirty/RegexReplaceTest.java:
##########
@@ -21,19 +21,16 @@
 import org.junit.Assert;
 import org.junit.Test;
 
-import java.io.IOException;
-
 @Slf4j
 public class RegexReplaceTest {
 
     @Test
-    public void testRegexReplacement() throws IOException {
-        String[] identifier = new String[2];
-        identifier[0] = "yizhouyang";
-        identifier[1] = "table2";
-        String pattern = "${database}-${table}-${DIRTY_MESSAGE}";
-        String answer = DirtySinkHelper.regexReplace(pattern, DirtyType.BATCH_LOAD_ERROR, "mock message", identifier[0],
-                identifier[1], null);
-        Assert.assertEquals("yizhouyang-table2-mock message", answer);
+    public void testRegexReplacement() {
+        String database = "yizhouyang";

Review Comment:
   It's better to use another name for the test.



##########
inlong-sort/sort-connectors/base/src/test/java/org/apache/inlong/sort/base/dirty/RegexReplaceTest.java:
##########
@@ -21,19 +21,16 @@
 import org.junit.Assert;
 import org.junit.Test;
 
-import java.io.IOException;
-
 @Slf4j
 public class RegexReplaceTest {
 
     @Test
-    public void testRegexReplacement() throws IOException {
-        String[] identifier = new String[2];
-        identifier[0] = "yizhouyang";
-        identifier[1] = "table2";
-        String pattern = "${database}-${table}-${DIRTY_MESSAGE}";
-        String answer = DirtySinkHelper.regexReplace(pattern, DirtyType.BATCH_LOAD_ERROR, "mock message", identifier[0],
-                identifier[1], null);
-        Assert.assertEquals("yizhouyang-table2-mock message", answer);
+    public void testRegexReplacement() {
+        String database = "yizhouyang";
+        String table = "table2";
+        String pattern = "${source.table}-${source.database}-${DIRTY_MESSAGE}";
+        String answer = DirtySinkHelper.regexReplace(pattern, DirtyType.BATCH_LOAD_ERROR, "mock message", database,
+                table, null);
+        Assert.assertEquals("table2-yizhouyang-mock message", answer);

Review Comment:
   ditto
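
The test above expects `${...}` placeholders in a pattern to be replaced by the database, table and dirty-message values. A minimal sketch of that kind of substitution follows; it is not the implementation of `DirtySinkHelper.regexReplace`, and the class `PlaceholderSketch`, the method `render` and the map-based lookup are assumptions made for illustration only.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

class PlaceholderSketch {

    // Matches ${name} and captures the name between the braces.
    private static final Pattern PLACEHOLDER = Pattern.compile("\\$\\{([^}]+)\\}");

    // Hypothetical helper: replaces each known placeholder with its value from the map.
    static String render(String template, Map<String, String> values) {
        Matcher matcher = PLACEHOLDER.matcher(template);
        StringBuffer out = new StringBuffer();
        while (matcher.find()) {
            String value = values.get(matcher.group(1));
            // Unknown placeholders are kept verbatim rather than dropped.
            String replacement = value != null ? value : matcher.group(0);
            // quoteReplacement keeps '$' and '\' in the value from being read as group references.
            matcher.appendReplacement(out, Matcher.quoteReplacement(replacement));
        }
        matcher.appendTail(out);
        return out.toString();
    }

    public static void main(String[] args) {
        Map<String, String> values = new HashMap<>();
        values.put("source.database", "yizhouyang");
        values.put("source.table", "table2");
        values.put("DIRTY_MESSAGE", "mock message");
        // prints "table2-yizhouyang-mock message"
        System.out.println(render("${source.table}-${source.database}-${DIRTY_MESSAGE}", values));
    }
}
```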





[GitHub] [inlong] Yizhou-Yang commented on a diff in pull request #7580: [INLONG-7249][Sort] JDBC accurate dirty data archive and metric calculation

Posted by "Yizhou-Yang (via GitHub)" <gi...@apache.org>.
Yizhou-Yang commented on code in PR #7580:
URL: https://github.com/apache/inlong/pull/7580#discussion_r1139798463


##########
inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcMultiBatchingOutputFormat.java:
##########
@@ -332,29 +388,35 @@ private void checkFlushException() {
     @Override
     public final synchronized void writeRecord(In row) throws IOException {
         checkFlushException();
-        JsonDynamicSchemaFormat jsonDynamicSchemaFormat =
-                (JsonDynamicSchemaFormat) DynamicSchemaFormatFactory.getFormat(sinkMultipleFormat);
+        jsonDynamicSchemaFormat = (JsonDynamicSchemaFormat) DynamicSchemaFormatFactory.getFormat(sinkMultipleFormat);
+
         if (row instanceof RowData) {
             RowData rowData = (RowData) row;
             JsonNode rootNode = jsonDynamicSchemaFormat.deserialize(rowData.getBinary(0));
-            String tableIdentifier = null;
+            String tableIdentifier;
+            String database;
+            String table;
+            String schema = null;
             try {
                 if (StringUtils.isBlank(schemaPattern)) {
-                    tableIdentifier = StringUtils.join(
-                            jsonDynamicSchemaFormat.parse(rootNode, databasePattern), ".",
-                            jsonDynamicSchemaFormat.parse(rootNode, tablePattern));
+                    database = jsonDynamicSchemaFormat.parse(rootNode, databasePattern);
+                    table = jsonDynamicSchemaFormat.parse(rootNode, tablePattern);
+                    tableIdentifier = StringUtils.join(database, ".", table);
                 } else {
+                    database = jsonDynamicSchemaFormat.parse(rootNode, databasePattern);
+                    table = jsonDynamicSchemaFormat.parse(rootNode, tablePattern);
+                    schema = jsonDynamicSchemaFormat.parse(rootNode, schemaPattern);
                     tableIdentifier = StringUtils.join(
-                            jsonDynamicSchemaFormat.parse(rootNode, databasePattern), ".",

Review Comment:
   Restored; there is no difference.



##########
inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcMultiBatchingOutputFormat.java:
##########
@@ -302,6 +323,41 @@ private JdbcExec getOrCreateStatementExecutor(
         return jdbcExec;
     }
 
+    private JdbcExec enhanceExecutor(JdbcExec exec) throws NoSuchFieldException, IllegalAccessException {
+        if (dirtySinkHelper.getDirtySink() == null) {
+            return null;
+        }
+        // enhance the actual executor to tablemetricstatementexecutor
+        Field subExecutor;
+        if (exec instanceof TableBufferReducedStatementExecutor) {
+            subExecutor = TableBufferReducedStatementExecutor.class.getDeclaredField("upsertExecutor");
+        } else if (exec instanceof TableBufferedStatementExecutor) {
+            subExecutor = TableBufferedStatementExecutor.class.getDeclaredField("statementExecutor");
+        } else {
+            throw new RuntimeException("table enhance failed, can't enhance " + exec.getClass());
+        }
+        subExecutor.setAccessible(true);
+        TableSimpleStatementExecutor executor = (TableSimpleStatementExecutor) subExecutor.get(exec);
+        Field statementFactory = TableSimpleStatementExecutor.class.getDeclaredField("stmtFactory");
+        Field rowConverter = TableSimpleStatementExecutor.class.getDeclaredField("converter");
+        statementFactory.setAccessible(true);
+        rowConverter.setAccessible(true);
+        final StatementFactory stmtFactory = (StatementFactory) statementFactory.get(executor);
+        final JdbcRowConverter converter = (JdbcRowConverter) rowConverter.get(executor);
+        TableMetricStatementExecutor newExecutor =
+                new TableMetricStatementExecutor(stmtFactory, converter, dirtySinkHelper, sinkMetricData);
+        newExecutor.setMultipleSink(true);
+        if (exec instanceof TableBufferedStatementExecutor) {
+            subExecutor = TableBufferedStatementExecutor.class.getDeclaredField("valueTransform");
+            subExecutor.setAccessible(true);
+            Function<RowData, RowData> valueTransform = (Function<RowData, RowData>) subExecutor.get(exec);
+            newExecutor.setValueTransform(valueTransform);
+            return (JdbcExec) newExecutor;
+        }
+        subExecutor.set(exec, newExecutor);

Review Comment:
   added





[GitHub] [inlong] yunqingmoswu commented on a diff in pull request #7580: [INLONG-7249][Sort] JDBC accurate dirty data archive and metric calculation

Posted by "yunqingmoswu (via GitHub)" <gi...@apache.org>.
yunqingmoswu commented on code in PR #7580:
URL: https://github.com/apache/inlong/pull/7580#discussion_r1139860636


##########
inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcBatchingOutputFormat.java:
##########
@@ -184,7 +191,7 @@ public void open(int taskNumber, int numTasks) throws IOException {
                                     if (!closed) {
                                         try {
                                             flush();
-                                            if (sinkMetricData != null) {
+                                            if (sinkMetricData != null && dirtySink == null) {

Review Comment:
   Please add some description for the code





[GitHub] [inlong] healchow commented on a diff in pull request #7580: [INLONG-7249][Sort] JDBC accurate dirty data archive and metric calculation

Posted by "healchow (via GitHub)" <gi...@apache.org>.
healchow commented on code in PR #7580:
URL: https://github.com/apache/inlong/pull/7580#discussion_r1161170819


##########
inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcMultiBatchingOutputFormat.java:
##########
@@ -547,7 +676,18 @@ protected void attemptFlush() throws IOException {
                             jdbcStatementExecutor.executeBatch();
                             Long totalDataSize =
                                     Long.valueOf(record.toString().getBytes(StandardCharsets.UTF_8).length);
-                            outputMetrics(tableIdentifier, 1L, totalDataSize, false);
+                            if (dirtySinkHelper.getDirtySink() == null) {
+                                outputMetrics(tableIdentifier, (long) tableIdRecordList.size(),
+                                        totalDataSize, false);
+                            } else {
+                                try {
+                                    outputMetrics(tableIdentifier);
+                                } catch (Exception e) {
+                                    LOG.error("JDBC table metric calculation exception" + e);

Review Comment:
   ```suggestion
                                       LOG.error("JDBC table metric calculation exception: ", e);
   ```
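
The reason behind the suggestion: with SLF4J, a `Throwable` passed as the last argument is logged together with its stack trace, whereas concatenating it into the message keeps only its `toString()` text. The sketch below uses only the JDK (no logging framework) to show what the concatenated form loses; the class and method names are hypothetical.

```java
import java.io.PrintWriter;
import java.io.StringWriter;

class LogMessageSketch {

    // Mirrors "message" + e: only the exception class name and message survive.
    static String concatenated(String message, Throwable t) {
        return message + t;
    }

    // Roughly the information a logger emits when it receives the Throwable itself.
    static String withStackTrace(String message, Throwable t) {
        StringWriter buffer = new StringWriter();
        PrintWriter writer = new PrintWriter(buffer);
        writer.println(message);
        t.printStackTrace(writer);
        writer.flush();
        return buffer.toString();
    }

    public static void main(String[] args) {
        Exception e = new IllegalStateException("boom");
        System.out.println(concatenated("JDBC table metric calculation exception", e));
        System.out.println(withStackTrace("JDBC table metric calculation exception: ", e));
    }
}
```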





[GitHub] [inlong] Yizhou-Yang commented on a diff in pull request #7580: [INLONG-7249][Sort] JDBC accurate dirty data archive and metric calculation

Posted by "Yizhou-Yang (via GitHub)" <gi...@apache.org>.
Yizhou-Yang commented on code in PR #7580:
URL: https://github.com/apache/inlong/pull/7580#discussion_r1139799635


##########
inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/TableMetricStatementExecutor.java:
##########
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.jdbc.internal;
+
+import org.apache.flink.connector.jdbc.internal.converter.JdbcRowConverter;
+import org.apache.flink.connector.jdbc.internal.executor.JdbcBatchStatementExecutor;
+import org.apache.flink.connector.jdbc.statement.FieldNamedPreparedStatement;
+import org.apache.flink.connector.jdbc.statement.StatementFactory;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.types.RowKind;
+import org.apache.inlong.sort.base.dirty.DirtySinkHelper;
+import org.apache.inlong.sort.base.dirty.DirtyType;
+import org.apache.inlong.sort.base.metric.SinkMetricData;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.charset.StandardCharsets;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Function;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A {@link JdbcBatchStatementExecutor} that simply adds the records into batches of {@link
+ * java.sql.PreparedStatement} and doesn't buffer records in memory. Only used in Table/SQL API.
+ * Supported executors:TableBufferedStatementExecutor, TableBufferReducedStatementExecutor, TableSimpleStatementExecutor
+ */
+public final class TableMetricStatementExecutor implements JdbcBatchStatementExecutor<RowData> {
+
+    private static final Pattern pattern = Pattern.compile("Batch entry (\\d+)");
+    private static final Logger LOG = LoggerFactory.getLogger(TableMetricStatementExecutor.class);
+    private final StatementFactory stmtFactory;
+    private final JdbcRowConverter converter;
+    private List<RowData> batch;
+    private final DirtySinkHelper<Object> dirtySinkHelper;
+    private final SinkMetricData sinkMetricData;
+    private final AtomicInteger counter = new AtomicInteger();
+    private transient FieldNamedPreparedStatement st;
+    private boolean multipleSink;
+    private String label;
+    private String logtag;
+    private String identifier;
+    private Function<RowData, RowData> valueTransform = null;
+    // counters used for table level metric calculation for multiple sink
+    public long[] metric = new long[4];
+
+    public TableMetricStatementExecutor(StatementFactory stmtFactory, JdbcRowConverter converter,
+            DirtySinkHelper<Object> dirtySinkHelper, SinkMetricData sinkMetricData) {
+        this.stmtFactory = checkNotNull(stmtFactory);
+        this.converter = checkNotNull(converter);
+        this.batch = new CopyOnWriteArrayList<>();
+        this.dirtySinkHelper = dirtySinkHelper;
+        this.sinkMetricData = sinkMetricData;
+    }
+
+    public void setDirtyMetaData(String label, String logtag, String identifier) {
+        this.label = label;
+        this.logtag = logtag;
+        this.identifier = identifier;
+    }
+
+    public void setMultipleSink(boolean multipleSink) {
+        this.multipleSink = multipleSink;
+    }
+
+    @Override
+    public void prepareStatements(Connection connection) throws SQLException {
+        st = stmtFactory.createStatement(connection);
+    }
+
+    public void setValueTransform(Function<RowData, RowData> valueTransform) {
+        this.valueTransform = valueTransform;
+    }
+
+    @Override
+    public void addToBatch(RowData record) throws SQLException {
+        if (multipleSink && record.getRowKind().equals(RowKind.UPDATE_BEFORE)) {

Review Comment:
   This is used to fix the metric for UPDATE_BEFORE records.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [inlong] yunqingmoswu commented on a diff in pull request #7580: [INLONG-7249][Sort] JDBC accurate dirty data archive and metric calculation

Posted by "yunqingmoswu (via GitHub)" <gi...@apache.org>.
yunqingmoswu commented on code in PR #7580:
URL: https://github.com/apache/inlong/pull/7580#discussion_r1139862299


##########
inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcBatchingOutputFormat.java:
##########
@@ -365,6 +381,41 @@ public synchronized void flush() throws IOException {
         }
     }
 
+    private JdbcExec enhanceExecutor(JdbcExec exec) throws NoSuchFieldException, IllegalAccessException {
+        if (dirtySink == null) {
+            return null;
+        }
+        final DirtySinkHelper dirtySinkHelper = new DirtySinkHelper<>(dirtyOptions, dirtySink);
+        // enhance the actual executor to tablemetricstatementexecutor
+        Field ExecutorType;

Review Comment:
   My suggestion is to follow the common Java naming conventions; of course, it is also OK not to.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [inlong] Yizhou-Yang commented on a diff in pull request #7580: [INLONG-7249][Sort] JDBC accurate dirty data archive and metric calculation

Posted by "Yizhou-Yang (via GitHub)" <gi...@apache.org>.
Yizhou-Yang commented on code in PR #7580:
URL: https://github.com/apache/inlong/pull/7580#discussion_r1139914092


##########
inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcBatchingOutputFormat.java:
##########
@@ -365,6 +381,41 @@ public synchronized void flush() throws IOException {
         }
     }
 
+    private JdbcExec enhanceExecutor(JdbcExec exec) throws NoSuchFieldException, IllegalAccessException {
+        if (dirtySink == null) {
+            return null;
+        }
+        final DirtySinkHelper dirtySinkHelper = new DirtySinkHelper<>(dirtyOptions, dirtySink);
+        // enhance the actual executor to tablemetricstatementexecutor
+        Field ExecutorType;
+        if (exec instanceof TableBufferReducedStatementExecutor) {
+            ExecutorType = TableBufferReducedStatementExecutor.class.getDeclaredField("upsertExecutor");
+        } else if (exec instanceof TableBufferedStatementExecutor) {
+            ExecutorType = TableBufferedStatementExecutor.class.getDeclaredField("statementExecutor");
+        } else {
+            throw new RuntimeException("table enhance failed, can't enhance " + exec.getClass());
+        }
+        ExecutorType.setAccessible(true);
+        TableSimpleStatementExecutor executor = (TableSimpleStatementExecutor) ExecutorType.get(exec);
+        Field statementFactory = TableSimpleStatementExecutor.class.getDeclaredField("stmtFactory");
+        Field rowConverter = TableSimpleStatementExecutor.class.getDeclaredField("converter");
+        statementFactory.setAccessible(true);
+        rowConverter.setAccessible(true);
+        final StatementFactory stmtFactory = (StatementFactory) statementFactory.get(executor);
+        final JdbcRowConverter converter = (JdbcRowConverter) rowConverter.get(executor);
+        TableMetricStatementExecutor newExecutor =
+                new TableMetricStatementExecutor(stmtFactory, converter, dirtySinkHelper, sinkMetricData);
+        if (exec instanceof TableBufferedStatementExecutor) {
+            Field transform = TableBufferedStatementExecutor.class.getDeclaredField("valueTransform");
+            transform.setAccessible(true);
+            Function<RowData, RowData> valueTransform = (Function<RowData, RowData>) transform.get(exec);
+            newExecutor.setValueTransform(valueTransform);
+            return (JdbcExec) newExecutor;
+        }
+        ExecutorType.set(exec, newExecutor);

Review Comment:
   Please refresh to see the latest commit, which adds both a Javadoc and some inline comments; I am adding more now.



##########
inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcMultiBatchingOutputFormat.java:
##########
@@ -302,6 +323,41 @@ private JdbcExec getOrCreateStatementExecutor(
         return jdbcExec;
     }
 
+    private JdbcExec enhanceExecutor(JdbcExec exec) throws NoSuchFieldException, IllegalAccessException {
+        if (dirtySinkHelper.getDirtySink() == null) {
+            return null;
+        }
+        // enhance the actual executor to tablemetricstatementexecutor
+        Field subExecutor;
+        if (exec instanceof TableBufferReducedStatementExecutor) {
+            subExecutor = TableBufferReducedStatementExecutor.class.getDeclaredField("upsertExecutor");
+        } else if (exec instanceof TableBufferedStatementExecutor) {
+            subExecutor = TableBufferedStatementExecutor.class.getDeclaredField("statementExecutor");
+        } else {
+            throw new RuntimeException("table enhance failed, can't enhance " + exec.getClass());
+        }
+        subExecutor.setAccessible(true);
+        TableSimpleStatementExecutor executor = (TableSimpleStatementExecutor) subExecutor.get(exec);
+        Field statementFactory = TableSimpleStatementExecutor.class.getDeclaredField("stmtFactory");
+        Field rowConverter = TableSimpleStatementExecutor.class.getDeclaredField("converter");
+        statementFactory.setAccessible(true);
+        rowConverter.setAccessible(true);
+        final StatementFactory stmtFactory = (StatementFactory) statementFactory.get(executor);
+        final JdbcRowConverter converter = (JdbcRowConverter) rowConverter.get(executor);
+        TableMetricStatementExecutor newExecutor =
+                new TableMetricStatementExecutor(stmtFactory, converter, dirtySinkHelper, sinkMetricData);
+        newExecutor.setMultipleSink(true);
+        if (exec instanceof TableBufferedStatementExecutor) {
+            subExecutor = TableBufferedStatementExecutor.class.getDeclaredField("valueTransform");
+            subExecutor.setAccessible(true);
+            Function<RowData, RowData> valueTransform = (Function<RowData, RowData>) subExecutor.get(exec);
+            newExecutor.setValueTransform(valueTransform);
+            return (JdbcExec) newExecutor;
+        }
+        subExecutor.set(exec, newExecutor);

Review Comment:
   Please refresh to see the latest commit, which adds both a Javadoc and some inline comments; I am adding more now.





[GitHub] [inlong] gong commented on a diff in pull request #7580: [INLONG-7249][Sort] JDBC accurate dirty data archive and metric calculation

Posted by "gong (via GitHub)" <gi...@apache.org>.
gong commented on code in PR #7580:
URL: https://github.com/apache/inlong/pull/7580#discussion_r1147056883


##########
inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/TableMetricStatementExecutor.java:
##########
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.jdbc.internal;
+
+import org.apache.flink.connector.jdbc.internal.converter.JdbcRowConverter;
+import org.apache.flink.connector.jdbc.internal.executor.JdbcBatchStatementExecutor;
+import org.apache.flink.connector.jdbc.statement.FieldNamedPreparedStatement;
+import org.apache.flink.connector.jdbc.statement.StatementFactory;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.types.RowKind;
+import org.apache.inlong.sort.base.dirty.DirtySinkHelper;
+import org.apache.inlong.sort.base.dirty.DirtyType;
+import org.apache.inlong.sort.base.metric.SinkMetricData;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.charset.StandardCharsets;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Function;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A {@link JdbcBatchStatementExecutor} that simply adds the records into batches of {@link
+ * java.sql.PreparedStatement} and doesn't buffer records in memory. Only used in Table/SQL API.
+ * Supported executors:TableBufferedStatementExecutor, TableBufferReducedStatementExecutor, TableSimpleStatementExecutor
+ */
+public final class TableMetricStatementExecutor implements JdbcBatchStatementExecutor<RowData> {
+
+    private static final Pattern pattern = Pattern.compile("Batch entry (\\d+)");
+    private static final Logger LOG = LoggerFactory.getLogger(TableMetricStatementExecutor.class);
+    private final StatementFactory stmtFactory;
+    private final JdbcRowConverter converter;
+    private List<RowData> batch;
+    private final DirtySinkHelper<Object> dirtySinkHelper;
+    private final SinkMetricData sinkMetricData;
+    private final AtomicInteger counter = new AtomicInteger();
+    private transient FieldNamedPreparedStatement st;
+    private boolean multipleSink;
+    private String label;
+    private String logtag;
+    private String identifier;
+    private Function<RowData, RowData> valueTransform = null;
+    // counters used for table level metric calculation for multiple sink
+    public long[] metric = new long[4];
+
+    public TableMetricStatementExecutor(StatementFactory stmtFactory, JdbcRowConverter converter,
+            DirtySinkHelper<Object> dirtySinkHelper, SinkMetricData sinkMetricData) {
+        this.stmtFactory = checkNotNull(stmtFactory);
+        this.converter = checkNotNull(converter);
+        this.batch = new CopyOnWriteArrayList<>();
+        this.dirtySinkHelper = dirtySinkHelper;
+        this.sinkMetricData = sinkMetricData;
+    }
+
+    public void setDirtyMetaData(String label, String logtag, String identifier) {
+        this.label = label;
+        this.logtag = logtag;
+        this.identifier = identifier;
+    }
+
+    public void setMultipleSink(boolean multipleSink) {
+        this.multipleSink = multipleSink;
+    }
+
+    @Override
+    public void prepareStatements(Connection connection) throws SQLException {
+        st = stmtFactory.createStatement(connection);
+    }
+
+    public void setValueTransform(Function<RowData, RowData> valueTransform) {
+        this.valueTransform = valueTransform;
+    }
+
+    @Override
+    public void addToBatch(RowData record) throws SQLException {
+        if (multipleSink && record.getRowKind().equals(RowKind.UPDATE_BEFORE)) {

Review Comment:
   The RowKind is always INSERT when multipleSink is enabled.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [inlong] gong commented on a diff in pull request #7580: [INLONG-7249][Sort] JDBC accurate dirty data archive and metric calculation

Posted by "gong (via GitHub)" <gi...@apache.org>.
gong commented on code in PR #7580:
URL: https://github.com/apache/inlong/pull/7580#discussion_r1136576749


##########
inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcBatchingOutputFormat.java:
##########
@@ -277,7 +294,7 @@ public final synchronized void writeRecord(In record) {
             if (executionOptions.getBatchSize() > 0
                     && batchCount >= executionOptions.getBatchSize()) {
                 flush();
-                if (sinkMetricData != null) {
+                if (sinkMetricData != null && dirtySink == null) {
                     sinkMetricData.invoke(rowSize, dataSize);

Review Comment:
   > yes, TableMetricStatementExecutor will calculate metrics if it is initialized successfully
   
   @Yizhou-Yang Does this still work correctly for delete and update records?





[GitHub] [inlong] gong commented on a diff in pull request #7580: [INLONG-7249][Sort] JDBC accurate dirty data archive and metric calculation

Posted by "gong (via GitHub)" <gi...@apache.org>.
gong commented on code in PR #7580:
URL: https://github.com/apache/inlong/pull/7580#discussion_r1133607722


##########
inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/TableMetricStatementExecutor.java:
##########
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.jdbc.internal;
+
+import org.apache.flink.connector.jdbc.internal.converter.JdbcRowConverter;
+import org.apache.flink.connector.jdbc.internal.executor.JdbcBatchStatementExecutor;
+import org.apache.flink.connector.jdbc.statement.FieldNamedPreparedStatement;
+import org.apache.flink.connector.jdbc.statement.StatementFactory;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.flink.table.data.RowData;
+import org.apache.inlong.sort.base.dirty.DirtySinkHelper;
+import org.apache.inlong.sort.base.dirty.DirtyType;
+import org.apache.inlong.sort.base.metric.SinkMetricData;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.charset.StandardCharsets;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Function;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A {@link JdbcBatchStatementExecutor} that simply adds the records into batches of {@link
+ * java.sql.PreparedStatement} and doesn't buffer records in memory. Only used in Table/SQL API.
+ */
+public final class TableMetricStatementExecutor implements JdbcBatchStatementExecutor<RowData> {
+
+    private static final Pattern pattern = Pattern.compile("Batch entry (\\d+)");
+    private static final Logger LOG = LoggerFactory.getLogger(TableMetricStatementExecutor.class);
+    private final StatementFactory stmtFactory;
+    private final JdbcRowConverter converter;
+    private List<RowData> batch;
+    private final DirtySinkHelper<Object> dirtySinkHelper;
+    private final SinkMetricData sinkMetricData;
+    private final AtomicInteger counter = new AtomicInteger();
+    private transient FieldNamedPreparedStatement st;
+    private boolean multipleSink;
+    private String label;
+    private String logtag;
+    private String identifier;
+    private Function<RowData, RowData> valueTransform = null;
+    // counters used for table level metric calculation for multiple sink
+    public long[] metric = new long[4];
+
+    public TableMetricStatementExecutor(StatementFactory stmtFactory, JdbcRowConverter converter,
+            DirtySinkHelper<Object> dirtySinkHelper, SinkMetricData sinkMetricData) {
+        this.stmtFactory = checkNotNull(stmtFactory);
+        this.converter = checkNotNull(converter);
+        this.batch = new CopyOnWriteArrayList<>();
+        this.dirtySinkHelper = dirtySinkHelper;
+        this.sinkMetricData = sinkMetricData;
+    }
+
+    public void setDirtyMetaData(String label, String logtag, String identifier) {
+        this.label = label;
+        this.logtag = logtag;
+        this.identifier = identifier;
+    }
+
+    public void setMultipleSink(boolean multipleSink) {
+        this.multipleSink = multipleSink;
+    }
+
+    @Override
+    public void prepareStatements(Connection connection) throws SQLException {
+        st = stmtFactory.createStatement(connection);
+    }
+
+    public void setValueTransform(Function<RowData, RowData> valueTransform) {
+        this.valueTransform = valueTransform;
+    }
+
+    @Override
+    public void addToBatch(RowData record) throws SQLException {
+        if (valueTransform != null) {
+            record = valueTransform.apply(record); // copy or not
+        }
+        batch.add(record);
+        converter.toExternal(record, st);
+        st.addBatch();
+    }
+
+    @Override
+    public void executeBatch() throws SQLException {
+        try {
+            st.executeBatch();
+
+            long writtenSize = batch.size();
+            // approximate since it may be inefficient to iterate over all writtenSize-1 elements.
+            long writtenBytes = 0L;
+            if (writtenSize > 0) {
+                writtenBytes = (long) batch.get(0).toString().getBytes(StandardCharsets.UTF_8).length * writtenSize;
+            }
+            batch.clear();
+            if (!multipleSink) {
+                sinkMetricData.invoke(writtenSize, writtenBytes);
+            } else {
+                metric[0] += writtenSize;
+                metric[1] += writtenBytes;
+            }
+
+        } catch (SQLException e) {
+            // clear the prepared statement first to avoid exceptions
+            st.clearParameters();
+            try {
+                processErrorPosition(e);
+            } catch (Exception ex) {
+                try {
+                    retryEntireBatch();
+                } catch (JsonProcessingException exc) {
+                    LOG.error("dirty data archive failed");
+                }
+            }
+        }
+    }
+
+    private void processErrorPosition(SQLException e) throws SQLException {
+        List<Integer> errorPositions = parseError(e);
+        // the data before the first sqlexception are already written, handle those and remove them.
+        int writtenSize = errorPositions.get(0);
+        long writtenBytes = 0L;
+        if (writtenSize > 0) {
+            writtenBytes = (long) batch.get(0).toString().getBytes(StandardCharsets.UTF_8).length * writtenSize;
+        }
+        if (!multipleSink) {
+            sinkMetricData.invoke(writtenSize, writtenBytes);
+        } else {
+            metric[0] += writtenSize;
+            metric[1] += writtenBytes;
+        }
+
+        batch = batch.subList(writtenSize, batch.size());
+
+        // for the unwritten data, remove the dirty ones
+        for (int pos : errorPositions) {
+            pos -= writtenSize;
+            RowData record = batch.get(pos);
+            batch.remove(record);
+            invokeDirty(record, e);
+        }
+
+        // try to execute the supposedly clean batch, throw exception on failure
+        for (RowData record : batch) {
+            addToBatch(record);
+        }
+        st.executeBatch();
+        batch.clear();
+        st.clearParameters();
+    }
+
+    private void retryEntireBatch() throws SQLException, JsonProcessingException {
+        // clear parameters to make sure the batch is always clean in the end.
+        st.clearParameters();
+        for (RowData rowData : batch) {
+            try {
+                converter.toExternal(rowData, st);
+                st.addBatch();
+                st.executeBatch();
+                if (!multipleSink) {
+                    sinkMetricData.invoke(1, rowData.toString().getBytes().length);
+                } else {
+                    metric[0] += 1;
+                    metric[1] += rowData.toString().getBytes().length;
+                }
+            } catch (Exception e) {
+                st.clearParameters();
+                invokeDirty(rowData, e);
+            }
+        }
+        batch.clear();
+        st.clearParameters();
+    }
+
+    private void invokeDirty(RowData rowData, Exception e) {
+        if (!multipleSink) {
+            dirtySinkHelper.invoke(rowData.toString(), DirtyType.BATCH_LOAD_ERROR, e);
+            sinkMetricData.invokeDirty(1, rowData.toString().getBytes().length);
+        } else {
+            dirtySinkHelper.invoke(rowData.toString(), DirtyType.BATCH_LOAD_ERROR, label, logtag, identifier, e);

Review Comment:
   Maybe `dirtySinkHelper` needs a null check here.
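
A minimal sketch of the kind of guard this comment asks for, assuming the helper can be absent when no dirty sink is configured. `DirtyReporter` and `invokeDirty` here are hypothetical stand-ins for illustration, not the InLong `DirtySinkHelper` API.

```java
import java.util.ArrayList;
import java.util.List;

public class NullSafeDirtyReport {

    /** Hypothetical stand-in for the dirty-data helper; not the InLong API. */
    interface DirtyReporter {
        void report(String row, Exception cause);
    }

    /**
     * Archives a dirty row only when a reporter is configured.
     * Returns true if the row was archived, false if it was only logged.
     */
    static boolean invokeDirty(DirtyReporter reporter, String row, Exception cause) {
        if (reporter == null) {
            // No dirty sink configured: log and skip instead of throwing a NullPointerException.
            System.err.println("dirty sink not configured, skipping archive for row: " + row);
            return false;
        }
        reporter.report(row, cause);
        return true;
    }

    public static void main(String[] args) {
        List<String> archived = new ArrayList<>();
        DirtyReporter reporter = (row, cause) -> archived.add(row);

        System.out.println(invokeDirty(reporter, "row-1", new RuntimeException("bad value")));
        System.out.println(invokeDirty(null, "row-2", new RuntimeException("bad value")));
        System.out.println(archived);
    }
}
```

Whether a missing helper should be logged, counted, or rethrown is a design choice for the PR author; the sketch only shows the guard itself.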





[GitHub] [inlong] gong commented on a diff in pull request #7580: [INLONG-7249][Sort] JDBC accurate dirty data archive and metric calculation

Posted by "gong (via GitHub)" <gi...@apache.org>.
gong commented on code in PR #7580:
URL: https://github.com/apache/inlong/pull/7580#discussion_r1134833759


##########
inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcBatchingOutputFormat.java:
##########
@@ -277,7 +294,7 @@ public final synchronized void writeRecord(In record) {
             if (executionOptions.getBatchSize() > 0
                     && batchCount >= executionOptions.getBatchSize()) {
                 flush();
-                if (sinkMetricData != null) {
+                if (sinkMetricData != null && dirtySink == null) {
                     sinkMetricData.invoke(rowSize, dataSize);

Review Comment:
   @Yizhou-Yang This logic runs only when I don't use dirtySink. Does some other logic report the metrics when I do use dirtySink?





[GitHub] [inlong] Yizhou-Yang commented on a diff in pull request #7580: [INLONG-7249][Sort] JDBC accurate dirty data archive and metric calculation

Posted by "Yizhou-Yang (via GitHub)" <gi...@apache.org>.
Yizhou-Yang commented on code in PR #7580:
URL: https://github.com/apache/inlong/pull/7580#discussion_r1139912829


##########
inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcBatchingOutputFormat.java:
##########
@@ -365,6 +381,41 @@ public synchronized void flush() throws IOException {
         }
     }
 
+    private JdbcExec enhanceExecutor(JdbcExec exec) throws NoSuchFieldException, IllegalAccessException {
+        if (dirtySink == null) {
+            return null;
+        }
+        final DirtySinkHelper dirtySinkHelper = new DirtySinkHelper<>(dirtyOptions, dirtySink);
+        // enhance the actual executor to tablemetricstatementexecutor
+        Field ExecutorType;

Review Comment:
   Refresh the code to see the latest commit.





[GitHub] [inlong] yunqingmoswu commented on a diff in pull request #7580: [INLONG-7249][Sort] JDBC accurate dirty data archive and metric calculation

Posted by "yunqingmoswu (via GitHub)" <gi...@apache.org>.
yunqingmoswu commented on code in PR #7580:
URL: https://github.com/apache/inlong/pull/7580#discussion_r1139860862


##########
inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcBatchingOutputFormat.java:
##########
@@ -365,6 +381,41 @@ public synchronized void flush() throws IOException {
         }
     }
 
+    private JdbcExec enhanceExecutor(JdbcExec exec) throws NoSuchFieldException, IllegalAccessException {
+        if (dirtySink == null) {
+            return null;
+        }
+        final DirtySinkHelper dirtySinkHelper = new DirtySinkHelper<>(dirtyOptions, dirtySink);
+        // enhance the actual executor to tablemetricstatementexecutor
+        Field ExecutorType;
+        if (exec instanceof TableBufferReducedStatementExecutor) {
+            ExecutorType = TableBufferReducedStatementExecutor.class.getDeclaredField("upsertExecutor");
+        } else if (exec instanceof TableBufferedStatementExecutor) {
+            ExecutorType = TableBufferedStatementExecutor.class.getDeclaredField("statementExecutor");
+        } else {
+            throw new RuntimeException("table enhance failed, can't enhance " + exec.getClass());
+        }
+        ExecutorType.setAccessible(true);
+        TableSimpleStatementExecutor executor = (TableSimpleStatementExecutor) ExecutorType.get(exec);
+        Field statementFactory = TableSimpleStatementExecutor.class.getDeclaredField("stmtFactory");
+        Field rowConverter = TableSimpleStatementExecutor.class.getDeclaredField("converter");
+        statementFactory.setAccessible(true);
+        rowConverter.setAccessible(true);
+        final StatementFactory stmtFactory = (StatementFactory) statementFactory.get(executor);
+        final JdbcRowConverter converter = (JdbcRowConverter) rowConverter.get(executor);
+        TableMetricStatementExecutor newExecutor =
+                new TableMetricStatementExecutor(stmtFactory, converter, dirtySinkHelper, sinkMetricData);
+        if (exec instanceof TableBufferedStatementExecutor) {
+            Field transform = TableBufferedStatementExecutor.class.getDeclaredField("valueTransform");
+            transform.setAccessible(true);
+            Function<RowData, RowData> valueTransform = (Function<RowData, RowData>) transform.get(exec);
+            newExecutor.setValueTransform(valueTransform);
+            return (JdbcExec) newExecutor;
+        }
+        ExecutorType.set(exec, newExecutor);

Review Comment:
   Where? Please add some description for the code.



##########
inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcMultiBatchingOutputFormat.java:
##########
@@ -302,6 +323,41 @@ private JdbcExec getOrCreateStatementExecutor(
         return jdbcExec;
     }
 
+    private JdbcExec enhanceExecutor(JdbcExec exec) throws NoSuchFieldException, IllegalAccessException {
+        if (dirtySinkHelper.getDirtySink() == null) {
+            return null;
+        }
+        // enhance the actual executor to tablemetricstatementexecutor
+        Field subExecutor;
+        if (exec instanceof TableBufferReducedStatementExecutor) {
+            subExecutor = TableBufferReducedStatementExecutor.class.getDeclaredField("upsertExecutor");
+        } else if (exec instanceof TableBufferedStatementExecutor) {
+            subExecutor = TableBufferedStatementExecutor.class.getDeclaredField("statementExecutor");
+        } else {
+            throw new RuntimeException("table enhance failed, can't enhance " + exec.getClass());
+        }
+        subExecutor.setAccessible(true);
+        TableSimpleStatementExecutor executor = (TableSimpleStatementExecutor) subExecutor.get(exec);
+        Field statementFactory = TableSimpleStatementExecutor.class.getDeclaredField("stmtFactory");
+        Field rowConverter = TableSimpleStatementExecutor.class.getDeclaredField("converter");
+        statementFactory.setAccessible(true);
+        rowConverter.setAccessible(true);
+        final StatementFactory stmtFactory = (StatementFactory) statementFactory.get(executor);
+        final JdbcRowConverter converter = (JdbcRowConverter) rowConverter.get(executor);
+        TableMetricStatementExecutor newExecutor =
+                new TableMetricStatementExecutor(stmtFactory, converter, dirtySinkHelper, sinkMetricData);
+        newExecutor.setMultipleSink(true);
+        if (exec instanceof TableBufferedStatementExecutor) {
+            subExecutor = TableBufferedStatementExecutor.class.getDeclaredField("valueTransform");
+            subExecutor.setAccessible(true);
+            Function<RowData, RowData> valueTransform = (Function<RowData, RowData>) subExecutor.get(exec);
+            newExecutor.setValueTransform(valueTransform);
+            return (JdbcExec) newExecutor;
+        }
+        subExecutor.set(exec, newExecutor);

Review Comment:
   Please add some description for the code
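
As a rough illustration of what such a description could cover, the sketch below walks through the reflection steps on simplified classes: locate the private field that holds the inner executor, read the collaborators the replacement needs, then write the replacement back. All class names here (`BufferedExecutor`, `SimpleExecutor`, `MetricExecutor`) are hypothetical stand-ins for the Flink and InLong executors, and only the write-back branch is shown, not the branch that returns the new executor directly.

```java
import java.lang.reflect.Field;

public class ReflectionSwapSketch {

    interface Executor {
        String describe();
    }

    /** Hypothetical inner executor, standing in for TableSimpleStatementExecutor. */
    static class SimpleExecutor implements Executor {
        private final String stmtFactory;

        SimpleExecutor(String stmtFactory) {
            this.stmtFactory = stmtFactory;
        }

        public String describe() {
            return "simple:" + stmtFactory;
        }
    }

    /** Hypothetical replacement, standing in for TableMetricStatementExecutor. */
    static class MetricExecutor implements Executor {
        private final String stmtFactory;

        MetricExecutor(String stmtFactory) {
            this.stmtFactory = stmtFactory;
        }

        public String describe() {
            return "metric:" + stmtFactory;
        }
    }

    /** Hypothetical outer executor that keeps its inner executor in a private field. */
    static class BufferedExecutor implements Executor {
        private Executor upsertExecutor;

        BufferedExecutor(Executor upsertExecutor) {
            this.upsertExecutor = upsertExecutor;
        }

        public String describe() {
            return "buffered[" + upsertExecutor.describe() + "]";
        }
    }

    static void enhance(BufferedExecutor outer) {
        try {
            // Step 1: locate the private field that holds the inner executor.
            Field inner = BufferedExecutor.class.getDeclaredField("upsertExecutor");
            inner.setAccessible(true);
            SimpleExecutor old = (SimpleExecutor) inner.get(outer);

            // Step 2: read the collaborator that the replacement must reuse.
            Field factory = SimpleExecutor.class.getDeclaredField("stmtFactory");
            factory.setAccessible(true);
            String stmtFactory = (String) factory.get(old);

            // Step 3: build the replacement and write it back into the outer executor.
            inner.set(outer, new MetricExecutor(stmtFactory));
        } catch (ReflectiveOperationException e) {
            // A renamed or missing field surfaces here; the caller can fall back to the original executor.
            throw new IllegalStateException("enhance failed for " + outer.getClass(), e);
        }
    }

    public static void main(String[] args) {
        BufferedExecutor outer = new BufferedExecutor(new SimpleExecutor("INSERT INTO t"));
        System.out.println(outer.describe());
        enhance(outer);
        System.out.println(outer.describe());
    }
}
```

Because the field names are looked up as strings, a change in the wrapped classes breaks this at runtime rather than at compile time, which is the main thing an inline description should warn about.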


