You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@iotdb.apache.org by GitBox <gi...@apache.org> on 2021/05/24 04:37:29 UTC

[GitHub] [iotdb] jt2594838 commented on a change in pull request #3191: New features of cluster scalability and multi-raft

jt2594838 commented on a change in pull request #3191:
URL: https://github.com/apache/iotdb/pull/3191#discussion_r632349537



##########
File path: cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientAdaptor.java
##########
@@ -499,18 +515,26 @@ public static ByteBuffer last(
             context.getQueryId(),
             deviceMeasurements,
             header,
+            raftId,
             client.getNode());
 
     client.last(request, handler);
     return handler.getResult(RaftServer.getReadOperationTimeoutMS());
   }
 
-  public static boolean onSnapshotApplied(AsyncDataClient client, Node header, List<Integer> slots)
+  public static boolean onSnapshotApplied(
+      AsyncDataClient client, Node header, int raftId, List<Integer> slots)
       throws TException, InterruptedException {
-    AtomicReference<Boolean> result = new AtomicReference<>(false);
+    AtomicReference<Boolean> result = new AtomicReference<>();
     GenericHandler<Boolean> handler = new GenericHandler<>(client.getNode(), result);
 
-    client.onSnapshotApplied(header, slots, handler);
-    return handler.getResult(RaftServer.getWriteOperationTimeoutMS());
+    client.onSnapshotApplied(header, raftId, slots, handler);
+
+    synchronized (result) {
+      if (result.get() == null) {
+        result.wait();
+      }
+    }

Review comment:
       If an error occurs before the synchronized block is entered, the result will never be notified, so `result.wait()` may block forever.

##########
File path: cluster/src/main/java/org/apache/iotdb/cluster/log/applier/MetaLogApplier.java
##########
@@ -48,22 +47,46 @@ public void apply(Log log) {
     try {
       logger.debug("MetaMember [{}] starts applying Log {}", metaGroupMember.getName(), log);
       if (log instanceof AddNodeLog) {
-        AddNodeLog addNodeLog = (AddNodeLog) log;
-        Node newNode = addNodeLog.getNewNode();
-        member.applyAddNode(newNode);
+        applyAddNodeLog((AddNodeLog) log);
       } else if (log instanceof PhysicalPlanLog) {
         applyPhysicalPlan(((PhysicalPlanLog) log).getPlan(), null);
       } else if (log instanceof RemoveNodeLog) {
-        RemoveNodeLog removeNodeLog = ((RemoveNodeLog) log);
-        member.applyRemoveNode(removeNodeLog.getRemovedNode());
+        applyRemoveNodeLog((RemoveNodeLog) log);
+      } else if (log instanceof EmptyContentLog) {
+        // Do nothing
       } else {
         logger.error("Unsupported log: {} {}", log.getClass().getName(), log);
       }
-    } catch (StorageEngineException | StorageGroupNotSetException | QueryProcessException e) {
+    } catch (Exception e) {
       logger.debug("Exception occurred when executing {}", log, e);
       log.setException(e);
     } finally {
       log.setApplied(true);
     }
   }
+
+  private void applyAddNodeLog(AddNodeLog log)
+      throws ChangeMembershipException, InterruptedException {
+    if (!metaGroupMember.getPartitionTable().deserialize(log.getPartitionTable())) {
+      logger.info("Ignore previous change membership log");
+      // ignore previous change membership log
+      return;
+    }
+    if (metaGroupMember.getCharacter() == NodeCharacter.LEADER) {
+      metaGroupMember.sendLogToAllDataGroups(log);
+    }

Review comment:
       More than one node may consider itself the leader, so is it safe if the log is sent more than once?

##########
File path: cluster/src/main/java/org/apache/iotdb/cluster/server/StoppedMemberManager.java
##########
@@ -146,11 +149,12 @@ private void parseRemoved(String[] split) {
     }
     DataGroupMember member = memberFactory.create(partitionGroup, thisNode);
     member.setReadOnly();
-    removedMemberMap.put(partitionGroup.getHeader(), member);
+    // TODO CORRECT
+    removedMemberMap.put(new RaftNode(partitionGroup.getHeader(), 0), member);

Review comment:
       Maybe you can add the raftId to the file during serialization and deserialize it on start-up.

##########
File path: cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
##########
@@ -919,10 +948,18 @@ protected boolean waitUntilCatchUp(CheckConsistency checkConsistency)
    * @param leaderCommitId leader commit id
    * @return true if leaderCommitId <= localAppliedId
    */
-  private boolean syncLocalApply(long leaderCommitId) {
+  public boolean syncLocalApply(long leaderCommitId) {
     long startTime = System.currentTimeMillis();
     long waitedTime = 0;
     long localAppliedId = 0;
+
+    // If the leader and follower logs differ too much, local query is not allowed
+    if (leaderCommitId - logManager.getMaxHaveAppliedCommitIndex() > 1000) {
+      logger.info(
+          "{}: The raft log of this member is too backward to provide service directly.", name);
+      return false;
+    }

Review comment:
       This should be configurable.

##########
File path: server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
##########
@@ -810,11 +810,11 @@ public void updatePlanIndexes(TsFileResource another) {
   }
 
   public boolean isPlanIndexOverlap(TsFileResource another) {
-    return another.maxPlanIndex >= this.minPlanIndex && another.minPlanIndex <= this.maxPlanIndex;
+    return another.maxPlanIndex > this.minPlanIndex && another.minPlanIndex < this.maxPlanIndex;
   }
 
   public boolean isPlanRangeCovers(TsFileResource another) {
-    return this.minPlanIndex <= another.minPlanIndex && another.maxPlanIndex <= this.maxPlanIndex;
+    return this.minPlanIndex < another.minPlanIndex && another.maxPlanIndex < this.maxPlanIndex;
   }

Review comment:
       Is this really necessary? This may cause many more redundant files to be loaded if two nodes generate similar files.

##########
File path: server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
##########
@@ -2605,39 +2605,61 @@ private boolean loadTsFileByType(
       targetFile.getParentFile().mkdirs();
     }
     try {
-      FileUtils.moveFile(syncedTsFile, targetFile);
+      FileUtils.moveFile(tsFileToLoad, targetFile);
     } catch (IOException e) {
       logger.error(
           "File renaming failed when loading tsfile. Origin: {}, Target: {}",
-          syncedTsFile.getAbsolutePath(),
+          tsFileToLoad.getAbsolutePath(),
           targetFile.getAbsolutePath(),
           e);
       throw new LoadFileException(
           String.format(
               "File renaming failed when loading tsfile. Origin: %s, Target: %s, because %s",
-              syncedTsFile.getAbsolutePath(), targetFile.getAbsolutePath(), e.getMessage()));
+              tsFileToLoad.getAbsolutePath(), targetFile.getAbsolutePath(), e.getMessage()));
     }
 
-    File syncedResourceFile =
-        fsFactory.getFile(syncedTsFile.getAbsolutePath() + TsFileResource.RESOURCE_SUFFIX);
+    File resourceFileToLoad =
+        fsFactory.getFile(tsFileToLoad.getAbsolutePath() + TsFileResource.RESOURCE_SUFFIX);
     File targetResourceFile =
         fsFactory.getFile(targetFile.getAbsolutePath() + TsFileResource.RESOURCE_SUFFIX);
     try {
-      FileUtils.moveFile(syncedResourceFile, targetResourceFile);
+      FileUtils.moveFile(resourceFileToLoad, targetResourceFile);
     } catch (IOException e) {
       logger.error(
           "File renaming failed when loading .resource file. Origin: {}, Target: {}",
-          syncedResourceFile.getAbsolutePath(),
+          resourceFileToLoad.getAbsolutePath(),
           targetResourceFile.getAbsolutePath(),
           e);
       throw new LoadFileException(
           String.format(
               "File renaming failed when loading .resource file. Origin: %s, Target: %s, because %s",
-              syncedResourceFile.getAbsolutePath(),
+              resourceFileToLoad.getAbsolutePath(),
               targetResourceFile.getAbsolutePath(),
               e.getMessage()));
     }
 
+    File modFileToLoad =
+        fsFactory.getFile(tsFileToLoad.getAbsolutePath() + TsFileResource.RESOURCE_SUFFIX);
+    if (modFileToLoad.exists()) {
+      File targetModFile =
+          fsFactory.getFile(targetFile.getAbsolutePath() + TsFileResource.RESOURCE_SUFFIX);

Review comment:
       The variable is `modFileToLoad` while the suffix is RESOURCE_SUFFIX.

##########
File path: cluster/src/main/java/org/apache/iotdb/cluster/partition/PartitionTable.java
##########
@@ -79,19 +84,39 @@
   List<PartitionGroup> getLocalGroups();
 
   /**
-   * @param header
+   * @param raftNode
    * @return the partition group starting from the header.
    */
-  PartitionGroup getHeaderGroup(Node header);
+  PartitionGroup getHeaderGroup(RaftNode raftNode);
+
+  PartitionGroup getHeaderGroup(Node node);
 
   ByteBuffer serialize();
 
-  void deserialize(ByteBuffer buffer);
+  /**
+   * Deserialize partition table and check whether the partition table in byte buffer is valid
+   *
+   * @param buffer
+   * @return true if the partition table is valid
+   */
+  boolean deserialize(ByteBuffer buffer);
 
   List<Node> getAllNodes();
 
   List<PartitionGroup> getGlobalGroups();
 
+  List<PartitionGroup> calculateGlobalGroups(List<Node> nodeRing);
+
+  /**
+   * Judge whether the data of slot is held by node
+   *
+   * @param node target node
+   */
+  boolean judgeHoldSlot(Node node, int slot);

Review comment:
       `Slot` should be avoided in the interface definition, as a PartitionTable implementation is not necessarily based on slots.

##########
File path: cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/PullSnapshotTask.java
##########
@@ -173,23 +181,35 @@ private boolean pullSnapshot(int nodeIndex) throws InterruptedException, TExcept
 
   @Override
   public Void call() {
-    persistTask();
     request = new PullSnapshotRequest();
     request.setHeader(descriptor.getPreviousHolders().getHeader());
+    request.setRaftId(descriptor.getPreviousHolders().getId());
     request.setRequiredSlots(descriptor.getSlots());
     request.setRequireReadOnly(descriptor.isRequireReadOnly());
 
+    logger.info("{}: data migration starts.", newMember.getName());
     boolean finished = false;
-    int nodeIndex = -1;
+    int nodeIndex = ((PartitionGroup) newMember.getAllNodes()).indexOf(newMember.getThisNode()) - 1;
     while (!finished) {
       try {
         // sequentially pick up a node that may have this slot
         nodeIndex = (nodeIndex + 1) % descriptor.getPreviousHolders().size();
+        long startTime = System.currentTimeMillis();
         finished = pullSnapshot(nodeIndex);
         if (!finished) {
+          if (logger.isDebugEnabled()) {
+            logger.debug(
+                "Cannot pull slot {} from {}, retry",
+                descriptor.getSlots(),
+                descriptor.getPreviousHolders().get(nodeIndex));
+          }
           Thread.sleep(
               ClusterDescriptor.getInstance().getConfig().getPullSnapshotRetryIntervalMs());
         }
+        logger.debug(
+            "{}: Data migration ends, cost {}ms",
+            newMember,
+            (System.currentTimeMillis() - startTime));

Review comment:
       This log should be put into an else block.

##########
File path: cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotPartitionTable.java
##########
@@ -175,11 +219,21 @@ public PartitionGroup getHeaderGroup(Node node) {
     return ret;
   }
 
+  @Override
+  public PartitionGroup getHeaderGroup(RaftNode raftNode) {
+    return getHeaderGroup(raftNode, this.nodeRing);
+  }
+
+  @Override
+  public PartitionGroup getHeaderGroup(Node node) {
+    return getHeaderGroup(new RaftNode(node, 0));
+  }

Review comment:
       Is this interface still useful?

##########
File path: cluster/src/main/java/org/apache/iotdb/cluster/query/reader/DataSourceInfo.java
##########
@@ -96,19 +96,21 @@ public boolean hasNextDataClient(boolean byTimestamp, long timestamp) {
         Long newReaderId = getReaderId(node, byTimestamp, timestamp);
         if (newReaderId != null) {
           logger.debug("get a readerId {} for {} from {}", newReaderId, request.path, node);
-          if (newReaderId != -1) {
+          if (newReaderId >= 0) {
             // register the node so the remote resources can be released
-            context.registerRemoteNode(node, partitionGroup.getHeader());
+            context.registerRemoteNode(node, partitionGroup.getHeader(), partitionGroup.getId());
             this.readerId = newReaderId;
             this.curSource = node;
             this.curPos = nextNodePos;
             return true;
-          } else {
+          } else if (newReaderId == -1) {
             // the id being -1 means there is no satisfying data on the remote node, create an
             // empty reader to reduce further communication
             this.isNoClient = true;
             this.isNoData = true;
             return false;
+          } else {
+            logger.debug("change other client for better query performance.");

Review comment:
       Could you please further explain this?

##########
File path: cluster/src/main/java/org/apache/iotdb/cluster/coordinator/Coordinator.java
##########
@@ -442,9 +445,11 @@ private TSStatus forwardToMultipleGroup(Map<PhysicalPlan, PartitionGroup> planGr
     }
     TSStatus status;
     if (errorCodePartitionGroups.isEmpty()) {
-      status = StatusUtils.OK;
       if (allRedirect) {
-        status = StatusUtils.getStatus(status, endPoint);
+        status = new TSStatus();
+        status.setCode(TSStatusCode.NEED_REDIRECTION.getStatusCode());
+      } else {
+        status = StatusUtils.OK;
       }

Review comment:
       Where is the endPoint provided now?

##########
File path: cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/MetaHeartbeatThread.java
##########
@@ -70,4 +70,14 @@ void sendHeartbeatAsync(Node node) {
     // erase the sent partition table so it will not be sent in the next heartbeat
     request.unsetPartitionTableBytes();
   }
+
+  @Override
+  void startElection() {
+    //    if (localMetaMember.getThisNode().metaPort != 9003 &&
+    // localMetaMember.getThisNode().metaPort != 9005) {
+    //      return;
+    //    }
+    super.startElection();
+    localMetaMember.getAppendLogThreadPool().submit(() -> localMetaMember.processEmptyContentLog());

Review comment:
       What is this for?

##########
File path: cluster/src/main/java/org/apache/iotdb/cluster/utils/nodetool/ClusterMonitor.java
##########
@@ -66,9 +77,67 @@ public void start() throws StartupException {
   }
 
   @Override
+  public List<Pair<Node, NodeCharacter>> getMetaGroup() {
+    MetaGroupMember metaMember = getMetaGroupMember();
+    if (metaMember == null || metaMember.getPartitionTable() == null) {
+      return null;
+    }
+    List<Pair<Node, NodeCharacter>> res = new ArrayList<>();
+    for (Node node : metaMember.getPartitionTable().getAllNodes()) {
+      if (node.equals(metaMember.getLeader())) {
+        res.add(new Pair<>(node, NodeCharacter.LEADER));
+      } else {
+        res.add(new Pair<>(node, NodeCharacter.FOLLOWER));
+      }
+    }
+    return res;

Review comment:
       I would recommend copying the current leader and the node list before the loop, because they may change during the iteration, and the caller may then get two or more leaders.

##########
File path: cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
##########
@@ -869,39 +929,116 @@ private boolean processAddNodeLocally(
       return true;
     }
 
+    AddNodeLog addNodeLog = new AddNodeLog();
     // node adding is serialized to reduce potential concurrency problem
     synchronized (logManager) {
-      AddNodeLog addNodeLog = new AddNodeLog();
+      // update partition table
+      PartitionTable table = new SlotPartitionTable(thisNode);
+      table.deserialize(partitionTable.serialize());
+      table.addNode(newNode);
+      ((SlotPartitionTable) table).setLastMetaLogIndex(logManager.getLastLogIndex() + 1);
+
+      addNodeLog.setPartitionTable(table.serialize());
       addNodeLog.setCurrLogTerm(getTerm().get());
       addNodeLog.setCurrLogIndex(logManager.getLastLogIndex() + 1);
+      addNodeLog.setMetaLogIndex(logManager.getLastLogIndex() + 1);
 
-      addNodeLog.setNewNode(node);
+      addNodeLog.setNewNode(newNode);
 
       logManager.append(addNodeLog);
+    }
 
-      int retryTime = 1;
-      while (true) {
-        logger.info("Send the join request of {} to other nodes, retry time: {}", node, retryTime);
-        AppendLogResult result = sendLogToAllGroups(addNodeLog);
-        switch (result) {
-          case OK:
-            logger.info("Join request of {} is accepted", node);
-            commitLog(addNodeLog);
-
-            synchronized (partitionTable) {
-              response.setPartitionTableBytes(partitionTable.serialize());
-            }
-            response.setRespNum((int) Response.RESPONSE_AGREE);
-            logger.info("Sending join response of {}", node);
-            return true;
-          case TIME_OUT:
-            logger.info("Join request of {} timed out", node);
-            retryTime++;
-            continue;
-          case LEADERSHIP_STALE:
-          default:
-            return false;
-        }
+    int retryTime = 0;
+    while (true) {
+      logger.info(
+          "{}: Send the join request of {} to other nodes, retry time: {}",
+          name,
+          newNode,
+          retryTime);
+      AppendLogResult result = sendLogToFollowers(addNodeLog);
+      switch (result) {
+        case OK:
+          commitLog(addNodeLog);
+          logger.info("{}: Join request of {} is accepted", name, newNode);
+
+          synchronized (partitionTable) {
+            response.setPartitionTableBytes(partitionTable.serialize());
+          }
+          response.setRespNum((int) Response.RESPONSE_AGREE);
+          logger.info("{}: Sending join response of {}", name, newNode);
+          return true;
+        case TIME_OUT:
+          logger.debug("{}: log {} timed out, retrying...", name, addNodeLog);
+          try {
+            Thread.sleep(ClusterConstant.RETRY_WAIT_TIME_MS);
+          } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+          }
+          logger.info("{}: Join request of {} timed out", name, newNode);
+          retryTime++;
+          break;
+        case LEADERSHIP_STALE:
+        default:
+          return false;
+      }
+    }
+  }
+
+  /** Check if there has data migration due to previous change membership operation. */
+  private boolean waitDataMigrationEnd() throws InterruptedException, CheckConsistencyException {
+    // try 5 time
+    int retryTime = 0;
+    while (true) {
+      Map<PartitionGroup, Integer> res = collectAllPartitionMigrationStatus();
+      if (res != null && res.isEmpty()) {
+        return true;
+      }
+      if (++retryTime == 5) {
+        break;
+      }
+      Thread.sleep(ClusterConstant.RETRY_WAIT_TIME_MS);
+    }
+    return false;
+  }
+
+  /** Process empty log for leader to commit all previous log. */
+  public void processEmptyContentLog() {
+    if (character != NodeCharacter.LEADER) {
+      return;
+    }
+
+    Log log = new EmptyContentLog();
+    log.setCurrLogTerm(getTerm().get());
+    log.setCurrLogIndex(logManager.getLastLogIndex() + 1);
+
+    synchronized (logManager) {
+      logManager.append(log);
+    }

Review comment:
       The index assignment is outside of the synchronized block, so is it possible that several logs have conflict indices?

##########
File path: cluster/src/main/java/org/apache/iotdb/cluster/query/groupby/RemoteGroupByExecutor.java
##########
@@ -82,16 +85,15 @@ private void resetAggregateResults() {
                 .getAsyncDataClient(source, RaftServer.getReadOperationTimeoutMS());
         aggrBuffers =
             SyncClientAdaptor.getGroupByResult(
-                client, header, executorId, curStartTime, curEndTime);
+                client, header, raftId, executorId, curStartTime, curEndTime);
       } else {
-        try (SyncDataClient syncDataClient =
+        SyncDataClient syncDataClient =
             metaGroupMember
                 .getClientProvider()
-                .getSyncDataClient(source, RaftServer.getReadOperationTimeoutMS())) {
-
-          aggrBuffers =
-              syncDataClient.getGroupByResult(header, executorId, curStartTime, curEndTime);
-        }
+                .getSyncDataClient(source, RaftServer.getReadOperationTimeoutMS());
+        aggrBuffers =
+            syncDataClient.getGroupByResult(header, raftId, executorId, curStartTime, curEndTime);
+        ClientUtils.putBackSyncClient(syncDataClient);

Review comment:
       `putBackSyncClient` should be in a finally block to avoid resource leak.

##########
File path: cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterReaderFactory.java
##########
@@ -933,29 +1011,83 @@ public IBatchReader getSeriesBatchReader(
       Filter valueFilter,
       QueryContext context,
       DataGroupMember dataGroupMember,
-      boolean ascending)
+      boolean ascending,
+      Set<Integer> requiredSlots,
+      boolean syncLeader)
       throws StorageEngineException, QueryProcessException, IOException {
-    // pull the newest data
-    try {
-      dataGroupMember.syncLeaderWithConsistencyCheck(false);
-    } catch (CheckConsistencyException e) {
-      throw new StorageEngineException(e);
+    if (syncLeader) {
+      // pull the newest data
+      try {
+        dataGroupMember.syncLeaderWithConsistencyCheck(false);
+      } catch (CheckConsistencyException e) {
+        throw new StorageEngineException(e);
+      }
     }
 
-    SeriesReader seriesReader =
-        getSeriesReader(
-            path,
-            allSensors,
-            dataType,
-            timeFilter,
-            valueFilter,
-            context,
-            dataGroupMember.getHeader(),
-            ascending);
-    if (seriesReader.isEmpty()) {
-      return null;
+    // find the groups that should be queried due to data migration.
+    Map<PartitionGroup, Set<Integer>> holderSlotMap = dataGroupMember.getPreviousHolderSlotMap();
+
+    // If requiredSlots is not null, it means that this data group is the previous holder of
+    // required slots, which is no need to merge other resource,
+    if (requiredSlots == null && !holderSlotMap.isEmpty()) {
+      // merge remote reader and local reader
+      ManagedMergeReader mergeReader = new ManagedMergeReader(dataType);
+
+      // add local reader
+      IPointReader seriesPointReader =
+          getSeriesPointReader(
+              path,
+              allSensors,
+              dataType,
+              timeFilter,
+              valueFilter,
+              context,
+              dataGroupMember,
+              ascending,
+              requiredSlots);
+      mergeReader.addReader(seriesPointReader, 0);

Review comment:
       Maybe the local reader should have a higher priority or be placed later, since data in the new holder is newer than data in the old holder.

##########
File path: cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
##########
@@ -1501,11 +1814,20 @@ public TSStatus processNonPartitionedMetaPlan(PhysicalPlan plan) {
       Thread.currentThread().interrupt();
       logger.warn("Cannot get the status of all nodes", e);
     }
+
+    for (Node node : partitionTable.getAllNodes()) {
+      nodeStatus.putIfAbsent(node, 2);
+    }
+    for (Node node : allNodes) {
+      if (!partitionTable.getAllNodes().contains(node)) {
+        nodeStatus.put(node, 3);
+      }
+    }

Review comment:
       Better to replace these numbers with meaningful constants or enums.

##########
File path: cluster/src/main/java/org/apache/iotdb/cluster/log/applier/MetaLogApplier.java
##########
@@ -48,22 +47,44 @@ public void apply(Log log) {
     try {
       logger.debug("MetaMember [{}] starts applying Log {}", metaGroupMember.getName(), log);
       if (log instanceof AddNodeLog) {
-        AddNodeLog addNodeLog = (AddNodeLog) log;
-        Node newNode = addNodeLog.getNewNode();
-        member.applyAddNode(newNode);
+        applyAddNodeLog((AddNodeLog) log);
       } else if (log instanceof PhysicalPlanLog) {
         applyPhysicalPlan(((PhysicalPlanLog) log).getPlan(), null);
       } else if (log instanceof RemoveNodeLog) {
-        RemoveNodeLog removeNodeLog = ((RemoveNodeLog) log);
-        member.applyRemoveNode(removeNodeLog.getRemovedNode());
+        applyRemoveNodeLog((RemoveNodeLog) log);
+      } else if (log instanceof EmptyContentLog) {
+        // Do nothing
       } else {
         logger.error("Unsupported log: {} {}", log.getClass().getName(), log);
       }
-    } catch (StorageEngineException | StorageGroupNotSetException | QueryProcessException e) {
+    } catch (Exception e) {
       logger.debug("Exception occurred when executing {}", log, e);
       log.setException(e);
     } finally {
       log.setApplied(true);
     }
   }
+
+  private void applyAddNodeLog(AddNodeLog log) throws ChangeMembershipException {
+    if (!metaGroupMember.getPartitionTable().deserialize(log.getPartitionTable())) {
+      logger.info("Ignore previous change membership log");
+      // ignore previous change membership log
+      return;
+    }
+    if (metaGroupMember.getCharacter() == NodeCharacter.LEADER) {
+      metaGroupMember.sendLogToAllDataGroups(log);
+    }
+    member.applyAddNode(log);

Review comment:
       Meta followers may apply this meta log before they apply the corresponding data log, is it safe?

##########
File path: cluster/src/test/java/org/apache/iotdb/cluster/log/snapshot/FileSnapshotTest.java
##########
@@ -254,7 +254,7 @@ public void testInstallMultiple()
       List<TsFileResource> loadedFiles = processor.getSequenceFileTreeSet();
       assertEquals(10, loadedFiles.size());
       for (int i = 0; i < 9; i++) {
-        assertEquals(i, loadedFiles.get(i).getMaxPlanIndex());
+        assertEquals(-1, loadedFiles.get(i).getMaxPlanIndex());

Review comment:
       This case tests snapshot catch-up, so the plan indices in the files should be preserved. Please check what went wrong.

##########
File path: cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
##########
@@ -1602,38 +1971,119 @@ private long processRemoveNodeLocally(Node node) throws LogExecutionException {
       return Response.RESPONSE_REJECT;
     }
 
+    RemoveNodeLog removeNodeLog = new RemoveNodeLog();
     // node removal must be serialized to reduce potential concurrency problem
     synchronized (logManager) {
-      RemoveNodeLog removeNodeLog = new RemoveNodeLog();
+      // update partition table
+      PartitionTable table = new SlotPartitionTable(thisNode);
+      table.deserialize(partitionTable.serialize());

Review comment:
       It would be better to replace this with a copy constructor.

##########
File path: cluster/src/test/java/org/apache/iotdb/cluster/log/snapshot/FileSnapshotTest.java
##########
@@ -301,6 +301,6 @@ public void testInstallPartial()
     for (int i = 0; i < 9; i++) {
       assertEquals(i, loadedFiles.get(i).getMaxPlanIndex());
     }
-    assertEquals(0, processor.getUnSequenceFileList().size());
+    assertEquals(1, processor.getUnSequenceFileList().size());

Review comment:
       Why should there be an unseq file?

##########
File path: cluster/src/test/java/org/apache/iotdb/cluster/log/LogParserTest.java
##########
@@ -98,4 +104,14 @@ public void testEmptyContentLog() throws UnknownLogTypeException {
     Log serialized = logParser.parse(byteBuffer);
     assertEquals(log, serialized);
   }
+
+  @Test
+  public void testLogPlan() throws IOException, IllegalPathException, UnknownLogTypeException {
+    AddNodeLog log = new AddNodeLog(TestUtils.seralizePartitionTable, TestUtils.getNode(0));
+    log.setMetaLogIndex(1);
+    LogPlan logPlan = new LogPlan(log.serialize());
+    ByteBuffer buffer = ByteBuffer.wrap(PlanSerializer.getInstance().serialize(logPlan));
+    PhysicalPlan plan = PhysicalPlan.Factory.create(buffer);
+    LogParser.getINSTANCE().parse(((LogPlan) plan).getLog());
+  }

Review comment:
       Add an assertion to the test.

##########
File path: cluster/src/test/java/org/apache/iotdb/cluster/log/snapshot/PullSnapshotTaskTest.java
##########
@@ -298,7 +299,7 @@ private void testNormal(boolean requiresReadOnly)
             loadedFiles.get(i).getMaxPlanIndex(),
             loadedFiles.get(i).getTsFile().getAbsolutePath());
       }
-      assertEquals(i, loadedFiles.get(i).getMaxPlanIndex());
+      assertEquals(-1, loadedFiles.get(i).getMaxPlanIndex());

Review comment:
       This case is not for data migration so the plan indices should not be changed.

##########
File path: cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java
##########
@@ -1175,16 +1210,16 @@ public void testRemoteAddNode() {
       assertTrue(response.getCheckStatusResponse().isClusterNameEquals());
 
       // cannot add a node due to network failure
-      dummyResponse.set(Response.RESPONSE_NO_CONNECTION);
-      testMetaMember.setCharacter(LEADER);
-      result.set(null);
-      testMetaMember.setPartitionTable(partitionTable);
-      new Thread(
-              () -> {
-                await().atLeast(200, TimeUnit.MILLISECONDS);
-                dummyResponse.set(Response.RESPONSE_AGREE);
-              })
-          .start();
+      //      dummyResponse.set(Response.RESPONSE_NO_CONNECTION);
+      //      testMetaMember.setCharacter(LEADER);
+      //      result.set(null);
+      //      testMetaMember.setPartitionTable(partitionTable);
+      //      new Thread(
+      //              () -> {
+      //                await().atLeast(200, TimeUnit.MILLISECONDS);
+      //                dummyResponse.set(Response.RESPONSE_AGREE);
+      //              })
+      //          .start();

Review comment:
       Either remove this commented-out code or restore it.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org