Posted to reviews@iotdb.apache.org by GitBox <gi...@apache.org> on 2022/11/16 12:38:10 UTC

[GitHub] [iotdb] Cpaulyz opened a new pull request, #8019: [IOTDB-4957] Add check for create pipesink and optimize re-connection

Cpaulyz opened a new pull request, #8019:
URL: https://github.com/apache/iotdb/pull/8019

   ## Description
   
   https://issues.apache.org/jira/browse/IOTDB-4957
   
   1. Add a check for create pipesink. For IoTDBPipeSink, it responds with an exception if the IP address is not a standard IPv4 address.
   2. Optimize re-connection. If the sender loses its connection to the receiver, it blocks until the scheduled heartbeat task successfully reconnects to the receiver. The default heartbeat interval is 5 seconds.
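
For illustration only, the kind of check described in item 1 could look like the sketch below. It is not the code from this PR: the class `Ipv4Check` and the methods `isStandardIPv4` and `checkPipeSinkIp` are hypothetical names, and "standard IPv4 address" is assumed here to mean a dotted quad of four decimal octets in the range 0 to 255 without leading zeros.

```java
import java.util.regex.Pattern;

// Hypothetical helper, not part of IoTDB: a strict dotted-quad IPv4 check.
final class Ipv4Check {
  // One octet: 0-255, no leading zeros (assumed definition of "standard").
  private static final String OCTET = "(25[0-5]|2[0-4][0-9]|1[0-9]{2}|[1-9]?[0-9])";
  private static final Pattern IPV4 =
      Pattern.compile("^" + OCTET + "(\\." + OCTET + "){3}$");

  private Ipv4Check() {}

  /** Returns true only if ip is four dot-separated octets, each in 0-255. */
  static boolean isStandardIPv4(String ip) {
    return ip != null && IPV4.matcher(ip).matches();
  }

  /** Rejects the address up front, before any connection attempt is made. */
  static void checkPipeSinkIp(String ip) {
    if (!isStandardIPv4(ip)) {
      throw new IllegalArgumentException(
          "ip " + ip + " is not a standard IPv4 address");
    }
  }

  public static void main(String[] args) {
    System.out.println(isStandardIPv4("127.0.0.1"));
    System.out.println(isStandardIPv4("256.0.0.1"));
    System.out.println(isStandardIPv4("localhost"));
  }
}
```

Validating when the pipesink is created, rather than at the first transport attempt, means a malformed address surfaces as an immediate error instead of as a connection failure later on.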


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@iotdb.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [iotdb] HTHou merged pull request #8019: [IOTDB-4957] Add check for create pipesink and optimize re-connection

Posted by GitBox <gi...@apache.org>.
HTHou merged PR #8019:
URL: https://github.com/apache/iotdb/pull/8019




[GitHub] [iotdb] HTHou commented on a diff in pull request #8019: [IOTDB-4957] Add check for create pipesink and optimize re-connection

Posted by GitBox <gi...@apache.org>.
HTHou commented on code in PR #8019:
URL: https://github.com/apache/iotdb/pull/8019#discussion_r1025896811


##########
server/src/main/java/org/apache/iotdb/db/sync/transport/client/SenderManager.java:
##########
@@ -111,36 +145,87 @@ public void close() throws PipeException {
     }
   }
 
-  private void takePipeDataAndTransport(ISyncClient syncClient, String dataRegionId) {
+  /**
+   * heartbeat will be executed with delay time {@link SyncConstant#HEARTBEAT_INTERVAL_MILLISECONDS}
+   * only if there are transport threads being blocked. It will notify all blocked transport thread
+   * if successfully reconnect to receiver.
+   *
+   * <p>It will print warn log per {@link SyncConstant#LOST_CONNECT_REPORT_MILLISECONDS} ms.
+   */
+  private void heartbeat() {
     try {
-      while (!Thread.currentThread().isInterrupted()) {
-        try {
-          if (!syncClient.handshake()) {
-            SyncService.getInstance()
-                .recordMessage(
-                    pipe.getName(),
-                    new PipeMessage(
-                        PipeMessage.PipeMessageType.ERROR,
-                        String.format("Can not handshake with %s", pipeSink)));
+      Object object = blockingQueue.take();
+      ISyncClient client = SyncClientFactory.createHeartbeatClient(pipe, pipeSink);
+      try {
+        client.handshake();
+        lostConnectionTime = Long.MAX_VALUE;
+        logger.info("Reconnect to {} successfully.", pipeSink);
+        synchronized (object) {
+          object.notify();
+        }
+        while (!blockingQueue.isEmpty()) {
+          object = blockingQueue.take();
+          synchronized (object) {
+            object.notify();
           }
-          while (!Thread.currentThread().isInterrupted()) {
-            PipeData pipeData = pipe.take(dataRegionId);
-            if (!syncClient.send(pipeData)) {
-              logger.error(String.format("Can not transfer pipedata %s, skip it.", pipeData));
-              // can do something.
+        }
+      } catch (SyncConnectionException e) {
+        blockingQueue.offer(object);
+        long reportInterval = System.currentTimeMillis() - lastReportTime;
+        if (reportInterval > SyncConstant.LOST_CONNECT_REPORT_MILLISECONDS) {
+          logger.warn(
+              "Connection error because {}, lost contact with the receiver {} for {} milliseconds.",
+              e.getMessage(),
+              pipeSink,
+              System.currentTimeMillis() - lostConnectionTime);
+          lastReportTime = System.currentTimeMillis();
+        }
+      } finally {
+        client.close();
+      }
+    } catch (InterruptedException e) {
+      e.printStackTrace();
+    }
+  }
+
+  private void takePipeDataAndTransport(ISyncClient syncClient, String dataRegionId) {
+    try {
+      Object lock = new Object();
+      synchronized (lock) {
+        while (!Thread.currentThread().isInterrupted()) {
+          try {
+            if (!syncClient.handshake()) {
               SyncService.getInstance()
                   .recordMessage(
                       pipe.getName(),
                       new PipeMessage(
-                          PipeMessage.PipeMessageType.WARN,
-                          String.format(
-                              "Transfer piepdata %s error, skip it.", pipeData.getSerialNumber())));
+                          PipeMessage.PipeMessageType.ERROR,
+                          String.format("Can not handshake with %s", pipeSink)));
+            }
+            while (!Thread.currentThread().isInterrupted()) {
+              PipeData pipeData = pipe.take(dataRegionId);
+              if (!syncClient.send(pipeData)) {
+                logger.error(String.format("Can not transfer PipeData %s, skip it.", pipeData));
+                // can do something.
+                SyncService.getInstance()
+                    .recordMessage(
+                        pipe.getName(),
+                        new PipeMessage(
+                            PipeMessage.PipeMessageType.WARN,
+                            String.format(
+                                "Transfer PipeData %s error, skip it.",
+                                pipeData.getSerialNumber())));
+              }
+              pipe.commit(dataRegionId);
             }
-            pipe.commit(dataRegionId);
+          } catch (SyncConnectionException e) {
+            // If failed to connect to receiver, it will hang up until scheduled heartbeat task
+            // successfully reconnect to receiver.
+            logger.error(String.format("Connect to receiver %s error, because %s.", pipeSink, e));

Review Comment:
   ```suggestion
               logger.error("Connect to receiver {} error, because ", pipeSink, e);
   ```


