You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ha...@apache.org on 2021/06/29 03:30:19 UTC

[iotdb] branch rel/0.12 updated: [To rel/0.12][Cluster][Cherry-Pick] write perfromance optimization when replicaNum == 1 && fix a query concurrent bug when query multi timeseries && remove no-merge info log (#3460)

This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch rel/0.12
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/rel/0.12 by this push:
     new 651a12a  [To rel/0.12][Cluster][Cherry-Pick] write perfromance optimization when replicaNum == 1 && fix a query concurrent bug when query multi timeseries && remove no-merge info log (#3460)
651a12a is described below

commit 651a12a0de24427619f2178f21544ac26feeebab
Author: Potato <TX...@gmail.com>
AuthorDate: Tue Jun 29 11:30:00 2021 +0800

    [To rel/0.12][Cluster][Cherry-Pick] write perfromance optimization when replicaNum == 1 && fix a query concurrent bug when query multi timeseries && remove no-merge info log (#3460)
---
 .../cluster/log/manage/CommittedEntryManager.java  |  4 ++
 .../query/reader/mult/RemoteMultSeriesReader.java  |  4 +-
 .../cluster/server/member/DataGroupMember.java     | 43 ++++++++++++++++++----
 .../cluster/server/member/MetaGroupMember.java     |  2 +-
 .../iotdb/cluster/server/member/RaftMember.java    | 23 +++++++++---
 .../no/NoCompactionTsFileManagement.java           |  4 +-
 .../engine/storagegroup/StorageGroupProcessor.java |  2 +-
 7 files changed, 62 insertions(+), 20 deletions(-)

diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/CommittedEntryManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/CommittedEntryManager.java
index e86df85..d8d511b 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/CommittedEntryManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/CommittedEntryManager.java
@@ -242,6 +242,10 @@ public class CommittedEntryManager {
       }
       entries.addAll(appendingEntries);
     } else if (entries.size() - offset > 0) {
+      logger.error(
+          "committed entries cannot be truncated: current entries:{}, appendingEntries {}",
+          entries,
+          appendingEntries);
       throw new TruncateCommittedEntryException(
           appendingEntries.get(0).getCurrLogIndex(), getLastIndex());
     } else {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/mult/RemoteMultSeriesReader.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/mult/RemoteMultSeriesReader.java
index 9c09bcb..bf20b35 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/mult/RemoteMultSeriesReader.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/mult/RemoteMultSeriesReader.java
@@ -80,7 +80,7 @@ public class RemoteMultSeriesReader extends AbstractMultPointReader {
   }
 
   @Override
-  public boolean hasNextTimeValuePair(String fullPath) throws IOException {
+  public synchronized boolean hasNextTimeValuePair(String fullPath) throws IOException {
     BatchData batchData = currentBatchDatas.get(fullPath);
     if (batchData != null && batchData.hasCurrent()) {
       return true;
@@ -89,7 +89,7 @@ public class RemoteMultSeriesReader extends AbstractMultPointReader {
     return checkPathBatchData(fullPath);
   }
 
-  private boolean checkPathBatchData(String fullPath) {
+  private synchronized boolean checkPathBatchData(String fullPath) {
     BatchData batchData = cachedBatchs.get(fullPath).peek();
     if (batchData != null && !batchData.isEmpty()) {
       return true;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
index ba943a9..2b2b646 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
@@ -28,6 +28,7 @@ import org.apache.iotdb.cluster.client.sync.SyncDataClient;
 import org.apache.iotdb.cluster.client.sync.SyncDataHeartbeatClient;
 import org.apache.iotdb.cluster.config.ClusterConstant;
 import org.apache.iotdb.cluster.config.ClusterDescriptor;
+import org.apache.iotdb.cluster.exception.CheckConsistencyException;
 import org.apache.iotdb.cluster.exception.LogExecutionException;
 import org.apache.iotdb.cluster.exception.SnapshotInstallationException;
 import org.apache.iotdb.cluster.log.LogApplier;
@@ -65,6 +66,7 @@ import org.apache.iotdb.cluster.server.monitor.NodeStatusManager;
 import org.apache.iotdb.cluster.server.monitor.Peer;
 import org.apache.iotdb.cluster.server.monitor.Timer;
 import org.apache.iotdb.cluster.server.monitor.Timer.Statistic;
+import org.apache.iotdb.cluster.utils.IOUtils;
 import org.apache.iotdb.cluster.utils.StatusUtils;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.engine.StorageEngine;
@@ -75,6 +77,7 @@ import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException;
 import org.apache.iotdb.db.metadata.PartialPath;
 import org.apache.iotdb.db.qp.executor.PlanExecutor;
 import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
 import org.apache.iotdb.db.qp.physical.sys.FlushPlan;
 import org.apache.iotdb.db.service.IoTDB;
 import org.apache.iotdb.db.utils.TestOnly;
@@ -692,16 +695,40 @@ public class DataGroupMember extends RaftMember {
    */
   @Override
   public TSStatus executeNonQueryPlan(PhysicalPlan plan) {
-    TSStatus status = executeNonQueryPlanWithKnownLeader(plan);
-    if (!StatusUtils.NO_LEADER.equals(status)) {
-      return status;
-    }
+    if (ClusterDescriptor.getInstance().getConfig().getReplicationNum() == 1) {
+      try {
+        getLocalExecutor().processNonQuery(plan);
+        return StatusUtils.OK;
+      } catch (Exception e) {
+        Throwable cause = IOUtils.getRootCause(e);
+        if (cause instanceof StorageGroupNotSetException) {
+          try {
+            metaGroupMember.syncLeaderWithConsistencyCheck(true);
+            if (plan instanceof InsertPlan && ((InsertPlan) plan).getFailedMeasurements() != null) {
+              ((InsertPlan) plan).recoverFromFailure();
+            }
+            getLocalExecutor().processNonQuery(plan);
+            return StatusUtils.OK;
+          } catch (CheckConsistencyException ce) {
+            return StatusUtils.getStatus(StatusUtils.CONSISTENCY_FAILURE, ce.getMessage());
+          } catch (Exception ne) {
+            return handleLogExecutionException(plan, IOUtils.getRootCause(ne));
+          }
+        }
+        return handleLogExecutionException(plan, cause);
+      }
+    } else {
+      TSStatus status = executeNonQueryPlanWithKnownLeader(plan);
+      if (!StatusUtils.NO_LEADER.equals(status)) {
+        return status;
+      }
 
-    long startTime = Timer.Statistic.DATA_GROUP_MEMBER_WAIT_LEADER.getOperationStartTime();
-    waitLeader();
-    Timer.Statistic.DATA_GROUP_MEMBER_WAIT_LEADER.calOperationCostTimeFromStart(startTime);
+      long startTime = Timer.Statistic.DATA_GROUP_MEMBER_WAIT_LEADER.getOperationStartTime();
+      waitLeader();
+      Timer.Statistic.DATA_GROUP_MEMBER_WAIT_LEADER.calOperationCostTimeFromStart(startTime);
 
-    return executeNonQueryPlanWithKnownLeader(plan);
+      return executeNonQueryPlanWithKnownLeader(plan);
+    }
   }
 
   private TSStatus executeNonQueryPlanWithKnownLeader(PhysicalPlan plan) {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
index c477d5f..58884e1 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
@@ -466,7 +466,7 @@ public class MetaGroupMember extends RaftMember {
     try {
       if (logger.isInfoEnabled()) {
         NodeReport report = genNodeReport();
-        logger.info(report.toString());
+        logger.debug(report.toString());
       }
     } catch (Exception e) {
       logger.error("{} exception occurred when generating node report", name, e);
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
index 426c370..9890f6b 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
@@ -66,7 +66,8 @@ import org.apache.iotdb.db.exception.metadata.IllegalPathException;
 import org.apache.iotdb.db.exception.metadata.PathAlreadyExistException;
 import org.apache.iotdb.db.exception.metadata.PathNotExistException;
 import org.apache.iotdb.db.exception.metadata.StorageGroupAlreadySetException;
-import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException;
+import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.qp.executor.PlanExecutor;
 import org.apache.iotdb.db.qp.physical.PhysicalPlan;
 import org.apache.iotdb.db.utils.TestOnly;
 import org.apache.iotdb.rpc.RpcUtils;
@@ -224,6 +225,11 @@ public abstract class RaftMember {
    */
   private LogDispatcher logDispatcher;
 
+  /**
+   * localExecutor is used to directly execute plans like load configuration in the underlying IoTDB
+   */
+  protected PlanExecutor localExecutor;
+
   protected RaftMember() {}
 
   protected RaftMember(
@@ -563,6 +569,13 @@ public abstract class RaftMember {
     return response;
   }
 
+  public PlanExecutor getLocalExecutor() throws QueryProcessException {
+    if (localExecutor == null) {
+      localExecutor = new PlanExecutor();
+    }
+    return localExecutor;
+  }
+
   /**
    * Get an asynchronous heartbeat thrift client to the given node.
    *
@@ -972,7 +985,7 @@ public abstract class RaftMember {
         return StatusUtils.OK;
       }
     } catch (LogExecutionException e) {
-      return handleLogExecutionException(log, e);
+      return handleLogExecutionException(log.getPlan(), IOUtils.getRootCause(e));
     }
     return StatusUtils.TIME_OUT;
   }
@@ -1036,7 +1049,7 @@ public abstract class RaftMember {
           break;
       }
     } catch (LogExecutionException e) {
-      return handleLogExecutionException(log, e);
+      return handleLogExecutionException(log.getPlan(), IOUtils.getRootCause(e));
     }
     return StatusUtils.TIME_OUT;
   }
@@ -1468,8 +1481,7 @@ public abstract class RaftMember {
     }
   }
 
-  private TSStatus handleLogExecutionException(PhysicalPlanLog log, LogExecutionException e) {
-    Throwable cause = IOUtils.getRootCause(e);
+  protected TSStatus handleLogExecutionException(PhysicalPlan log, Throwable cause) {
     if (cause instanceof BatchProcessException) {
       return RpcUtils.getStatus(Arrays.asList(((BatchProcessException) cause).getFailingStatus()));
     }
@@ -1482,7 +1494,6 @@ public abstract class RaftMember {
       tsStatus.setCode(((IoTDBException) cause).getErrorCode());
     }
     if (!(cause instanceof PathNotExistException)
-        && !(cause instanceof StorageGroupNotSetException)
         && !(cause instanceof PathAlreadyExistException)
         && !(cause instanceof StorageGroupAlreadySetException)) {
       logger.debug("{} cannot be executed because ", log, cause);
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/no/NoCompactionTsFileManagement.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/no/NoCompactionTsFileManagement.java
index 5c3d8b1..cd8b138 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/no/NoCompactionTsFileManagement.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/no/NoCompactionTsFileManagement.java
@@ -272,12 +272,12 @@ public class NoCompactionTsFileManagement extends TsFileManagement {
 
   @Override
   public void forkCurrentFileList(long timePartition) {
-    logger.info("{} do not need fork", storageGroupName);
+    logger.debug("{} do not need fork", storageGroupName);
   }
 
   @Override
   protected void merge(long timePartition) {
-    logger.info("{} no merge logic", storageGroupName);
+    logger.debug("{} no merge logic", storageGroupName);
   }
 
   private TreeSet<TsFileResource> newSequenceTsFileResources(Long k) {
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index 53b731e..388a000 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -1990,7 +1990,7 @@ public class StorageGroupProcessor {
   }
 
   private void syncCompactOnePartition(long timePartition, boolean fullMerge) {
-    logger.info(
+    logger.debug(
         "{}-{} partition:{}, submit a compaction merge task",
         logicalStorageGroupName,
         virtualStorageGroupId,