You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ca...@apache.org on 2022/11/15 06:46:52 UTC

[iotdb] 01/01: fix error for storage group not ready

This is an automated email from the ASF dual-hosted git repository.

caogaofei pushed a commit to branch beyyes/fix_storage_group_not_ready
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 154b54df94f565201f3fbe033c9cca62f2294f58
Author: Beyyes <cg...@foxmail.com>
AuthorDate: Tue Nov 15 14:46:34 2022 +0800

    fix error for storage group not ready
---
 .../iotdb/db/doublelive/OperationSyncConsumer.java | 24 ++++++++++++++++++++--
 .../db/doublelive/OperationSyncDDLProtector.java   | 24 +++++++++++++++++++++-
 .../db/doublelive/OperationSyncDMLProtector.java   | 24 +++++++++++++++++++++-
 3 files changed, 68 insertions(+), 4 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/doublelive/OperationSyncConsumer.java b/server/src/main/java/org/apache/iotdb/db/doublelive/OperationSyncConsumer.java
index cdb23d24eb..07040a488b 100644
--- a/server/src/main/java/org/apache/iotdb/db/doublelive/OperationSyncConsumer.java
+++ b/server/src/main/java/org/apache/iotdb/db/doublelive/OperationSyncConsumer.java
@@ -19,7 +19,9 @@
 package org.apache.iotdb.db.doublelive;
 
 import org.apache.iotdb.db.engine.StorageEngine;
+import org.apache.iotdb.rpc.BatchExecutionException;
 import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.rpc.StatementExecutionException;
 import org.apache.iotdb.session.pool.SessionPool;
 import org.apache.iotdb.tsfile.utils.Pair;
 
@@ -30,6 +32,8 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.concurrent.BlockingQueue;
 
+import static org.apache.iotdb.rpc.TSStatusCode.STORAGE_GROUP_NOT_READY;
+
 public class OperationSyncConsumer implements Runnable {
   private static final Logger LOGGER = LoggerFactory.getLogger(OperationSyncConsumer.class);
 
@@ -68,10 +72,26 @@ public class OperationSyncConsumer implements Runnable {
         } catch (IoTDBConnectionException connectionException) {
           // warn IoTDBConnectionException and do serialization
           LOGGER.warn(
-              "OperationSyncConsumer can't transmit because network failure", connectionException);
+              "OperationSyncConsumer can't transmit for connection error", connectionException);
+        } catch (BatchExecutionException batchExecutionException) {
+          LOGGER.error(
+              "OperationSyncConsumer can't transmit for batchExecutionException",
+              batchExecutionException);
+          if (batchExecutionException.getStatusList().stream()
+              .noneMatch(s -> s.getCode() == STORAGE_GROUP_NOT_READY.getStatusCode())) {
+            continue;
+          }
+        } catch (StatementExecutionException statementExecutionException) {
+          LOGGER.error(
+              "OperationSyncConsumer can't transmit for statementExecutionException",
+              statementExecutionException);
+          if (statementExecutionException.getStatusCode()
+              != STORAGE_GROUP_NOT_READY.getStatusCode()) {
+            continue;
+          }
         } catch (Exception e) {
           // The PhysicalPlan has internal error, reject transmit
-          LOGGER.error("OperationSyncConsumer can't transmit", e);
+          LOGGER.error("OperationSyncConsumer can't transmit, discard it", e);
           continue;
         }
       }
diff --git a/server/src/main/java/org/apache/iotdb/db/doublelive/OperationSyncDDLProtector.java b/server/src/main/java/org/apache/iotdb/db/doublelive/OperationSyncDDLProtector.java
index 1870145980..dea54020cf 100644
--- a/server/src/main/java/org/apache/iotdb/db/doublelive/OperationSyncDDLProtector.java
+++ b/server/src/main/java/org/apache/iotdb/db/doublelive/OperationSyncDDLProtector.java
@@ -20,7 +20,9 @@ package org.apache.iotdb.db.doublelive;
 
 import org.apache.iotdb.db.engine.StorageEngine;
 import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+import org.apache.iotdb.rpc.BatchExecutionException;
 import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.rpc.StatementExecutionException;
 import org.apache.iotdb.session.pool.SessionPool;
 
 import org.slf4j.Logger;
