You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ta...@apache.org on 2021/06/28 03:48:27 UTC

[iotdb] branch cp_master_cluster_bugfixs_to_0.12 created (now 103a018)

This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a change to branch cp_master_cluster_bugfixs_to_0.12
in repository https://gitbox.apache.org/repos/asf/iotdb.git.


      at 103a018  cherry pick master cluster bugfixs to 0.12

This branch includes the following new commits:

     new 103a018  cherry pick master cluster bugfixs to 0.12

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  Revisions
listed as "add" were already present in the repository and have only
been added to this reference.


[iotdb] 01/01: cherry pick master cluster bugfixs to 0.12

Posted by ta...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a commit to branch cp_master_cluster_bugfixs_to_0.12
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 103a018e33cda27a913968fe69df0c573d202b2e
Author: LebronAl <TX...@gmail.com>
AuthorDate: Mon Jun 28 11:47:44 2021 +0800

    cherry pick master cluster bugfixs to 0.12
---
 .../cluster/log/manage/CommittedEntryManager.java  |  4 ++
 .../query/reader/mult/RemoteMultSeriesReader.java  |  4 +-
 .../cluster/server/member/DataGroupMember.java     | 43 ++++++++++++++++++----
 .../cluster/server/member/MetaGroupMember.java     |  2 +-
 .../iotdb/cluster/server/member/RaftMember.java    | 23 +++++++++---
 5 files changed, 59 insertions(+), 17 deletions(-)

diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/CommittedEntryManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/CommittedEntryManager.java
index e86df85..d8d511b 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/CommittedEntryManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/CommittedEntryManager.java
@@ -242,6 +242,10 @@ public class CommittedEntryManager {
       }
       entries.addAll(appendingEntries);
     } else if (entries.size() - offset > 0) {
+      logger.error(
+          "committed entries cannot be truncated: current entries:{}, appendingEntries {}",
+          entries,
+          appendingEntries);
       throw new TruncateCommittedEntryException(
           appendingEntries.get(0).getCurrLogIndex(), getLastIndex());
     } else {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/mult/RemoteMultSeriesReader.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/mult/RemoteMultSeriesReader.java
index 9c09bcb..bf20b35 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/mult/RemoteMultSeriesReader.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/mult/RemoteMultSeriesReader.java
@@ -80,7 +80,7 @@ public class RemoteMultSeriesReader extends AbstractMultPointReader {
   }
 
   @Override
-  public boolean hasNextTimeValuePair(String fullPath) throws IOException {
+  public synchronized boolean hasNextTimeValuePair(String fullPath) throws IOException {
     BatchData batchData = currentBatchDatas.get(fullPath);
     if (batchData != null && batchData.hasCurrent()) {
       return true;
@@ -89,7 +89,7 @@ public class RemoteMultSeriesReader extends AbstractMultPointReader {
     return checkPathBatchData(fullPath);
   }
 
-  private boolean checkPathBatchData(String fullPath) {
+  private synchronized boolean checkPathBatchData(String fullPath) {
     BatchData batchData = cachedBatchs.get(fullPath).peek();
     if (batchData != null && !batchData.isEmpty()) {
       return true;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
index ba943a9..2b2b646 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
@@ -28,6 +28,7 @@ import org.apache.iotdb.cluster.client.sync.SyncDataClient;
 import org.apache.iotdb.cluster.client.sync.SyncDataHeartbeatClient;
 import org.apache.iotdb.cluster.config.ClusterConstant;
 import org.apache.iotdb.cluster.config.ClusterDescriptor;
+import org.apache.iotdb.cluster.exception.CheckConsistencyException;
 import org.apache.iotdb.cluster.exception.LogExecutionException;
 import org.apache.iotdb.cluster.exception.SnapshotInstallationException;
 import org.apache.iotdb.cluster.log.LogApplier;
@@ -65,6 +66,7 @@ import org.apache.iotdb.cluster.server.monitor.NodeStatusManager;
 import org.apache.iotdb.cluster.server.monitor.Peer;
 import org.apache.iotdb.cluster.server.monitor.Timer;
 import org.apache.iotdb.cluster.server.monitor.Timer.Statistic;
+import org.apache.iotdb.cluster.utils.IOUtils;
 import org.apache.iotdb.cluster.utils.StatusUtils;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.engine.StorageEngine;
@@ -75,6 +77,7 @@ import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException;
 import org.apache.iotdb.db.metadata.PartialPath;
 import org.apache.iotdb.db.qp.executor.PlanExecutor;
 import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
 import org.apache.iotdb.db.qp.physical.sys.FlushPlan;
 import org.apache.iotdb.db.service.IoTDB;
 import org.apache.iotdb.db.utils.TestOnly;
@@ -692,16 +695,40 @@ public class DataGroupMember extends RaftMember {
    */
   @Override
   public TSStatus executeNonQueryPlan(PhysicalPlan plan) {
-    TSStatus status = executeNonQueryPlanWithKnownLeader(plan);
-    if (!StatusUtils.NO_LEADER.equals(status)) {
-      return status;
-    }
+    if (ClusterDescriptor.getInstance().getConfig().getReplicationNum() == 1) {
+      try {
+        getLocalExecutor().processNonQuery(plan);
+        return StatusUtils.OK;
+      } catch (Exception e) {
+        Throwable cause = IOUtils.getRootCause(e);
+        if (cause instanceof StorageGroupNotSetException) {
+          try {
+            metaGroupMember.syncLeaderWithConsistencyCheck(true);
+            if (plan instanceof InsertPlan && ((InsertPlan) plan).getFailedMeasurements() != null) {
+              ((InsertPlan) plan).recoverFromFailure();
+            }
+            getLocalExecutor().processNonQuery(plan);
+            return StatusUtils.OK;
+          } catch (CheckConsistencyException ce) {
+            return StatusUtils.getStatus(StatusUtils.CONSISTENCY_FAILURE, ce.getMessage());
+          } catch (Exception ne) {
+            return handleLogExecutionException(plan, IOUtils.getRootCause(ne));
+          }
+        }
+        return handleLogExecutionException(plan, cause);
+      }
+    } else {
+      TSStatus status = executeNonQueryPlanWithKnownLeader(plan);
+      if (!StatusUtils.NO_LEADER.equals(status)) {
+        return status;
+      }
 
-    long startTime = Timer.Statistic.DATA_GROUP_MEMBER_WAIT_LEADER.getOperationStartTime();
-    waitLeader();
-    Timer.Statistic.DATA_GROUP_MEMBER_WAIT_LEADER.calOperationCostTimeFromStart(startTime);
+      long startTime = Timer.Statistic.DATA_GROUP_MEMBER_WAIT_LEADER.getOperationStartTime();
+      waitLeader();
+      Timer.Statistic.DATA_GROUP_MEMBER_WAIT_LEADER.calOperationCostTimeFromStart(startTime);
 
-    return executeNonQueryPlanWithKnownLeader(plan);
+      return executeNonQueryPlanWithKnownLeader(plan);
+    }
   }
 
   private TSStatus executeNonQueryPlanWithKnownLeader(PhysicalPlan plan) {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
index c477d5f..58884e1 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
@@ -466,7 +466,7 @@ public class MetaGroupMember extends RaftMember {
     try {
       if (logger.isInfoEnabled()) {
         NodeReport report = genNodeReport();
-        logger.info(report.toString());
+        logger.debug(report.toString());
       }
     } catch (Exception e) {
       logger.error("{} exception occurred when generating node report", name, e);
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
index 426c370..9890f6b 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
@@ -66,7 +66,8 @@ import org.apache.iotdb.db.exception.metadata.IllegalPathException;
 import org.apache.iotdb.db.exception.metadata.PathAlreadyExistException;
 import org.apache.iotdb.db.exception.metadata.PathNotExistException;
 import org.apache.iotdb.db.exception.metadata.StorageGroupAlreadySetException;
-import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException;
+import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.qp.executor.PlanExecutor;
 import org.apache.iotdb.db.qp.physical.PhysicalPlan;
 import org.apache.iotdb.db.utils.TestOnly;
 import org.apache.iotdb.rpc.RpcUtils;
@@ -224,6 +225,11 @@ public abstract class RaftMember {
    */
   private LogDispatcher logDispatcher;
 
+  /**
+   * localExecutor is used to directly execute plans like load configuration in the underlying IoTDB
+   */
+  protected PlanExecutor localExecutor;
+
   protected RaftMember() {}
 
   protected RaftMember(
@@ -563,6 +569,13 @@ public abstract class RaftMember {
     return response;
   }
 
+  public PlanExecutor getLocalExecutor() throws QueryProcessException {
+    if (localExecutor == null) {
+      localExecutor = new PlanExecutor();
+    }
+    return localExecutor;
+  }
+
   /**
    * Get an asynchronous heartbeat thrift client to the given node.
    *
@@ -972,7 +985,7 @@ public abstract class RaftMember {
         return StatusUtils.OK;
       }
     } catch (LogExecutionException e) {
-      return handleLogExecutionException(log, e);
+      return handleLogExecutionException(log.getPlan(), IOUtils.getRootCause(e));
     }
     return StatusUtils.TIME_OUT;
   }
@@ -1036,7 +1049,7 @@ public abstract class RaftMember {
           break;
       }
     } catch (LogExecutionException e) {
-      return handleLogExecutionException(log, e);
+      return handleLogExecutionException(log.getPlan(), IOUtils.getRootCause(e));
     }
     return StatusUtils.TIME_OUT;
   }
@@ -1468,8 +1481,7 @@ public abstract class RaftMember {
     }
   }
 
-  private TSStatus handleLogExecutionException(PhysicalPlanLog log, LogExecutionException e) {
-    Throwable cause = IOUtils.getRootCause(e);
+  protected TSStatus handleLogExecutionException(PhysicalPlan log, Throwable cause) {
     if (cause instanceof BatchProcessException) {
       return RpcUtils.getStatus(Arrays.asList(((BatchProcessException) cause).getFailingStatus()));
     }
@@ -1482,7 +1494,6 @@ public abstract class RaftMember {
       tsStatus.setCode(((IoTDBException) cause).getErrorCode());
     }
     if (!(cause instanceof PathNotExistException)
-        && !(cause instanceof StorageGroupNotSetException)
         && !(cause instanceof PathAlreadyExistException)
         && !(cause instanceof StorageGroupAlreadySetException)) {
       logger.debug("{} cannot be executed because ", log, cause);