Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/10/12 13:55:36 UTC

[GitHub] [pulsar] nicoloboschi opened a new pull request, #18017: [improve][io] JDBC sinks: implement JDBC Batch API

nicoloboschi opened a new pull request, #18017:
URL: https://github.com/apache/pulsar/pull/18017

   ### Motivation
   In high-load systems, JDBC drivers are usually meant to be used with the JDBC Batch API. Currently, the JDBC sink does not implement it.
   
   ### Modifications
   
   * New flag `useJdbcBatch` to turn on this feature (defaults to `false`).
   * If `useJdbcBatch` is on, the sink uses the `addBatch()` and `executeBatch()` methods.
   * If the same message batch (defined by `batchSize` or `timeoutMs`) contains different kinds of operations, it is split into several JDBC batch operations that still run in the same transaction. With transactions enabled, the behaviour is therefore semantically the same. With transactions disabled (autocommit=true), the behaviour can differ slightly, because more statements may be executed than without this option. For example, if a batch contains two messages and the first statement fails: A) without JDBC batch, the first fails and the second fails too, because it is never executed; B) with JDBC batch, the first fails but the second may still go in. This is due to the nature of JDBC batches. Turning on transactions is recommended if the driver supports them.
   * Added more specific tests to verify behaviour in error scenarios and to ensure that commits and message acks are executed correctly for each message.
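   A minimal sketch of the splitting rule described above, assuming one prepared statement per operation type: consecutive mutations of the same type share one `executeBatch()` call, and a change of type closes the current batch. `BatchSplitter` and its `MutationType` enum are hypothetical names, not the sink's API.

   ```java
   import java.util.ArrayList;
   import java.util.List;

   // Hypothetical helper: groups consecutive mutations that would use the same
   // PreparedStatement, so each group maps to one addBatch()...executeBatch() cycle.
   class BatchSplitter {
       enum MutationType { INSERT, UPDATE, UPSERT, DELETE }

       static List<List<MutationType>> split(List<MutationType> mutations) {
           List<List<MutationType>> batches = new ArrayList<>();
           List<MutationType> current = null;
           for (MutationType m : mutations) {
               if (current == null || current.get(0) != m) {
                   // Statement type changed: the previous batch would be executed here
                   current = new ArrayList<>();
                   batches.add(current);
               }
               current.add(m);
           }
           return batches;
       }

       public static void main(String[] args) {
           List<MutationType> ops = List.of(MutationType.INSERT, MutationType.INSERT,
                   MutationType.DELETE, MutationType.INSERT);
           // Three executeBatch() calls, all in the same transaction when useTransactions is on
           System.out.println(split(ops)); // [[INSERT, INSERT], [DELETE], [INSERT]]
       }
   }
   ```

   Only *consecutive* operations of the same type are grouped, so the trailing `INSERT` gets its own batch and the original message order is preserved.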
   
   ### Documentation
   
   <!-- DO NOT REMOVE THIS SECTION. CHECK THE PROPER BOX ONLY. -->
   
   - [x] `doc` <!-- Your PR contains doc changes. Please attach the local preview screenshots (run `sh start.sh` at `pulsar/site2/website`) to your PR description, or else your PR might not get merged. -->
   - [ ] `doc-required` <!-- Your PR changes impact docs and you will update later -->
   - [ ] `doc-not-needed` <!-- Your PR changes do not impact docs -->
   - [ ] `doc-complete` <!-- Docs have been already added -->
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] eolivelli commented on a diff in pull request #18017: [improve][io] JDBC sinks: implement JDBC Batch API

Posted by GitBox <gi...@apache.org>.
eolivelli commented on code in PR #18017:
URL: https://github.com/apache/pulsar/pull/18017#discussion_r996705078


##########
pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcSinkConfig.java:
##########
@@ -76,15 +76,24 @@ public class JdbcSinkConfig implements Serializable {
     @FieldDoc(
         required = false,
         defaultValue = "500",
-        help = "The jdbc operation timeout in milliseconds"
+        help = "Enable batch mode by time. After timeoutMs milliseconds the operations queue will be flushed."

Review Comment:
   What about introducing a new `batchTimeoutMs` parameter?
   
   The `timeoutMs` parameter already existed.



[GitHub] [pulsar] nicoloboschi commented on a diff in pull request #18017: [improve][io] JDBC sinks: implement JDBC Batch API

Posted by GitBox <gi...@apache.org>.
nicoloboschi commented on code in PR #18017:
URL: https://github.com/apache/pulsar/pull/18017#discussion_r998005273


##########
pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java:
##########
@@ -280,21 +318,56 @@ private void flush() {
                 }
             }
 
-            if (swapList.size() != count) {
-                log.error("Update count {} not match total number of records {}", count, swapList.size());
-            }
-
-            // finish flush
-            if (log.isDebugEnabled()) {
-                log.debug("Finish flush, queue size: {}", swapList.size());
-            }
-            swapList.clear();
             isFlushing.set(false);
+            if (needAnotherRound) {
+                flush();
+            }

Review Comment:
   It makes sense; I added a test.
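   The drain logic in this hunk can be sketched as a loop instead of a recursive `flush()` call, assuming records are plain `Integer`s and "executing" a round just collects it. `ChunkedDrain` is a hypothetical name; the size arithmetic mirrors the diff.

   ```java
   import java.util.ArrayList;
   import java.util.Deque;
   import java.util.LinkedList;
   import java.util.List;

   // Hypothetical sketch of the chunked drain: take at most batchSize records per round,
   // and run another round while at least one full batch is still queued.
   class ChunkedDrain {
       static List<List<Integer>> drain(Deque<Integer> incoming, int batchSize) {
           List<List<Integer>> rounds = new ArrayList<>();
           boolean needAnotherRound;
           do {
               List<Integer> swapList = new ArrayList<>();
               synchronized (incoming) {
                   int actualBatchSize = batchSize > 0
                           ? Math.min(incoming.size(), batchSize) : incoming.size();
                   for (int i = 0; i < actualBatchSize; i++) {
                       swapList.add(incoming.removeFirst());
                   }
                   // Same effect as the diff's condition: size >= batchSize > 0 implies non-empty
                   needAnotherRound = batchSize > 0 && incoming.size() >= batchSize;
               }
               if (!swapList.isEmpty()) {
                   rounds.add(swapList); // stands in for executing the statements
               }
           } while (needAnotherRound);
           return rounds;
       }

       public static void main(String[] args) {
           Deque<Integer> queue = new LinkedList<>(List.of(1, 2, 3, 4, 5, 6, 7));
           System.out.println(drain(queue, 3)); // [[1, 2, 3], [4, 5, 6]]
           System.out.println(queue);           // [7]
       }
   }
   ```

   The partial batch (here `7`) is not taken by the extra round; it stays queued until the next flush trigger.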



[GitHub] [pulsar] codecov-commenter commented on pull request #18017: [improve][io] JDBC sinks: implement JDBC Batch API

Posted by GitBox <gi...@apache.org>.
codecov-commenter commented on PR #18017:
URL: https://github.com/apache/pulsar/pull/18017#issuecomment-1282526416

   # [Codecov](https://codecov.io/gh/apache/pulsar/pull/18017?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#18017](https://codecov.io/gh/apache/pulsar/pull/18017?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (9a9e72b) into [master](https://codecov.io/gh/apache/pulsar/commit/6c65ca0d8a80bfaaa4d5869e0cea485f5c94369b?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (6c65ca0) will **increase** coverage by `10.63%`.
   > The diff coverage is `76.19%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/pulsar/pull/18017/graphs/tree.svg?width=650&height=150&src=pr&token=acYqCpsK9J&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)](https://codecov.io/gh/apache/pulsar/pull/18017?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   
   ```diff
   @@              Coverage Diff              @@
   ##             master   #18017       +/-   ##
   =============================================
   + Coverage     34.91%   45.54%   +10.63%     
   - Complexity     5707     6150      +443     
   =============================================
     Files           607      367      -240     
     Lines         53396    41174    -12222     
     Branches       5712     4219     -1493     
   =============================================
   + Hits          18644    18754      +110     
   + Misses        32119    20243    -11876     
   + Partials       2633     2177      -456     
   ```
   
   | Flag | Coverage Δ | |
   |---|---|---|
   | unittests | `45.54% <76.19%> (+10.63%)` | :arrow_up: |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   | [Impacted Files](https://codecov.io/gh/apache/pulsar/pull/18017?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [.../service/SystemTopicBasedTopicPoliciesService.java](https://codecov.io/gh/apache/pulsar/pull/18017/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cHVsc2FyLWJyb2tlci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcHVsc2FyL2Jyb2tlci9zZXJ2aWNlL1N5c3RlbVRvcGljQmFzZWRUb3BpY1BvbGljaWVzU2VydmljZS5qYXZh) | `59.49% <0.00%> (+7.90%)` | :arrow_up: |
   | [.../pulsar/broker/stats/BrokerOperabilityMetrics.java](https://codecov.io/gh/apache/pulsar/pull/18017/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cHVsc2FyLWJyb2tlci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcHVsc2FyL2Jyb2tlci9zdGF0cy9Ccm9rZXJPcGVyYWJpbGl0eU1ldHJpY3MuamF2YQ==) | `94.64% <ø> (+1.99%)` | :arrow_up: |
   | [...g/apache/pulsar/compaction/CompactedTopicImpl.java](https://codecov.io/gh/apache/pulsar/pull/18017/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cHVsc2FyLWJyb2tlci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcHVsc2FyL2NvbXBhY3Rpb24vQ29tcGFjdGVkVG9waWNJbXBsLmphdmE=) | `69.28% <0.00%> (+58.57%)` | :arrow_up: |
   | [.../org/apache/pulsar/broker/admin/v2/Namespaces.java](https://codecov.io/gh/apache/pulsar/pull/18017/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cHVsc2FyLWJyb2tlci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcHVsc2FyL2Jyb2tlci9hZG1pbi92Mi9OYW1lc3BhY2VzLmphdmE=) | `54.41% <50.00%> (+46.38%)` | :arrow_up: |
   | [...broker/delayed/InMemoryDelayedDeliveryTracker.java](https://codecov.io/gh/apache/pulsar/pull/18017/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cHVsc2FyLWJyb2tlci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcHVsc2FyL2Jyb2tlci9kZWxheWVkL0luTWVtb3J5RGVsYXllZERlbGl2ZXJ5VHJhY2tlci5qYXZh) | `65.00% <75.00%> (+65.00%)` | :arrow_up: |
   | [...n/java/org/apache/pulsar/broker/PulsarService.java](https://codecov.io/gh/apache/pulsar/pull/18017/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cHVsc2FyLWJyb2tlci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcHVsc2FyL2Jyb2tlci9QdWxzYXJTZXJ2aWNlLmphdmE=) | `56.38% <100.00%> (+2.06%)` | :arrow_up: |
   | [...pulsar/broker/admin/impl/PersistentTopicsBase.java](https://codecov.io/gh/apache/pulsar/pull/18017/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cHVsc2FyLWJyb2tlci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcHVsc2FyL2Jyb2tlci9hZG1pbi9pbXBsL1BlcnNpc3RlbnRUb3BpY3NCYXNlLmphdmE=) | `50.60% <100.00%> (+39.20%)` | :arrow_up: |
   | [...rg/apache/pulsar/broker/service/BrokerService.java](https://codecov.io/gh/apache/pulsar/pull/18017/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cHVsc2FyLWJyb2tlci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcHVsc2FyL2Jyb2tlci9zZXJ2aWNlL0Jyb2tlclNlcnZpY2UuamF2YQ==) | `55.74% <100.00%> (+7.74%)` | :arrow_up: |
   | [...ion/pendingack/impl/MLPendingAckStoreProvider.java](https://codecov.io/gh/apache/pulsar/pull/18017/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cHVsc2FyLWJyb2tlci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcHVsc2FyL2Jyb2tlci90cmFuc2FjdGlvbi9wZW5kaW5nYWNrL2ltcGwvTUxQZW5kaW5nQWNrU3RvcmVQcm92aWRlci5qYXZh) | `70.00% <100.00%> (+2.53%)` | :arrow_up: |
   | [.../pulsar/broker/service/SharedConsumerAssignor.java](https://codecov.io/gh/apache/pulsar/pull/18017/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cHVsc2FyLWJyb2tlci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcHVsc2FyL2Jyb2tlci9zZXJ2aWNlL1NoYXJlZENvbnN1bWVyQXNzaWdub3IuamF2YQ==) | `3.70% <0.00%> (-64.82%)` | :arrow_down: |
   | ... and [398 more](https://codecov.io/gh/apache/pulsar/pull/18017/diff?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | |
   


[GitHub] [pulsar] nicoloboschi commented on a diff in pull request #18017: [improve][io] JDBC sinks: implement JDBC Batch API

Posted by GitBox <gi...@apache.org>.
nicoloboschi commented on code in PR #18017:
URL: https://github.com/apache/pulsar/pull/18017#discussion_r996707774


##########
pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcSinkConfig.java:
##########
@@ -76,15 +76,24 @@ public class JdbcSinkConfig implements Serializable {
     @FieldDoc(
         required = false,
         defaultValue = "500",
-        help = "The jdbc operation timeout in milliseconds"
+        help = "Enable batch mode by time. After timeoutMs milliseconds the operations queue will be flushed."

Review Comment:
   `timeoutMs` has the same meaning as before. Its name isn't very descriptive, which is why I updated the help text.
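   Read together with the `batchSize` help text, `timeoutMs` describes a time-based flush trigger next to a size-based one. A minimal sketch of both triggers, assuming a `ScheduledExecutorService` drives the timer; `FlushTriggers` and its methods are hypothetical, not the sink's code.

   ```java
   import java.util.ArrayDeque;
   import java.util.ArrayList;
   import java.util.Deque;
   import java.util.List;
   import java.util.concurrent.Executors;
   import java.util.concurrent.ScheduledExecutorService;
   import java.util.concurrent.TimeUnit;

   // Hypothetical sketch: flush when batchSize records are queued, or every timeoutMs.
   class FlushTriggers implements AutoCloseable {
       private final int batchSize;
       private final Deque<String> incoming = new ArrayDeque<>();
       private final List<List<String>> flushed = new ArrayList<>(); // one entry per flush
       private final ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();

       FlushTriggers(int batchSize, long timeoutMs) {
           this.batchSize = batchSize;
           // Time trigger: flush whatever is queued every timeoutMs milliseconds
           timer.scheduleAtFixedRate(this::flush, timeoutMs, timeoutMs, TimeUnit.MILLISECONDS);
       }

       void write(String record) {
           boolean full;
           synchronized (this) {
               incoming.add(record);
               full = batchSize > 0 && incoming.size() >= batchSize;
           }
           if (full) {
               flush(); // size trigger
           }
       }

       synchronized void flush() {
           int n = batchSize > 0 ? Math.min(batchSize, incoming.size()) : incoming.size();
           if (n == 0) {
               return;
           }
           List<String> batch = new ArrayList<>(n);
           for (int i = 0; i < n; i++) {
               batch.add(incoming.removeFirst());
           }
           flushed.add(batch);
       }

       synchronized List<List<String>> flushedBatches() {
           return new ArrayList<>(flushed);
       }

       @Override
       public void close() {
           timer.shutdownNow();
       }

       public static void main(String[] args) throws InterruptedException {
           try (FlushTriggers sink = new FlushTriggers(3, 100)) {
               sink.write("a");
               sink.write("b");
               sink.write("c"); // third write reaches batchSize: flushed immediately
               sink.write("d");
               Thread.sleep(300); // the timer flushes the leftover "d"
               System.out.println(sink.flushedBatches());
           }
       }
   }
   ```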



[GitHub] [pulsar] dlg99 commented on a diff in pull request #18017: [improve][io] JDBC sinks: implement JDBC Batch API

Posted by GitBox <gi...@apache.org>.
dlg99 commented on code in PR #18017:
URL: https://github.com/apache/pulsar/pull/18017#discussion_r997545101


##########
pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java:
##########
@@ -213,63 +224,90 @@ protected enum MutationType {
 
 
     private void flush() {
-        // if not in flushing state, do flush, else return;
         if (incomingList.size() > 0 && isFlushing.compareAndSet(false, true)) {
-            if (log.isDebugEnabled()) {
-                log.debug("Starting flush, queue size: {}", incomingList.size());
-            }
-            if (!swapList.isEmpty()) {
-                throw new IllegalStateException("swapList should be empty since last flush. swapList.size: "
-                        + swapList.size());
-            }
-            synchronized (this) {
-                List<Record<T>> tmpList;
-                swapList.clear();
+            boolean needAnotherRound;
+            final Deque<Record<T>> swapList = new LinkedList<>();
+
+            synchronized (incomingList) {
+                if (log.isDebugEnabled()) {
+                    log.debug("Starting flush, queue size: {}", incomingList.size());
+                }
+                final int actualBatchSize = batchSize > 0 ? Math.min(incomingList.size(), batchSize) :
+                        incomingList.size();
 
-                tmpList = swapList;
-                swapList = incomingList;
-                incomingList = tmpList;
+                for (int i = 0; i < actualBatchSize; i++) {
+                    swapList.add(incomingList.removeFirst());
+                }
+                needAnotherRound = batchSize > 0 && !incomingList.isEmpty() && incomingList.size() >= batchSize;
             }
+            long start = System.nanoTime();
 
             int count = 0;
             try {
+                PreparedStatement currentBatch = null;
+                final List<Mutation> mutations = swapList
+                        .stream()
+                        .map(this::createMutation)
+                        .collect(Collectors.toList());
                 // bind each record value
-                for (Record<T> record : swapList) {
-                    final Mutation mutation = createMutation(record);
+                PreparedStatement statement;
+                for (Mutation mutation : mutations) {
                     switch (mutation.getType()) {
                         case DELETE:
-                            bindValue(deleteStatement, mutation);
-                            count += 1;
-                            deleteStatement.execute();
+                            statement = deleteStatement;
                             break;
                         case UPDATE:
-                            bindValue(updateStatement, mutation);
-                            count += 1;
-                            updateStatement.execute();
+                            statement = updateStatement;
                             break;
                         case INSERT:
-                            bindValue(insertStatement, mutation);
-                            count += 1;
-                            insertStatement.execute();
+                            statement = insertStatement;
                             break;
                         case UPSERT:
-                            bindValue(upsertStatement, mutation);
-                            count += 1;
-                            upsertStatement.execute();
+                            statement = upsertStatement;
                             break;
                         default:
                             String msg = String.format(
                                     "Unsupported action %s, can be one of %s, or not set which indicate %s",
                                     mutation.getType(), Arrays.toString(MutationType.values()), MutationType.INSERT);
                             throw new IllegalArgumentException(msg);
                     }
+                    bindValue(statement, mutation);
+                    count += 1;
+                    if (jdbcSinkConfig.isUseJdbcBatch()) {
+                        if (currentBatch != null && statement != currentBatch) {
+                            executeBatch(swapList, currentBatch);
+                            if (log.isDebugEnabled()) {
+                                log.debug("Flushed {} messages in {} ms", count, (System.nanoTime() - start) / 1000 / 1000);
+                            }
+                            start = System.nanoTime();
+                        }
+                        statement.addBatch();
+                        currentBatch = statement;
+                    } else {
+                        statement.execute();
+                        if (!jdbcSinkConfig.isUseTransactions()) {
+                            swapList.removeFirst().ack();
+                        }
+                    }
                 }
-                if (jdbcSinkConfig.isUseTransactions()) {
-                    connection.commit();
+
+                if (jdbcSinkConfig.isUseJdbcBatch()) {
+                    executeBatch(swapList, currentBatch);
+                    if (log.isDebugEnabled()) {
+                        log.debug("Flushed {} messages in {} ms", count, (System.nanoTime() - start) / 1000 / 1000);
+                    }
+                } else {
+                    if (jdbcSinkConfig.isUseTransactions()) {
+                        connection.commit();
+                        swapList.forEach(Record::ack);
+                    }

Review Comment:
   This makes it look like transactions and batches are mutually exclusive.
   Transactions and batching are interwoven here. I'd extract two helper methods, such as `internalFlushBatch` and `internalFlush`, so you can do something like:
   ```java
   if (needToBatch) {
     internalFlushBatch();
   } else {
     internalFlush();
   }
   ```
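   A sketch of how the suggested split could look, under stated assumptions: `internalFlush` and `internalFlushBatch` are the names proposed above, while `FlushPaths`, the string records and the call log are hypothetical stand-ins for the real `PreparedStatement` and `Record` calls. Commit and ack live outside both helpers, so batching and transactions become independent settings.

   ```java
   import java.util.ArrayList;
   import java.util.List;

   // Hypothetical sketch: the log records the JDBC-style calls each path would make.
   class FlushPaths {
       private final boolean useJdbcBatch;
       private final boolean useTransactions;
       final List<String> log = new ArrayList<>();

       FlushPaths(boolean useJdbcBatch, boolean useTransactions) {
           this.useJdbcBatch = useJdbcBatch;
           this.useTransactions = useTransactions;
       }

       void flush(List<String> records) {
           if (useJdbcBatch) {
               internalFlushBatch(records);
           } else {
               internalFlush(records);
           }
           // Transactions are handled once, independently of the batching choice
           if (useTransactions) {
               log.add("commit");
               records.forEach(r -> log.add("ack " + r));
           }
       }

       private void internalFlush(List<String> records) {
           for (String r : records) {
               log.add("execute " + r);
               if (!useTransactions) {
                   log.add("ack " + r); // autocommit: each statement is durable on its own
               }
           }
       }

       private void internalFlushBatch(List<String> records) {
           records.forEach(r -> log.add("addBatch " + r));
           log.add("executeBatch");
           if (!useTransactions) {
               // Simplified: per-statement batch results are not checked in this sketch
               records.forEach(r -> log.add("ack " + r));
           }
       }

       public static void main(String[] args) {
           FlushPaths sink = new FlushPaths(true, true);
           sink.flush(List.of("r1", "r2"));
           System.out.println(sink.log); // [addBatch r1, addBatch r2, executeBatch, commit, ack r1, ack r2]
       }
   }
   ```

   In batch mode without transactions, a real implementation would ack each record only after its entry in the batch result confirms success.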



##########
pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java:
##########
@@ -280,21 +318,56 @@ private void flush() {
                 }
             }
 
-            if (swapList.size() != count) {
-                log.error("Update count {} not match total number of records {}", count, swapList.size());
-            }
-
-            // finish flush
-            if (log.isDebugEnabled()) {
-                log.debug("Finish flush, queue size: {}", swapList.size());
-            }
-            swapList.clear();
             isFlushing.set(false);
+            if (needAnotherRound) {
+                flush();
+            }

Review Comment:
   I think this is not covered by tests (have I missed it?)



##########
pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcSinkConfig.java:
##########
@@ -76,15 +76,24 @@ public class JdbcSinkConfig implements Serializable {
     @FieldDoc(
         required = false,
         defaultValue = "500",
-        help = "The jdbc operation timeout in milliseconds"
+        help = "Enable batch mode by time. After timeoutMs milliseconds the operations queue will be flushed."
     )
     private int timeoutMs = 500;
     @FieldDoc(
         required = false,
         defaultValue = "200",
-        help = "The batch size of updates made to the database"
+        help = "Enable batch mode by number of operations. This value is the max number of operations "
+                + "batched in the same transaction/batch."
     )
     private int batchSize = 200;
+
+    @FieldDoc(
+            required = false,
+            defaultValue = "false",
+            help = "Use the JDBC batch API. This option is suggested to improve writes performances."

Review Comment:
   ```suggestion
               help = "Use the JDBC batch API. This option is suggested to improve write performance."
   ```



[GitHub] [pulsar] eolivelli commented on a diff in pull request #18017: [improve][io] JDBC sinks: implement JDBC Batch API

Posted by GitBox <gi...@apache.org>.
eolivelli commented on code in PR #18017:
URL: https://github.com/apache/pulsar/pull/18017#discussion_r993522741


##########
pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java:
##########
@@ -213,63 +223,90 @@ protected enum MutationType {
 
 
     private void flush() {
-        // if not in flushing state, do flush, else return;
         if (incomingList.size() > 0 && isFlushing.compareAndSet(false, true)) {
-            if (log.isDebugEnabled()) {
-                log.debug("Starting flush, queue size: {}", incomingList.size());
-            }
-            if (!swapList.isEmpty()) {
-                throw new IllegalStateException("swapList should be empty since last flush. swapList.size: "
-                        + swapList.size());
-            }
-            synchronized (this) {
-                List<Record<T>> tmpList;
-                swapList.clear();
+            boolean needAnotherRound;
+            final Deque<Record<T>> swapList = new LinkedList<>();
+
+            synchronized (incomingList) {
+                if (log.isDebugEnabled()) {
+                    log.debug("Starting flush, queue size: {}", incomingList.size());
+                }
+                final int actualBatchSize = batchSize > 0 ? Math.min(incomingList.size(), batchSize) :
+                        incomingList.size();
 
-                tmpList = swapList;
-                swapList = incomingList;
-                incomingList = tmpList;
+                for (int i = 0; i < actualBatchSize; i++) {
+                    swapList.add(incomingList.removeFirst());
+                }
+                needAnotherRound = batchSize > 0 && !incomingList.isEmpty() && incomingList.size() >= batchSize;
             }
+            long start = System.nanoTime();
 
             int count = 0;
             try {
+                PreparedStatement currentBatch = null;
+                final List<Mutation> mutations = swapList

Review Comment:
   Why do you need this intermediate list?



##########
pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java:
##########
@@ -213,63 +223,90 @@ protected enum MutationType {
 
 
     private void flush() {
-        // if not in flushing state, do flush, else return;
         if (incomingList.size() > 0 && isFlushing.compareAndSet(false, true)) {
-            if (log.isDebugEnabled()) {
-                log.debug("Starting flush, queue size: {}", incomingList.size());
-            }
-            if (!swapList.isEmpty()) {
-                throw new IllegalStateException("swapList should be empty since last flush. swapList.size: "
-                        + swapList.size());
-            }
-            synchronized (this) {
-                List<Record<T>> tmpList;
-                swapList.clear();
+            boolean needAnotherRound;
+            final Deque<Record<T>> swapList = new LinkedList<>();
+
+            synchronized (incomingList) {

Review Comment:
   If you use `synchronized` on `incomingList` here, you have to use it everywhere; otherwise, concurrent access is not handled correctly.
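   A minimal sketch of the rule in this comment: every read or write of `incomingList`, on the producer side as well as in the flush path, goes through the same monitor. `GuardedQueue`, `write` and `drain` are hypothetical names.

   ```java
   import java.util.ArrayList;
   import java.util.Deque;
   import java.util.LinkedList;
   import java.util.List;

   // Hypothetical sketch: all access to incomingList is guarded by the same lock.
   class GuardedQueue<T> {
       private final Deque<T> incomingList = new LinkedList<>();

       // Producer side (e.g. the sink's write path)
       void write(T record) {
           synchronized (incomingList) {
               incomingList.add(record);
           }
       }

       // Even a size check holds the lock, so it sees a consistent, up-to-date value
       int size() {
           synchronized (incomingList) {
               return incomingList.size();
           }
       }

       // Consumer side (the flush path): take up to max records atomically
       List<T> drain(int max) {
           List<T> out = new ArrayList<>();
           synchronized (incomingList) {
               while (out.size() < max && !incomingList.isEmpty()) {
                   out.add(incomingList.removeFirst());
               }
           }
           return out;
       }

       public static void main(String[] args) throws InterruptedException {
           GuardedQueue<Integer> queue = new GuardedQueue<>();
           Thread[] producers = new Thread[4];
           for (int i = 0; i < producers.length; i++) {
               producers[i] = new Thread(() -> {
                   for (int j = 0; j < 1000; j++) {
                       queue.write(j);
                   }
               });
               producers[i].start();
           }
           for (Thread p : producers) {
               p.join();
           }
           System.out.println(queue.size()); // 4000
       }
   }
   ```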



##########
pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java:
##########
@@ -280,21 +317,50 @@ private void flush() {
                 }
             }
 
-            if (swapList.size() != count) {
-                log.error("Update count {} not match total number of records {}", count, swapList.size());
-            }
-
-            // finish flush
-            if (log.isDebugEnabled()) {
-                log.debug("Finish flush, queue size: {}", swapList.size());
-            }
-            swapList.clear();
             isFlushing.set(false);
+            if (needAnotherRound) {
+                flush();
+            }
         } else {
             if (log.isDebugEnabled()) {
                 log.debug("Already in flushing state, will not flush, queue size: {}", incomingList.size());
             }
         }
     }
 
+    private void executeBatch(Deque<Record<T>> swapList, PreparedStatement statement) throws SQLException {
+        final int[] results = statement.executeBatch();
+        Map<Integer, Integer> failuresMapping = null;
+        final boolean useTransactions = jdbcSinkConfig.isUseTransactions();
+
+        for (int r: results) {
+            if (r < 0) {

Review Comment:
   This is not correct: some negative values have a special meaning, such as `Statement.SUCCESS_NO_INFO` (-2) and `Statement.EXECUTE_FAILED` (-3).
   
   https://docs.oracle.com/javase/7/docs/api/java/sql/Statement.html#SUCCESS_NO_INFO
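   A sketch of interpreting each entry of the array returned by `Statement.executeBatch()` with the constants from the linked Javadoc, rather than a plain `r < 0` check. Per the JDBC Javadoc, a failing command normally makes `executeBatch()` throw `BatchUpdateException`, and `EXECUTE_FAILED` appears in `getUpdateCounts()` only if the driver keeps processing after the failure. `BatchResults` and `Outcome` are hypothetical names.

   ```java
   import java.sql.Statement;

   // Hypothetical helper: classify one entry of an executeBatch() / getUpdateCounts() array.
   class BatchResults {
       enum Outcome { UPDATED, SUCCESS_NO_INFO, FAILED }

       static Outcome classify(int result) {
           if (result >= 0) {
               return Outcome.UPDATED;          // success, result = affected row count
           }
           if (result == Statement.SUCCESS_NO_INFO) {
               return Outcome.SUCCESS_NO_INFO;  // -2: success, row count unknown
           }
           if (result == Statement.EXECUTE_FAILED) {
               return Outcome.FAILED;           // -3: this command failed
           }
           throw new IllegalArgumentException("Unexpected batch result: " + result);
       }

       public static void main(String[] args) {
           for (int r : new int[] {1, 0, Statement.SUCCESS_NO_INFO, Statement.EXECUTE_FAILED}) {
               System.out.println(r + " -> " + classify(r));
           }
       }
   }
   ```

   A count of `0` is still a successful statement (for example an `UPDATE` that matched no rows), so only `EXECUTE_FAILED` should mark the corresponding record as failed.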



[GitHub] [pulsar] nicoloboschi commented on a diff in pull request #18017: [improve][io] JDBC sinks: implement JDBC Batch API

Posted by GitBox <gi...@apache.org>.
nicoloboschi commented on code in PR #18017:
URL: https://github.com/apache/pulsar/pull/18017#discussion_r993535315


##########
pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java:
##########
@@ -213,63 +223,90 @@ protected enum MutationType {
 
 
     private void flush() {
-        // if not in flushing state, do flush, else return;
         if (incomingList.size() > 0 && isFlushing.compareAndSet(false, true)) {
-            if (log.isDebugEnabled()) {
-                log.debug("Starting flush, queue size: {}", incomingList.size());
-            }
-            if (!swapList.isEmpty()) {
-                throw new IllegalStateException("swapList should be empty since last flush. swapList.size: "
-                        + swapList.size());
-            }
-            synchronized (this) {
-                List<Record<T>> tmpList;
-                swapList.clear();
+            boolean needAnotherRound;
+            final Deque<Record<T>> swapList = new LinkedList<>();
+
+            synchronized (incomingList) {
+                if (log.isDebugEnabled()) {
+                    log.debug("Starting flush, queue size: {}", incomingList.size());
+                }
+                final int actualBatchSize = batchSize > 0 ? Math.min(incomingList.size(), batchSize) :
+                        incomingList.size();
 
-                tmpList = swapList;
-                swapList = incomingList;
-                incomingList = tmpList;
+                for (int i = 0; i < actualBatchSize; i++) {
+                    swapList.add(incomingList.removeFirst());
+                }
+                needAnotherRound = batchSize > 0 && !incomingList.isEmpty() && incomingList.size() >= batchSize;
             }
+            long start = System.nanoTime();
 
             int count = 0;
             try {
+                PreparedStatement currentBatch = null;
+                final List<Mutation> mutations = swapList

Review Comment:
   Because `swapList` changes (items are removed from it) while looping over `mutations`.
   I think it's the best solution in terms of readability. The memory footprint impact shouldn't be relevant: the list is small (usually `batchSize` entries or fewer), and there's no data redundancy between `swapList` and `mutations`.
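   A small sketch of the pattern described here, with `String` records and `toUpperCase` standing in for `createMutation` (both assumptions): the loop walks the precomputed `mutations` list while consuming `swapList` from the front. Iterating `swapList` itself with a for-each loop while calling `removeFirst()` would invalidate the `LinkedList` iterator, which can throw `ConcurrentModificationException`. `SnapshotLoop` is a hypothetical name.

   ```java
   import java.util.ArrayList;
   import java.util.Deque;
   import java.util.LinkedList;
   import java.util.List;
   import java.util.stream.Collectors;

   // Hypothetical sketch of "map first, then consume the deque while looping".
   class SnapshotLoop {
       static List<String> process(Deque<String> swapList) {
           // Precompute the work items; toUpperCase stands in for createMutation
           List<String> mutations = swapList.stream()
                   .map(String::toUpperCase)
                   .collect(Collectors.toList());
           List<String> acked = new ArrayList<>();
           for (String mutation : mutations) {
               // ... bind and execute `mutation` here ...
               // Safe: the loop walks `mutations`, so shrinking swapList does not affect it
               acked.add(swapList.removeFirst());
           }
           return acked;
       }

       public static void main(String[] args) {
           Deque<String> swapList = new LinkedList<>(List.of("a", "b", "c"));
           System.out.println(process(swapList));  // [a, b, c]
           System.out.println(swapList.isEmpty()); // true
       }
   }
   ```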



[GitHub] [pulsar] nicoloboschi merged pull request #18017: [improve][io] JDBC sinks: implement JDBC Batch API

Posted by GitBox <gi...@apache.org>.
nicoloboschi merged PR #18017:
URL: https://github.com/apache/pulsar/pull/18017


+                            statement = upsertStatement;
                             break;
                         default:
                             String msg = String.format(
                                     "Unsupported action %s, can be one of %s, or not set which indicate %s",
                                     mutation.getType(), Arrays.toString(MutationType.values()), MutationType.INSERT);
                             throw new IllegalArgumentException(msg);
                     }
+                    bindValue(statement, mutation);
+                    count += 1;
+                    if (jdbcSinkConfig.isUseJdbcBatch()) {
+                        if (currentBatch != null && statement != currentBatch) {
+                            executeBatch(swapList, currentBatch);
+                            if (log.isDebugEnabled()) {
+                                log.debug("Flushed {} messages in {} ms", count, (System.nanoTime() - start) / 1000 / 1000);
+                            }
+                            start = System.nanoTime();
+                        }
+                        statement.addBatch();
+                        currentBatch = statement;
+                    } else {
+                        statement.execute();
+                        if (!jdbcSinkConfig.isUseTransactions()) {
+                            swapList.removeFirst().ack();
+                        }
+                    }
                 }
-                if (jdbcSinkConfig.isUseTransactions()) {
-                    connection.commit();
+
+                if (jdbcSinkConfig.isUseJdbcBatch()) {
+                    executeBatch(swapList, currentBatch);
+                    if (log.isDebugEnabled()) {
+                        log.debug("Flushed {} messages in {} ms", count, (System.nanoTime() - start) / 1000 / 1000);
+                    }
+                } else {
+                    if (jdbcSinkConfig.isUseTransactions()) {
+                        connection.commit();
+                        swapList.forEach(Record::ack);
+                    }

Review Comment:
   This makes it look like transactions and batching are mutually exclusive.
   Transactions and batching are interwoven here. I'd split this into two helper methods, such as "internalFlushBatch" and "internalFlush", so you can do something like:
   ```java
   if (needToBatch) {
     internalFlushBatch();
   } else {
     internalFlush();
   }
   ```
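Following the reviewer's suggestion, here is a minimal sketch of the split, assuming nothing beyond the mutation types visible in the diff. The helper names `internalFlush` and `internalFlushBatch` come from the comment. The class `FlushSketch`, its `calls` list, and the string recording are hypothetical stand-ins for `PreparedStatement.execute()`, `addBatch()` and `executeBatch()`, so the sketch runs without a database. It also leaves out transactions and acks. The batching rule mirrors the diff: there a new batch starts when `statement != currentBatch`, and because each mutation type maps to its own prepared statement, comparing types is equivalent.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: one entry point choosing between a batched and a
// non-batched flush path. Driver calls are recorded as strings.
class FlushSketch {

    // Mirrors the mutation types handled in the diff.
    enum MutationType { INSERT, UPDATE, DELETE, UPSERT }

    // What a real implementation would send to the JDBC driver, in order.
    final List<String> calls = new ArrayList<>();

    void flush(List<MutationType> mutations, boolean useJdbcBatch) {
        if (useJdbcBatch) {
            internalFlushBatch(mutations);
        } else {
            internalFlush(mutations);
        }
    }

    // Non-batched path: one execute() per mutation.
    private void internalFlush(List<MutationType> mutations) {
        for (MutationType type : mutations) {
            calls.add("execute " + type);
        }
    }

    // Batched path: consecutive mutations of the same type share one batch;
    // a change of type executes the pending batch before starting a new one.
    private void internalFlushBatch(List<MutationType> mutations) {
        MutationType current = null;
        int pending = 0;
        for (MutationType type : mutations) {
            if (current != null && type != current) {
                calls.add("executeBatch " + current + " x" + pending);
                pending = 0;
            }
            current = type;
            pending++; // stands in for statement.addBatch()
        }
        if (current != null) {
            calls.add("executeBatch " + current + " x" + pending);
        }
    }

    public static void main(String[] args) {
        FlushSketch sketch = new FlushSketch();
        sketch.flush(List.of(MutationType.INSERT, MutationType.INSERT,
                MutationType.DELETE, MutationType.INSERT), true);
        System.out.println(sketch.calls);
    }
}
```

Running `main` prints `[executeBatch INSERT x2, executeBatch DELETE x1, executeBatch INSERT x1]`. An interleaved INSERT/DELETE stream produces three round trips, which is why the PR description notes that mixed operations split one message batch into several JDBC batches.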


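The drain step in this hunk moves at most `batchSize` records from `incomingList` into a local `swapList` under a lock. A later hunk in this thread then calls `flush()` again when `needAnotherRound` is set. Below is a minimal sketch of the same logic written as a loop instead of recursion, assuming a generic `Deque` in place of the sink's record list. `ChunkedDrain`, `drainChunk` and `flushCycle` are hypothetical names. With `batchSize > 0` already required, `!incomingList.isEmpty()` is implied by `incomingList.size() >= batchSize`, so the sketch drops that check.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical sketch of the chunked drain; records are plain values here.
class ChunkedDrain {

    // Moves up to batchSize items (every item if batchSize <= 0) into a new chunk.
    static <T> Deque<T> drainChunk(Deque<T> incoming, int batchSize) {
        Deque<T> chunk = new ArrayDeque<>();
        synchronized (incoming) {
            int n = batchSize > 0 ? Math.min(incoming.size(), batchSize) : incoming.size();
            for (int i = 0; i < n; i++) {
                chunk.add(incoming.removeFirst());
            }
        }
        return chunk;
    }

    // One flush cycle: drain one chunk, then keep going only while a full
    // batch is still queued. A partial remainder stays in `incoming`.
    static <T> List<Deque<T>> flushCycle(Deque<T> incoming, int batchSize) {
        List<Deque<T>> written = new ArrayList<>();
        boolean needAnotherRound;
        do {
            Deque<T> chunk = drainChunk(incoming, batchSize);
            if (!chunk.isEmpty()) {
                written.add(chunk); // a real sink would write and ack the chunk here
            }
            synchronized (incoming) {
                needAnotherRound = batchSize > 0 && incoming.size() >= batchSize;
            }
        } while (needAnotherRound);
        return written;
    }

    public static void main(String[] args) {
        Deque<Integer> incoming = new ArrayDeque<>(List.of(1, 2, 3, 4, 5, 6, 7));
        List<Deque<Integer>> written = flushCycle(incoming, 3);
        System.out.println(written + " left: " + incoming);
    }
}
```

Running `main` prints `[[1, 2, 3], [4, 5, 6]] left: [7]`. The partial remainder waits for the next flush triggered by `batchSize` or `timeoutMs`. The loop keeps stack depth constant when the queue is far ahead of the writer, while the recursive version in the PR reaches the same end state.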



[GitHub] [pulsar] nicoloboschi commented on pull request #18017: [improve][io] JDBC sinks: implement JDBC Batch API

Posted by GitBox <gi...@apache.org>.
nicoloboschi commented on PR #18017:
URL: https://github.com/apache/pulsar/pull/18017#issuecomment-1278675303

   @eolivelli please review it again




[GitHub] [pulsar] nicoloboschi commented on a diff in pull request #18017: [improve][io] JDBC sinks: implement JDBC Batch API

Posted by GitBox <gi...@apache.org>.
nicoloboschi commented on code in PR #18017:
URL: https://github.com/apache/pulsar/pull/18017#discussion_r993539399


##########
pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java:
##########
@@ -280,21 +317,50 @@ private void flush() {
                 }
             }
 
-            if (swapList.size() != count) {
-                log.error("Update count {} not match total number of records {}", count, swapList.size());
-            }
-
-            // finish flush
-            if (log.isDebugEnabled()) {
-                log.debug("Finish flush, queue size: {}", swapList.size());
-            }
-            swapList.clear();
             isFlushing.set(false);
+            if (needAnotherRound) {
+                flush();
+            }
         } else {
             if (log.isDebugEnabled()) {
                 log.debug("Already in flushing state, will not flush, queue size: {}", incomingList.size());
             }
         }
     }
 
+    private void executeBatch(Deque<Record<T>> swapList, PreparedStatement statement) throws SQLException {
+        final int[] results = statement.executeBatch();
+        Map<Integer, Integer> failuresMapping = null;
+        final boolean useTransactions = jdbcSinkConfig.isUseTransactions();
+
+        for (int r: results) {
+            if (r < 0) {

Review Comment:
   good catch!
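The `executeBatch` helper in this hunk inspects the returned update counts with `r < 0`. Under the JDBC `Statement.executeBatch()` contract, a negative entry is not automatically a failure. `Statement.SUCCESS_NO_INFO` (-2) reports a successful command with an unknown row count, and only `Statement.EXECUTE_FAILED` (-3) marks a failed one. When a command fails, the driver throws `BatchUpdateException`. Its `getUpdateCounts()` array is shorter than the batch if the driver stopped at the first error. Here is a minimal sketch under those rules. `BatchResults` and `failedIndexes` are hypothetical names, not part of the PR.

```java
import java.sql.BatchUpdateException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: which commands of a JDBC batch did not succeed?
class BatchResults {

    // updateCounts: from executeBatch() or BatchUpdateException.getUpdateCounts().
    // batchLength:  number of commands added with addBatch().
    static List<Integer> failedIndexes(int[] updateCounts, int batchLength) {
        List<Integer> failed = new ArrayList<>();
        for (int i = 0; i < updateCounts.length; i++) {
            // SUCCESS_NO_INFO (-2) is negative but means success; only
            // EXECUTE_FAILED (-3) marks a failed command.
            if (updateCounts[i] == Statement.EXECUTE_FAILED) {
                failed.add(i);
            }
        }
        // A driver that stops at the first failure returns fewer counts than
        // commands; everything past the end of the array failed or never ran.
        for (int i = updateCounts.length; i < batchLength; i++) {
            failed.add(i);
        }
        return failed;
    }

    public static void main(String[] args) {
        // Driver kept processing after a failure: explicit EXECUTE_FAILED marker.
        int[] keptGoing = {1, Statement.SUCCESS_NO_INFO, Statement.EXECUTE_FAILED, 1};
        System.out.println(failedIndexes(keptGoing, 4));
        // Driver stopped at the first failure: two counts for four commands.
        BatchUpdateException stopped = new BatchUpdateException(new int[] {1, 1});
        System.out.println(failedIndexes(stopped.getUpdateCounts(), 4));
    }
}
```

Running `main` prints `[2]` and then `[2, 3]`. In a sink, one option is to call `failedIndexes` on `e.getUpdateCounts()` inside a `catch (BatchUpdateException e)` around `statement.executeBatch()`, then map the indexes back to the records of the current batch to decide which to fail and which to ack.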