@@ -29,6 +31,8 @@ import org.slf4j.LoggerFactory;
 import java.nio.ByteBuffer;
 import java.util.concurrent.TimeUnit;
 
+import static org.apache.iotdb.rpc.TSStatusCode.STORAGE_GROUP_NOT_READY;
+
 public class OperationSyncDDLProtector extends OperationSyncProtector {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(OperationSyncDDLProtector.class);;
@@ -57,7 +61,25 @@ public class OperationSyncDDLProtector extends OperationSyncProtector {
           transmitStatus = operationSyncSessionPool.operationSyncTransmit(planBuffer);
         } catch (IoTDBConnectionException connectionException) {
           // warn IoTDBConnectionException and retry
-          LOGGER.warn("OperationSyncDDLProtector can't transmit, retrying...", connectionException);
+          LOGGER.warn(
+              "OperationSyncDDLProtector can't transmit for connection error, retrying...",
+              connectionException);
+        } catch (BatchExecutionException batchExecutionException) {
+          LOGGER.error(
+              "OperationSyncDDLProtector can't transmit for batchExecutionException",
+              batchExecutionException);
+          if (batchExecutionException.getStatusList().stream()
+              .noneMatch(s -> s.getCode() == STORAGE_GROUP_NOT_READY.getStatusCode())) {
+            break;
+          }
+        } catch (StatementExecutionException statementExecutionException) {
+          LOGGER.error(
+              "OperationSyncDDLProtector can't transmit for statementExecutionException",
+              statementExecutionException);
+          if (statementExecutionException.getStatusCode()
+              != STORAGE_GROUP_NOT_READY.getStatusCode()) {
+            break;
+          }
         } catch (Exception e) {
           // error exception and break
           LOGGER.error("OperationSyncDDLProtector can't transmit", e);
diff --git a/server/src/main/java/org/apache/iotdb/db/doublelive/OperationSyncDMLProtector.java b/server/src/main/java/org/apache/iotdb/db/doublelive/OperationSyncDMLProtector.java
index 2c2c89ef94..2b5585c9dd 100644
--- a/server/src/main/java/org/apache/iotdb/db/doublelive/OperationSyncDMLProtector.java
+++ b/server/src/main/java/org/apache/iotdb/db/doublelive/OperationSyncDMLProtector.java
@@ -20,12 +20,16 @@ package org.apache.iotdb.db.doublelive;
 
 import org.apache.iotdb.db.engine.StorageEngine;
 import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+import org.apache.iotdb.rpc.BatchExecutionException;
 import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.rpc.StatementExecutionException;
 import org.apache.iotdb.session.pool.SessionPool;
 
 import java.nio.ByteBuffer;
 import java.util.concurrent.TimeUnit;
 
+import static org.apache.iotdb.rpc.TSStatusCode.STORAGE_GROUP_NOT_READY;
+
 public class OperationSyncDMLProtector extends OperationSyncProtector {
 
   private final OperationSyncDDLProtector ddlProtector;
@@ -61,7 +65,25 @@ public class OperationSyncDMLProtector extends OperationSyncProtector {
           transmitStatus = operationSyncSessionPool.operationSyncTransmit(planBuffer);
         } catch (IoTDBConnectionException connectionException) {
           // warn IoTDBConnectionException and retry
-          LOGGER.warn("OperationSyncDMLProtector can't transmit, retrying...", connectionException);
+          LOGGER.warn(
+              "OperationSyncDMLProtector can't transmit for connection error, retrying...",
+              connectionException);
+        } catch (BatchExecutionException batchExecutionException) {
+          LOGGER.error(
+              "OperationSyncDMLProtector can't transmit for batchExecutionException",
+              batchExecutionException);
+          if (batchExecutionException.getStatusList().stream()
+              .noneMatch(s -> s.getCode() == STORAGE_GROUP_NOT_READY.getStatusCode())) {
+            break;
+          }
+        } catch (StatementExecutionException statementExecutionException) {
+          LOGGER.error(
+              "OperationSyncDMLProtector can't transmit for statementExecutionException",
+              statementExecutionException);
+          if (statementExecutionException.getStatusCode()
+              != STORAGE_GROUP_NOT_READY.getStatusCode()) {
+            break;
+          }
         } catch (Exception e) {
           // error exception and break
           LOGGER.error("OperationSyncDMLProtector can't transmit", e);