You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2019/12/07 08:05:09 UTC

[camel] branch camel-3.0.x updated: pg-replication-slot fix: if PG connection got lost while trying to send (#3389)

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch camel-3.0.x
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/camel-3.0.x by this push:
     new 86fdfcb  pg-replication-slot fix: if PG connection got lost while trying to send (#3389)
86fdfcb is described below

commit 86fdfcb6912d81dfee6a09eaa8c1edbee02f3dce
Author: Bahaa Zaid <ba...@pixelogicmedia.com>
AuthorDate: Sat Dec 7 10:03:17 2019 +0200

    pg-replication-slot fix: if PG connection got lost while trying to send (#3389)
    
    the status to PG, the reconnect logic will never be called and the
    exchange will be reprocessed forever.
    
    Explanation:
    After we receive the payload from PostgreSQL, we keep it in memory so we
    can process it again and again in case the processing of the exchange
    fails. After the successful completion of the exchange, we (1) send the
    status to PostgreSQL, (2) reset the payload, and (3) we receive the next
    one.
    
    If for some reason we lose the connection while trying to do step 2, the
    payload will never get reset, and it will be processed forever. We need
    to reset the payload first so in case of failure in updating the status,
    the next poll will reconnect and receive the next payload. The next
    status update cover both payload as LSNs are sequential.
---
 .../component/pg/replication/slot/PgReplicationSlotConsumer.java    | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/components/camel-pg-replication-slot/src/main/java/org/apache/camel/component/pg/replication/slot/PgReplicationSlotConsumer.java b/components/camel-pg-replication-slot/src/main/java/org/apache/camel/component/pg/replication/slot/PgReplicationSlotConsumer.java
index 5073f97..4390ae4 100644
--- a/components/camel-pg-replication-slot/src/main/java/org/apache/camel/component/pg/replication/slot/PgReplicationSlotConsumer.java
+++ b/components/camel-pg-replication-slot/src/main/java/org/apache/camel/component/pg/replication/slot/PgReplicationSlotConsumer.java
@@ -157,6 +157,10 @@ public class PgReplicationSlotConsumer extends ScheduledPollConsumer {
 
     private void processCommit(Exchange exchange) {
         try {
+            // Reset the `payload` buffer first because it's already processed, and in case of losing the connection
+            // while updating the status, the next poll will try to reconnect again instead of processing the stale payload.
+            this.payload = null;
+
             PGReplicationStream stream = getStream();
 
             if (stream == null) {
@@ -166,8 +170,6 @@ public class PgReplicationSlotConsumer extends ScheduledPollConsumer {
             stream.setAppliedLSN(stream.getLastReceiveLSN());
             stream.setFlushedLSN(stream.getLastReceiveLSN());
             stream.forceUpdateStatus();
-
-            this.payload = null;
         } catch (SQLException e) {
             getExceptionHandler().handleException("Exception while sending feedback to PostgreSQL.", exchange, e);
         }