You are viewing a plain-text version of this content. The canonical version is available via a hyperlink that is not preserved in this plain-text copy.
Posted to commits@iotdb.apache.org by ca...@apache.org on 2022/11/16 00:58:13 UTC

[iotdb] branch rel/0.13 updated: [To rel/0.13] [IOTDB-4938] Fix error for storage group not ready (#7992)

This is an automated email from the ASF dual-hosted git repository.

caogaofei pushed a commit to branch rel/0.13
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/rel/0.13 by this push:
     new a45ab48555 [To rel/0.13] [IOTDB-4938] Fix error for storage group not ready (#7992)
a45ab48555 is described below

commit a45ab48555396c794059a5dcf7f9ca1586550396
Author: Beyyes <cg...@foxmail.com>
AuthorDate: Wed Nov 16 08:58:05 2022 +0800

    [To rel/0.13] [IOTDB-4938] Fix error for storage group not ready (#7992)
---
 .../iotdb/db/doublelive/OperationSyncConsumer.java | 32 ++++++++++++++++--
 .../db/doublelive/OperationSyncDDLProtector.java   | 39 ++++++++++++++++++++--
 .../db/doublelive/OperationSyncDMLProtector.java   | 39 ++++++++++++++++++++--
 3 files changed, 102 insertions(+), 8 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/doublelive/OperationSyncConsumer.java b/server/src/main/java/org/apache/iotdb/db/doublelive/OperationSyncConsumer.java
index cdb23d24eb..304d35c200 100644
--- a/server/src/main/java/org/apache/iotdb/db/doublelive/OperationSyncConsumer.java
+++ b/server/src/main/java/org/apache/iotdb/db/doublelive/OperationSyncConsumer.java
@@ -19,7 +19,9 @@
 package org.apache.iotdb.db.doublelive;
 
 import org.apache.iotdb.db.engine.StorageEngine;
+import org.apache.iotdb.rpc.BatchExecutionException;
 import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.rpc.StatementExecutionException;
 import org.apache.iotdb.session.pool.SessionPool;
 import org.apache.iotdb.tsfile.utils.Pair;
 
@@ -30,6 +32,8 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.concurrent.BlockingQueue;
 
+import static org.apache.iotdb.rpc.TSStatusCode.STORAGE_GROUP_NOT_READY;
+
 public class OperationSyncConsumer implements Runnable {
   private static final Logger LOGGER = LoggerFactory.getLogger(OperationSyncConsumer.class);
 
@@ -68,10 +72,34 @@ public class OperationSyncConsumer implements Runnable {
         } catch (IoTDBConnectionException connectionException) {
           // warn IoTDBConnectionException and do serialization
           LOGGER.warn(
-              "OperationSyncConsumer can't transmit because network failure", connectionException);
+              "OperationSyncConsumer can't transmit for connection error", connectionException);
+        } catch (BatchExecutionException batchExecutionException) {
+          if (batchExecutionException.getStatusList().stream()
+              .anyMatch(s -> s.getCode() == STORAGE_GROUP_NOT_READY.getStatusCode())) {
+            LOGGER.warn(
+                "OperationSyncConsumer can't transmit for STORAGE_GROUP_NOT_READY",
+                batchExecutionException);
+          } else {
+            LOGGER.warn(
+                "OperationSyncConsumer can't transmit for batchExecutionException, discard it",
+                batchExecutionException);
+            continue;
+          }
+        } catch (StatementExecutionException statementExecutionException) {
+          if (statementExecutionException.getStatusCode()
+              == STORAGE_GROUP_NOT_READY.getStatusCode()) {
+            LOGGER.warn(
+                "OperationSyncConsumer can't transmit for STORAGE_GROUP_NOT_READY",
+                statementExecutionException);
+          } else {
+            LOGGER.warn(
+                "OperationSyncConsumer can't transmit for statementExecutionException, discard it",
+                statementExecutionException);
+            continue;
+          }
         } catch (Exception e) {
           // The PhysicalPlan has internal error, reject transmit
-          LOGGER.error("OperationSyncConsumer can't transmit", e);
+          LOGGER.error("OperationSyncConsumer can't transmit, discard it", e);
           continue;
         }
       }
diff --git a/server/src/main/java/org/apache/iotdb/db/doublelive/OperationSyncDDLProtector.java b/server/src/main/java/org/apache/iotdb/db/doublelive/OperationSyncDDLProtector.java
index 1870145980..a3abba895d 100644
--- a/server/src/main/java/org/apache/iotdb/db/doublelive/OperationSyncDDLProtector.java
+++ b/server/src/main/java/org/apache/iotdb/db/doublelive/OperationSyncDDLProtector.java
@@ -20,7 +20,9 @@ package org.apache.iotdb.db.doublelive;
 
 import org.apache.iotdb.db.engine.StorageEngine;
 import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+import org.apache.iotdb.rpc.BatchExecutionException;
 import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.rpc.StatementExecutionException;
 import org.apache.iotdb.session.pool.SessionPool;
 
 import org.slf4j.Logger;
@@ -29,6 +31,8 @@ import org.slf4j.LoggerFactory;
 import java.nio.ByteBuffer;
 import java.util.concurrent.TimeUnit;
 
+import static org.apache.iotdb.rpc.TSStatusCode.STORAGE_GROUP_NOT_READY;
+
 public class OperationSyncDDLProtector extends OperationSyncProtector {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(OperationSyncDDLProtector.class);;
@@ -47,6 +51,7 @@ public class OperationSyncDDLProtector extends OperationSyncProtector {
 
   @Override
   protected void transmitPhysicalPlan(ByteBuffer planBuffer, PhysicalPlan physicalPlan) {
+    long sleepTimeInSeconds = 1;
     while (true) {
       // transmit E-Plan until it's been received
       boolean transmitStatus = false;
@@ -57,10 +62,38 @@ public class OperationSyncDDLProtector extends OperationSyncProtector {
           transmitStatus = operationSyncSessionPool.operationSyncTransmit(planBuffer);
         } catch (IoTDBConnectionException connectionException) {
           // warn IoTDBConnectionException and retry
-          LOGGER.warn("OperationSyncDDLProtector can't transmit, retrying...", connectionException);
+          LOGGER.warn(
+              "OperationSyncDDLProtector can't transmit for connection error, retrying...",
+              connectionException);
+        } catch (BatchExecutionException batchExecutionException) {
+          if (batchExecutionException.getStatusList().stream()
+              .anyMatch(s -> s.getCode() == STORAGE_GROUP_NOT_READY.getStatusCode())) {
+            LOGGER.warn(
+                "OperationSyncDDLProtector can't transmit for STORAGE_GROUP_NOT_READY",
+                batchExecutionException);
+            sleepTimeInSeconds = 10;
+          } else {
+            LOGGER.warn(
+                "OperationSyncDDLProtector can't transmit for batchExecutionException, discard it",
+                batchExecutionException);
+            break;
+          }
+        } catch (StatementExecutionException statementExecutionException) {
+          if (statementExecutionException.getStatusCode()
+              == STORAGE_GROUP_NOT_READY.getStatusCode()) {
+            sleepTimeInSeconds = 10;
+            LOGGER.warn(
+                "OperationSyncDDLProtector can't transmit for STORAGE_GROUP_NOT_READY",
+                statementExecutionException);
+          } else {
+            LOGGER.warn(
+                "OperationSyncDDLProtector can't transmit for statementExecutionException, discard it",
+                statementExecutionException);
+            break;
+          }
         } catch (Exception e) {
           // error exception and break
-          LOGGER.error("OperationSyncDDLProtector can't transmit", e);
+          LOGGER.error("OperationSyncDDLProtector can't transmit, discard it", e);
           break;
         }
       } else {
@@ -74,7 +107,7 @@ public class OperationSyncDDLProtector extends OperationSyncProtector {
         break;
       } else {
         try {
-          TimeUnit.SECONDS.sleep(1);
+          TimeUnit.SECONDS.sleep(sleepTimeInSeconds);
         } catch (InterruptedException e) {
           LOGGER.warn("OperationSyncDDLProtector is interrupted", e);
         }
diff --git a/server/src/main/java/org/apache/iotdb/db/doublelive/OperationSyncDMLProtector.java b/server/src/main/java/org/apache/iotdb/db/doublelive/OperationSyncDMLProtector.java
index 2c2c89ef94..111e7fa2d7 100644
--- a/server/src/main/java/org/apache/iotdb/db/doublelive/OperationSyncDMLProtector.java
+++ b/server/src/main/java/org/apache/iotdb/db/doublelive/OperationSyncDMLProtector.java
@@ -20,12 +20,16 @@ package org.apache.iotdb.db.doublelive;
 
 import org.apache.iotdb.db.engine.StorageEngine;
 import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+import org.apache.iotdb.rpc.BatchExecutionException;
 import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.rpc.StatementExecutionException;
 import org.apache.iotdb.session.pool.SessionPool;
 
 import java.nio.ByteBuffer;
 import java.util.concurrent.TimeUnit;
 
+import static org.apache.iotdb.rpc.TSStatusCode.STORAGE_GROUP_NOT_READY;
+
 public class OperationSyncDMLProtector extends OperationSyncProtector {
 
   private final OperationSyncDDLProtector ddlProtector;
@@ -51,6 +55,7 @@ public class OperationSyncDMLProtector extends OperationSyncProtector {
 
   @Override
   protected void transmitPhysicalPlan(ByteBuffer planBuffer, PhysicalPlan physicalPlan) {
+    long sleepTimeInSeconds = 1;
     while (true) {
       // transmit E-Plan until it's been received
       boolean transmitStatus = false;
@@ -61,10 +66,38 @@ public class OperationSyncDMLProtector extends OperationSyncProtector {
           transmitStatus = operationSyncSessionPool.operationSyncTransmit(planBuffer);
         } catch (IoTDBConnectionException connectionException) {
           // warn IoTDBConnectionException and retry
-          LOGGER.warn("OperationSyncDMLProtector can't transmit, retrying...", connectionException);
+          LOGGER.warn(
+              "OperationSyncDMLProtector can't transmit for connection error, retrying...",
+              connectionException);
+        } catch (BatchExecutionException batchExecutionException) {
+          if (batchExecutionException.getStatusList().stream()
+              .anyMatch(s -> s.getCode() == STORAGE_GROUP_NOT_READY.getStatusCode())) {
+            sleepTimeInSeconds = 10;
+            LOGGER.warn(
+                "OperationSyncDMLProtector can't transmit for STORAGE_GROUP_NOT_READY",
+                batchExecutionException);
+          } else {
+            LOGGER.warn(
+                "OperationSyncDMLProtector can't transmit for batchExecutionException, discard it",
+                batchExecutionException);
+            break;
+          }
+        } catch (StatementExecutionException statementExecutionException) {
+          if (statementExecutionException.getStatusCode()
+              == STORAGE_GROUP_NOT_READY.getStatusCode()) {
+            LOGGER.warn(
+                "OperationSyncDMLProtector can't transmit for STORAGE_GROUP_NOT_READY",
+                statementExecutionException);
+            sleepTimeInSeconds = 10;
+          } else {
+            LOGGER.warn(
+                "OperationSyncDMLProtector can't transmit for statementExecutionException, discard it",
+                statementExecutionException);
+            break;
+          }
         } catch (Exception e) {
           // error exception and break
-          LOGGER.error("OperationSyncDMLProtector can't transmit", e);
+          LOGGER.error("OperationSyncDMLProtector can't transmit, discard it", e);
           break;
         }
       } else {
@@ -78,7 +111,7 @@ public class OperationSyncDMLProtector extends OperationSyncProtector {
         break;
       } else {
         try {
-          TimeUnit.SECONDS.sleep(1);
+          TimeUnit.SECONDS.sleep(sleepTimeInSeconds);
         } catch (InterruptedException e) {
           LOGGER.warn("OperationSyncDMLProtector is interrupted", e);
         }