You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ca...@apache.org on 2022/11/16 00:58:13 UTC
[iotdb] branch rel/0.13 updated: [To rel/0.13] [IOTDB-4938] Fix error for storage group not ready (#7992)
This is an automated email from the ASF dual-hosted git repository.
caogaofei pushed a commit to branch rel/0.13
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rel/0.13 by this push:
new a45ab48555 [To rel/0.13] [IOTDB-4938] Fix error for storage group not ready (#7992)
a45ab48555 is described below
commit a45ab48555396c794059a5dcf7f9ca1586550396
Author: Beyyes <cg...@foxmail.com>
AuthorDate: Wed Nov 16 08:58:05 2022 +0800
[To rel/0.13] [IOTDB-4938] Fix error for storage group not ready (#7992)
---
.../iotdb/db/doublelive/OperationSyncConsumer.java | 32 ++++++++++++++++--
.../db/doublelive/OperationSyncDDLProtector.java | 39 ++++++++++++++++++++--
.../db/doublelive/OperationSyncDMLProtector.java | 39 ++++++++++++++++++++--
3 files changed, 102 insertions(+), 8 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/doublelive/OperationSyncConsumer.java b/server/src/main/java/org/apache/iotdb/db/doublelive/OperationSyncConsumer.java
index cdb23d24eb..304d35c200 100644
--- a/server/src/main/java/org/apache/iotdb/db/doublelive/OperationSyncConsumer.java
+++ b/server/src/main/java/org/apache/iotdb/db/doublelive/OperationSyncConsumer.java
@@ -19,7 +19,9 @@
package org.apache.iotdb.db.doublelive;
import org.apache.iotdb.db.engine.StorageEngine;
+import org.apache.iotdb.rpc.BatchExecutionException;
import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.rpc.StatementExecutionException;
import org.apache.iotdb.session.pool.SessionPool;
import org.apache.iotdb.tsfile.utils.Pair;
@@ -30,6 +32,8 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.BlockingQueue;
+import static org.apache.iotdb.rpc.TSStatusCode.STORAGE_GROUP_NOT_READY;
+
public class OperationSyncConsumer implements Runnable {
private static final Logger LOGGER = LoggerFactory.getLogger(OperationSyncConsumer.class);
@@ -68,10 +72,34 @@ public class OperationSyncConsumer implements Runnable {
} catch (IoTDBConnectionException connectionException) {
// warn IoTDBConnectionException and do serialization
LOGGER.warn(
- "OperationSyncConsumer can't transmit because network failure", connectionException);
+ "OperationSyncConsumer can't transmit for connection error", connectionException);
+ } catch (BatchExecutionException batchExecutionException) {
+ if (batchExecutionException.getStatusList().stream()
+ .anyMatch(s -> s.getCode() == STORAGE_GROUP_NOT_READY.getStatusCode())) {
+ LOGGER.warn(
+ "OperationSyncConsumer can't transmit for STORAGE_GROUP_NOT_READY",
+ batchExecutionException);
+ } else {
+ LOGGER.warn(
+ "OperationSyncConsumer can't transmit for batchExecutionException, discard it",
+ batchExecutionException);
+ continue;
+ }
+ } catch (StatementExecutionException statementExecutionException) {
+ if (statementExecutionException.getStatusCode()
+ == STORAGE_GROUP_NOT_READY.getStatusCode()) {
+ LOGGER.warn(
+ "OperationSyncConsumer can't transmit for STORAGE_GROUP_NOT_READY",
+ statementExecutionException);
+ } else {
+ LOGGER.warn(
+ "OperationSyncConsumer can't transmit for statementExecutionException, discard it",
+ statementExecutionException);
+ continue;
+ }
} catch (Exception e) {
// The PhysicalPlan has internal error, reject transmit
- LOGGER.error("OperationSyncConsumer can't transmit", e);
+ LOGGER.error("OperationSyncConsumer can't transmit, discard it", e);
continue;
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/doublelive/OperationSyncDDLProtector.java b/server/src/main/java/org/apache/iotdb/db/doublelive/OperationSyncDDLProtector.java
index 1870145980..a3abba895d 100644
--- a/server/src/main/java/org/apache/iotdb/db/doublelive/OperationSyncDDLProtector.java
+++ b/server/src/main/java/org/apache/iotdb/db/doublelive/OperationSyncDDLProtector.java
@@ -20,7 +20,9 @@ package org.apache.iotdb.db.doublelive;
import org.apache.iotdb.db.engine.StorageEngine;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+import org.apache.iotdb.rpc.BatchExecutionException;
import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.rpc.StatementExecutionException;
import org.apache.iotdb.session.pool.SessionPool;
import org.slf4j.Logger;
@@ -29,6 +31,8 @@ import org.slf4j.LoggerFactory;
import java.nio.ByteBuffer;
import java.util.concurrent.TimeUnit;
+import static org.apache.iotdb.rpc.TSStatusCode.STORAGE_GROUP_NOT_READY;
+
public class OperationSyncDDLProtector extends OperationSyncProtector {
private static final Logger LOGGER = LoggerFactory.getLogger(OperationSyncDDLProtector.class);;
@@ -47,6 +51,7 @@ public class OperationSyncDDLProtector extends OperationSyncProtector {
@Override
protected void transmitPhysicalPlan(ByteBuffer planBuffer, PhysicalPlan physicalPlan) {
+ long sleepTimeInSeconds = 1;
while (true) {
// transmit E-Plan until it's been received
boolean transmitStatus = false;
@@ -57,10 +62,38 @@ public class OperationSyncDDLProtector extends OperationSyncProtector {
transmitStatus = operationSyncSessionPool.operationSyncTransmit(planBuffer);
} catch (IoTDBConnectionException connectionException) {
// warn IoTDBConnectionException and retry
- LOGGER.warn("OperationSyncDDLProtector can't transmit, retrying...", connectionException);
+ LOGGER.warn(
+ "OperationSyncDDLProtector can't transmit for connection error, retrying...",
+ connectionException);
+ } catch (BatchExecutionException batchExecutionException) {
+ if (batchExecutionException.getStatusList().stream()
+ .anyMatch(s -> s.getCode() == STORAGE_GROUP_NOT_READY.getStatusCode())) {
+ LOGGER.warn(
+ "OperationSyncDDLProtector can't transmit for STORAGE_GROUP_NOT_READY",
+ batchExecutionException);
+ sleepTimeInSeconds = 10;
+ } else {
+ LOGGER.warn(
+ "OperationSyncDDLProtector can't transmit for batchExecutionException, discard it",
+ batchExecutionException);
+ break;
+ }
+ } catch (StatementExecutionException statementExecutionException) {
+ if (statementExecutionException.getStatusCode()
+ == STORAGE_GROUP_NOT_READY.getStatusCode()) {
+ sleepTimeInSeconds = 10;
+ LOGGER.warn(
+ "OperationSyncDDLProtector can't transmit for STORAGE_GROUP_NOT_READY",
+ statementExecutionException);
+ } else {
+ LOGGER.warn(
+ "OperationSyncDDLProtector can't transmit for statementExecutionException, discard it",
+ statementExecutionException);
+ break;
+ }
} catch (Exception e) {
// error exception and break
- LOGGER.error("OperationSyncDDLProtector can't transmit", e);
+ LOGGER.error("OperationSyncDDLProtector can't transmit, discard it", e);
break;
}
} else {
@@ -74,7 +107,7 @@ public class OperationSyncDDLProtector extends OperationSyncProtector {
break;
} else {
try {
- TimeUnit.SECONDS.sleep(1);
+ TimeUnit.SECONDS.sleep(sleepTimeInSeconds);
} catch (InterruptedException e) {
LOGGER.warn("OperationSyncDDLProtector is interrupted", e);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/doublelive/OperationSyncDMLProtector.java b/server/src/main/java/org/apache/iotdb/db/doublelive/OperationSyncDMLProtector.java
index 2c2c89ef94..111e7fa2d7 100644
--- a/server/src/main/java/org/apache/iotdb/db/doublelive/OperationSyncDMLProtector.java
+++ b/server/src/main/java/org/apache/iotdb/db/doublelive/OperationSyncDMLProtector.java
@@ -20,12 +20,16 @@ package org.apache.iotdb.db.doublelive;
import org.apache.iotdb.db.engine.StorageEngine;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+import org.apache.iotdb.rpc.BatchExecutionException;
import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.rpc.StatementExecutionException;
import org.apache.iotdb.session.pool.SessionPool;
import java.nio.ByteBuffer;
import java.util.concurrent.TimeUnit;
+import static org.apache.iotdb.rpc.TSStatusCode.STORAGE_GROUP_NOT_READY;
+
public class OperationSyncDMLProtector extends OperationSyncProtector {
private final OperationSyncDDLProtector ddlProtector;
@@ -51,6 +55,7 @@ public class OperationSyncDMLProtector extends OperationSyncProtector {
@Override
protected void transmitPhysicalPlan(ByteBuffer planBuffer, PhysicalPlan physicalPlan) {
+ long sleepTimeInSeconds = 1;
while (true) {
// transmit E-Plan until it's been received
boolean transmitStatus = false;
@@ -61,10 +66,38 @@ public class OperationSyncDMLProtector extends OperationSyncProtector {
transmitStatus = operationSyncSessionPool.operationSyncTransmit(planBuffer);
} catch (IoTDBConnectionException connectionException) {
// warn IoTDBConnectionException and retry
- LOGGER.warn("OperationSyncDMLProtector can't transmit, retrying...", connectionException);
+ LOGGER.warn(
+ "OperationSyncDMLProtector can't transmit for connection error, retrying...",
+ connectionException);
+ } catch (BatchExecutionException batchExecutionException) {
+ if (batchExecutionException.getStatusList().stream()
+ .anyMatch(s -> s.getCode() == STORAGE_GROUP_NOT_READY.getStatusCode())) {
+ sleepTimeInSeconds = 10;
+ LOGGER.warn(
+ "OperationSyncDMLProtector can't transmit for STORAGE_GROUP_NOT_READY",
+ batchExecutionException);
+ } else {
+ LOGGER.warn(
+ "OperationSyncDMLProtector can't transmit for batchExecutionException, discard it",
+ batchExecutionException);
+ break;
+ }
+ } catch (StatementExecutionException statementExecutionException) {
+ if (statementExecutionException.getStatusCode()
+ == STORAGE_GROUP_NOT_READY.getStatusCode()) {
+ LOGGER.warn(
+ "OperationSyncDMLProtector can't transmit for STORAGE_GROUP_NOT_READY",
+ statementExecutionException);
+ sleepTimeInSeconds = 10;
+ } else {
+ LOGGER.warn(
+ "OperationSyncDMLProtector can't transmit for statementExecutionException, discard it",
+ statementExecutionException);
+ break;
+ }
} catch (Exception e) {
// error exception and break
- LOGGER.error("OperationSyncDMLProtector can't transmit", e);
+ LOGGER.error("OperationSyncDMLProtector can't transmit, discard it", e);
break;
}
} else {
@@ -78,7 +111,7 @@ public class OperationSyncDMLProtector extends OperationSyncProtector {
break;
} else {
try {
- TimeUnit.SECONDS.sleep(1);
+ TimeUnit.SECONDS.sleep(sleepTimeInSeconds);
} catch (InterruptedException e) {
LOGGER.warn("OperationSyncDMLProtector is interrupted", e);
}