You are viewing a plain-text version of this content; the link to its canonical version was not preserved in this rendering.
Posted to commits@iotdb.apache.org by ta...@apache.org on 2021/06/28 03:48:28 UTC

[iotdb] 01/01: cherry-pick master cluster bugfixes to 0.12

This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a commit to branch cp_master_cluster_bugfixs_to_0.12
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 103a018e33cda27a913968fe69df0c573d202b2e
Author: LebronAl <TX...@gmail.com>
AuthorDate: Mon Jun 28 11:47:44 2021 +0800

    cherry-pick master cluster bugfixes to 0.12
---
 .../cluster/log/manage/CommittedEntryManager.java  |  4 ++
 .../query/reader/mult/RemoteMultSeriesReader.java  |  4 +-
 .../cluster/server/member/DataGroupMember.java     | 43 ++++++++++++++++++----
 .../cluster/server/member/MetaGroupMember.java     |  2 +-
 .../iotdb/cluster/server/member/RaftMember.java    | 23 +++++++++---
 5 files changed, 59 insertions(+), 17 deletions(-)

diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/CommittedEntryManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/CommittedEntryManager.java
index e86df85..d8d511b 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/CommittedEntryManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/CommittedEntryManager.java
@@ -242,6 +242,10 @@ public class CommittedEntryManager {
       }
       entries.addAll(appendingEntries);
     } else if (entries.size() - offset > 0) {
+      logger.error(
+          "committed entries cannot be truncated: current entries:{}, appendingEntries {}",
+          entries,
+          appendingEntries);
       throw new TruncateCommittedEntryException(
           appendingEntries.get(0).getCurrLogIndex(), getLastIndex());
     } else {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/mult/RemoteMultSeriesReader.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/mult/RemoteMultSeriesReader.java
index 9c09bcb..bf20b35 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/mult/RemoteMultSeriesReader.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/mult/RemoteMultSeriesReader.java
@@ -80,7 +80,7 @@ public class RemoteMultSeriesReader extends AbstractMultPointReader {
   }
 
   @Override
-  public boolean hasNextTimeValuePair(String fullPath) throws IOException {
+  public synchronized boolean hasNextTimeValuePair(String fullPath) throws IOException {
     BatchData batchData = currentBatchDatas.get(fullPath);
     if (batchData != null && batchData.hasCurrent()) {
       return true;
@@ -89,7 +89,7 @@ public class RemoteMultSeriesReader extends AbstractMultPointReader {
     return checkPathBatchData(fullPath);
   }
 
-  private boolean checkPathBatchData(String fullPath) {
+  private synchronized boolean checkPathBatchData(String fullPath) {
     BatchData batchData = cachedBatchs.get(fullPath).peek();
     if (batchData != null && !batchData.isEmpty()) {
       return true;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
index ba943a9..2b2b646 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
@@ -28,6 +28,7 @@ import org.apache.iotdb.cluster.client.sync.SyncDataClient;
 import org.apache.iotdb.cluster.client.sync.SyncDataHeartbeatClient;
 import org.apache.iotdb.cluster.config.ClusterConstant;
 import org.apache.iotdb.cluster.config.ClusterDescriptor;
+import org.apache.iotdb.cluster.exception.CheckConsistencyException;
 import org.apache.iotdb.cluster.exception.LogExecutionException;
 import org.apache.iotdb.cluster.exception.SnapshotInstallationException;
 import org.apache.iotdb.cluster.log.LogApplier;
@@ -65,6 +66,7 @@ import org.apache.iotdb.cluster.server.monitor.NodeStatusManager;
 import org.apache.iotdb.cluster.server.monitor.Peer;
 import org.apache.iotdb.cluster.server.monitor.Timer;
 import org.apache.iotdb.cluster.server.monitor.Timer.Statistic;
+import org.apache.iotdb.cluster.utils.IOUtils;
 import org.apache.iotdb.cluster.utils.StatusUtils;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.engine.StorageEngine;
@@ -75,6 +77,7 @@ import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException;
 import org.apache.iotdb.db.metadata.PartialPath;
 import org.apache.iotdb.db.qp.executor.PlanExecutor;
 import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
 import org.apache.iotdb.db.qp.physical.sys.FlushPlan;
 import org.apache.iotdb.db.service.IoTDB;
 import org.apache.iotdb.db.utils.TestOnly;
@@ -692,16 +695,40 @@ public class DataGroupMember extends RaftMember {
    */
   @Override
   public TSStatus executeNonQueryPlan(PhysicalPlan plan) {
-    TSStatus status = executeNonQueryPlanWithKnownLeader(plan);
-    if (!StatusUtils.NO_LEADER.equals(status)) {
-      return status;
-    }
+    if (ClusterDescriptor.getInstance().getConfig().getReplicationNum() == 1) {
+      try {
+        getLocalExecutor().processNonQuery(plan);
+        return StatusUtils.OK;
+      } catch (Exception e) {
+        Throwable cause = IOUtils.getRootCause(e);
+        if (cause instanceof StorageGroupNotSetException) {
+          try {
+            metaGroupMember.syncLeaderWithConsistencyCheck(true);
+            if (plan instanceof InsertPlan && ((InsertPlan) plan).getFailedMeasurements() != null) {
+              ((InsertPlan) plan).recoverFromFailure();
+            }
+            getLocalExecutor().processNonQuery(plan);
+            return StatusUtils.OK;
+          } catch (CheckConsistencyException ce) {
+            return StatusUtils.getStatus(StatusUtils.CONSISTENCY_FAILURE, ce.getMessage());
+          } catch (Exception ne) {
+            return handleLogExecutionException(plan, IOUtils.getRootCause(ne));
+          }
+        }
+        return handleLogExecutionException(plan, cause);
+      }
+    } else {
+      TSStatus status = executeNonQueryPlanWithKnownLeader(plan);
+      if (!StatusUtils.NO_LEADER.equals(status)) {
+        return status;
+      }
 
-    long startTime = Timer.Statistic.DATA_GROUP_MEMBER_WAIT_LEADER.getOperationStartTime();
-    waitLeader();
-    Timer.Statistic.DATA_GROUP_MEMBER_WAIT_LEADER.calOperationCostTimeFromStart(startTime);
+      long startTime = Timer.Statistic.DATA_GROUP_MEMBER_WAIT_LEADER.getOperationStartTime();
+      waitLeader();
+      Timer.Statistic.DATA_GROUP_MEMBER_WAIT_LEADER.calOperationCostTimeFromStart(startTime);
 
-    return executeNonQueryPlanWithKnownLeader(plan);
+      return executeNonQueryPlanWithKnownLeader(plan);
+    }
   }
 
   private TSStatus executeNonQueryPlanWithKnownLeader(PhysicalPlan plan) {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
index c477d5f..58884e1 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
@@ -466,7 +466,7 @@ public class MetaGroupMember extends RaftMember {
     try {
       if (logger.isInfoEnabled()) {
         NodeReport report = genNodeReport();
-        logger.info(report.toString());
+        logger.debug(report.toString());
       }
     } catch (Exception e) {
       logger.error("{} exception occurred when generating node report", name, e);
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
index 426c370..9890f6b 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
@@ -66,7 +66,8 @@ import org.apache.iotdb.db.exception.metadata.IllegalPathException;
 import org.apache.iotdb.db.exception.metadata.PathAlreadyExistException;
 import org.apache.iotdb.db.exception.metadata.PathNotExistException;
 import org.apache.iotdb.db.exception.metadata.StorageGroupAlreadySetException;
-import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException;
+import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.qp.executor.PlanExecutor;
 import org.apache.iotdb.db.qp.physical.PhysicalPlan;
 import org.apache.iotdb.db.utils.TestOnly;
 import org.apache.iotdb.rpc.RpcUtils;
@@ -224,6 +225,11 @@ public abstract class RaftMember {
    */
   private LogDispatcher logDispatcher;
 
+  /**
+   * localExecutor is used to directly execute plans like load configuration in the underlying IoTDB
+   */
+  protected PlanExecutor localExecutor;
+
   protected RaftMember() {}
 
   protected RaftMember(
@@ -563,6 +569,13 @@ public abstract class RaftMember {
     return response;
   }
 
+  public PlanExecutor getLocalExecutor() throws QueryProcessException {
+    if (localExecutor == null) {
+      localExecutor = new PlanExecutor();
+    }
+    return localExecutor;
+  }
+
   /**
    * Get an asynchronous heartbeat thrift client to the given node.
    *
@@ -972,7 +985,7 @@ public abstract class RaftMember {
         return StatusUtils.OK;
       }
     } catch (LogExecutionException e) {
-      return handleLogExecutionException(log, e);
+      return handleLogExecutionException(log.getPlan(), IOUtils.getRootCause(e));
     }
     return StatusUtils.TIME_OUT;
   }
@@ -1036,7 +1049,7 @@ public abstract class RaftMember {
           break;
       }
     } catch (LogExecutionException e) {
-      return handleLogExecutionException(log, e);
+      return handleLogExecutionException(log.getPlan(), IOUtils.getRootCause(e));
     }
     return StatusUtils.TIME_OUT;
   }
@@ -1468,8 +1481,7 @@ public abstract class RaftMember {
     }
   }
 
-  private TSStatus handleLogExecutionException(PhysicalPlanLog log, LogExecutionException e) {
-    Throwable cause = IOUtils.getRootCause(e);
+  protected TSStatus handleLogExecutionException(PhysicalPlan log, Throwable cause) {
     if (cause instanceof BatchProcessException) {
       return RpcUtils.getStatus(Arrays.asList(((BatchProcessException) cause).getFailingStatus()));
     }
@@ -1482,7 +1494,6 @@ public abstract class RaftMember {
       tsStatus.setCode(((IoTDBException) cause).getErrorCode());
     }
     if (!(cause instanceof PathNotExistException)
-        && !(cause instanceof StorageGroupNotSetException)
         && !(cause instanceof PathAlreadyExistException)
         && !(cause instanceof StorageGroupAlreadySetException)) {
       logger.debug("{} cannot be executed because ", log, cause);