You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by "sandynz (via GitHub)" <gi...@apache.org> on 2023/05/29 08:20:18 UTC

[GitHub] [shardingsphere] sandynz commented on a diff in pull request #25932: Add openGauss/PostgreSQL dumper reconnect at pipeline

sandynz commented on code in PR #25932:
URL: https://github.com/apache/shardingsphere/pull/25932#discussion_r1209049257


##########
kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWALDumper.java:
##########
@@ -80,15 +85,22 @@ public OpenGaussWALDumper(final DumperConfiguration dumperConfig, final IngestPo
         this.decodeWithTX = dumperConfig.isDecodeWithTX();
     }
     
-    @SneakyThrows(InterruptedException.class)
     @Override
     protected void runBlocking() {
+        while (reconnectTimes.get() <= 3) {
+            connect();
+        }

Review Comment:
   Looks it will always run `connect()` several times.
   
   Reconnect should just run on exception occur.



##########
kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWALDumper.java:
##########
@@ -80,15 +85,22 @@ public OpenGaussWALDumper(final DumperConfiguration dumperConfig, final IngestPo
         this.decodeWithTX = dumperConfig.isDecodeWithTX();
     }
     
-    @SneakyThrows(InterruptedException.class)
     @Override
     protected void runBlocking() {
+        while (reconnectTimes.get() <= 3) {
+            connect();
+        }
+    }
+    
+    @SneakyThrows(InterruptedException.class)
+    private void connect() {
         PGReplicationStream stream = null;
         try (PgConnection connection = getReplicationConnectionUnwrap()) {
             stream = logicalReplication.createReplicationStream(connection, walPosition.getLogSequenceNumber(), OpenGaussPositionInitializer.getUniqueSlotName(connection, dumperConfig.getJobId()));
             DecodingPlugin decodingPlugin = new MppdbDecodingPlugin(new OpenGaussTimestampUtils(connection.getTimestampUtils()), decodeWithTX);
             while (isRunning()) {
                 ByteBuffer message = stream.readPending();
+                reconnectTimes.set(0);

Review Comment:
   Why `reconnectTimes.set(0);` is invoked in while loop



##########
kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWALDumper.java:
##########
@@ -101,7 +113,11 @@ protected void runBlocking() {
                 }
             }
         } catch (final SQLException ex) {
-            throw new IngestException(ex);
+            int reconnectTimes = this.reconnectTimes.incrementAndGet();
+            log.error("Connect failed, reconnect times={}", reconnectTimes, ex);
+            if (reconnectTimes > 3) {
+                throw new IngestException(ex);
+            }

Review Comment:
   Could we move exception handling to `runBlocking()` method?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@shardingsphere.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org