You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ca...@apache.org on 2022/11/15 06:46:52 UTC
[iotdb] 01/01: fix error for storage group not ready
This is an automated email from the ASF dual-hosted git repository.
caogaofei pushed a commit to branch beyyes/fix_storage_group_not_ready
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 154b54df94f565201f3fbe033c9cca62f2294f58
Author: Beyyes <cg...@foxmail.com>
AuthorDate: Tue Nov 15 14:46:34 2022 +0800
fix error for storage group not ready
---
.../iotdb/db/doublelive/OperationSyncConsumer.java | 24 ++++++++++++++++++++--
.../db/doublelive/OperationSyncDDLProtector.java | 24 +++++++++++++++++++++-
.../db/doublelive/OperationSyncDMLProtector.java | 24 +++++++++++++++++++++-
3 files changed, 68 insertions(+), 4 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/doublelive/OperationSyncConsumer.java b/server/src/main/java/org/apache/iotdb/db/doublelive/OperationSyncConsumer.java
index cdb23d24eb..07040a488b 100644
--- a/server/src/main/java/org/apache/iotdb/db/doublelive/OperationSyncConsumer.java
+++ b/server/src/main/java/org/apache/iotdb/db/doublelive/OperationSyncConsumer.java
@@ -19,7 +19,9 @@
package org.apache.iotdb.db.doublelive;
import org.apache.iotdb.db.engine.StorageEngine;
+import org.apache.iotdb.rpc.BatchExecutionException;
import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.rpc.StatementExecutionException;
import org.apache.iotdb.session.pool.SessionPool;
import org.apache.iotdb.tsfile.utils.Pair;
@@ -30,6 +32,8 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.BlockingQueue;
+import static org.apache.iotdb.rpc.TSStatusCode.STORAGE_GROUP_NOT_READY;
+
public class OperationSyncConsumer implements Runnable {
private static final Logger LOGGER = LoggerFactory.getLogger(OperationSyncConsumer.class);
@@ -68,10 +72,26 @@ public class OperationSyncConsumer implements Runnable {
} catch (IoTDBConnectionException connectionException) {
// warn IoTDBConnectionException and do serialization
LOGGER.warn(
- "OperationSyncConsumer can't transmit because network failure", connectionException);
+ "OperationSyncConsumer can't transmit for connection error", connectionException);
+ } catch (BatchExecutionException batchExecutionException) {
+ LOGGER.error(
+ "OperationSyncConsumer can't transmit for batchExecutionException",
+ batchExecutionException);
+ if (batchExecutionException.getStatusList().stream()
+ .noneMatch(s -> s.getCode() == STORAGE_GROUP_NOT_READY.getStatusCode())) {
+ continue;
+ }
+ } catch (StatementExecutionException statementExecutionException) {
+ LOGGER.error(
+ "OperationSyncConsumer can't transmit for statementExecutionException",
+ statementExecutionException);
+ if (statementExecutionException.getStatusCode()
+ != STORAGE_GROUP_NOT_READY.getStatusCode()) {
+ continue;
+ }
} catch (Exception e) {
// The PhysicalPlan has internal error, reject transmit
- LOGGER.error("OperationSyncConsumer can't transmit", e);
+ LOGGER.error("OperationSyncConsumer can't transmit, discard it", e);
continue;
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/doublelive/OperationSyncDDLProtector.java b/server/src/main/java/org/apache/iotdb/db/doublelive/OperationSyncDDLProtector.java
index 1870145980..dea54020cf 100644
--- a/server/src/main/java/org/apache/iotdb/db/doublelive/OperationSyncDDLProtector.java
+++ b/server/src/main/java/org/apache/iotdb/db/doublelive/OperationSyncDDLProtector.java
@@ -20,7 +20,9 @@ package org.apache.iotdb.db.doublelive;
import org.apache.iotdb.db.engine.StorageEngine;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+import org.apache.iotdb.rpc.BatchExecutionException;
import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.rpc.StatementExecutionException;
import org.apache.iotdb.session.pool.SessionPool;
import org.slf4j.Logger;
@@ -29,6 +31,8 @@ import org.slf4j.LoggerFactory;
import java.nio.ByteBuffer;
import java.util.concurrent.TimeUnit;
+import static org.apache.iotdb.rpc.TSStatusCode.STORAGE_GROUP_NOT_READY;
+
public class OperationSyncDDLProtector extends OperationSyncProtector {
private static final Logger LOGGER = LoggerFactory.getLogger(OperationSyncDDLProtector.class);;
@@ -57,7 +61,25 @@ public class OperationSyncDDLProtector extends OperationSyncProtector {
transmitStatus = operationSyncSessionPool.operationSyncTransmit(planBuffer);
} catch (IoTDBConnectionException connectionException) {
// warn IoTDBConnectionException and retry
- LOGGER.warn("OperationSyncDDLProtector can't transmit, retrying...", connectionException);
+ LOGGER.warn(
+ "OperationSyncDDLProtector can't transmit for connection error, retrying...",
+ connectionException);
+ } catch (BatchExecutionException batchExecutionException) {
+ LOGGER.error(
+ "OperationSyncDDLProtector can't transmit for batchExecutionException",
+ batchExecutionException);
+ if (batchExecutionException.getStatusList().stream()
+ .noneMatch(s -> s.getCode() == STORAGE_GROUP_NOT_READY.getStatusCode())) {
+ break;
+ }
+ } catch (StatementExecutionException statementExecutionException) {
+ LOGGER.error(
+ "OperationSyncDDLProtector can't transmit for statementExecutionException",
+ statementExecutionException);
+ if (statementExecutionException.getStatusCode()
+ != STORAGE_GROUP_NOT_READY.getStatusCode()) {
+ break;
+ }
} catch (Exception e) {
// error exception and break
LOGGER.error("OperationSyncDDLProtector can't transmit", e);
diff --git a/server/src/main/java/org/apache/iotdb/db/doublelive/OperationSyncDMLProtector.java b/server/src/main/java/org/apache/iotdb/db/doublelive/OperationSyncDMLProtector.java
index 2c2c89ef94..2b5585c9dd 100644
--- a/server/src/main/java/org/apache/iotdb/db/doublelive/OperationSyncDMLProtector.java
+++ b/server/src/main/java/org/apache/iotdb/db/doublelive/OperationSyncDMLProtector.java
@@ -20,12 +20,16 @@ package org.apache.iotdb.db.doublelive;
import org.apache.iotdb.db.engine.StorageEngine;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+import org.apache.iotdb.rpc.BatchExecutionException;
import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.rpc.StatementExecutionException;
import org.apache.iotdb.session.pool.SessionPool;
import java.nio.ByteBuffer;
import java.util.concurrent.TimeUnit;
+import static org.apache.iotdb.rpc.TSStatusCode.STORAGE_GROUP_NOT_READY;
+
public class OperationSyncDMLProtector extends OperationSyncProtector {
private final OperationSyncDDLProtector ddlProtector;
@@ -61,7 +65,25 @@ public class OperationSyncDMLProtector extends OperationSyncProtector {
transmitStatus = operationSyncSessionPool.operationSyncTransmit(planBuffer);
} catch (IoTDBConnectionException connectionException) {
// warn IoTDBConnectionException and retry
- LOGGER.warn("OperationSyncDMLProtector can't transmit, retrying...", connectionException);
+ LOGGER.warn(
+ "OperationSyncDMLProtector can't transmit for connection error, retrying...",
+ connectionException);
+ } catch (BatchExecutionException batchExecutionException) {
+ LOGGER.error(
+ "OperationSyncDMLProtector can't transmit for batchExecutionException",
+ batchExecutionException);
+ if (batchExecutionException.getStatusList().stream()
+ .noneMatch(s -> s.getCode() == STORAGE_GROUP_NOT_READY.getStatusCode())) {
+ break;
+ }
+ } catch (StatementExecutionException statementExecutionException) {
+ LOGGER.error(
+ "OperationSyncDMLProtector can't transmit for statementExecutionException",
+ statementExecutionException);
+ if (statementExecutionException.getStatusCode()
+ != STORAGE_GROUP_NOT_READY.getStatusCode()) {
+ break;
+ }
} catch (Exception e) {
// error exception and break
LOGGER.error("OperationSyncDMLProtector can't transmit", e);