Posted to commits@iotdb.apache.org by ji...@apache.org on 2020/04/13 10:02:52 UTC

[incubator-iotdb] branch cluster_data_snapshot updated: finish sending and partial pulling

This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch cluster_data_snapshot
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git


The following commit(s) were added to refs/heads/cluster_data_snapshot by this push:
     new 174a35c  finish sending and partial pulling
174a35c is described below

commit 174a35ca962026827bfc7326a2a17f6f2076460c
Author: jt2594838 <jt...@163.com>
AuthorDate: Mon Apr 13 18:02:40 2020 +0800

    finish sending and partial pulling
---
 .../cluster/log/snapshot/PullSnapshotTask.java     |  94 +++++++++++-----
 .../log/snapshot/PullSnapshotTaskDescriptor.java   | 110 ++++++++++++++++++
 .../iotdb/cluster/partition/SlotManager.java       |   2 +-
 .../cluster/server/member/DataGroupMember.java     | 124 ++++++++++++++++-----
 .../apache/iotdb/cluster/utils/SerializeUtils.java |  11 ++
 .../engine/storagegroup/StorageGroupProcessor.java |  30 +++++
 6 files changed, 317 insertions(+), 54 deletions(-)

diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/PullSnapshotTask.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/PullSnapshotTask.java
index 2a077b2..64899fc 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/PullSnapshotTask.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/PullSnapshotTask.java
@@ -19,9 +19,16 @@
 
 package org.apache.iotdb.cluster.log.snapshot;
 
+import java.io.BufferedOutputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOError;
+import java.io.IOException;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Random;
 import java.util.concurrent.Callable;
 import java.util.concurrent.atomic.AtomicReference;
 import org.apache.iotdb.cluster.client.DataClient;
@@ -33,6 +40,7 @@ import org.apache.iotdb.cluster.rpc.thrift.PullSnapshotRequest;
 import org.apache.iotdb.cluster.server.RaftServer;
 import org.apache.iotdb.cluster.server.handlers.caller.PullSnapshotHandler;
 import org.apache.iotdb.cluster.server.member.DataGroupMember;
+import org.apache.iotdb.cluster.utils.SerializeUtils;
 import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -45,38 +53,37 @@ import org.slf4j.LoggerFactory;
 public class PullSnapshotTask<T extends Snapshot> implements Callable<Map<Integer,
     T>> {
 
+  public static final String TASK_SUFFIX = ".task";
   private static final Logger logger = LoggerFactory.getLogger(PullSnapshotTask.class);
 
-  private List<Integer> slots;
-  // the new member created by a node addition
+  private PullSnapshotTaskDescriptor descriptor;
   private DataGroupMember newMember;
-  // the nodes the may hold the target slot
-  private List<Node> previousHolders;
-  // the header of the old members
-  private Node header;
-  // set to true if the previous holder has been removed from the cluster.
-  // This will make the previous holder read-only so that different new
-  // replicas can pull the same snapshot.
-  private boolean requireReadOnly;
 
   private PullSnapshotRequest request;
   private SnapshotFactory snapshotFactory;
 
-  public PullSnapshotTask(Node header, List<Integer> slots,
-      DataGroupMember newMember, List<Node> previousHolders, SnapshotFactory snapshotFactory,
-      boolean requireReadOnly) {
-    this.header = header;
-    this.slots = slots;
+  private File snapshotSave;
+
+  /**
+   *
+   * @param descriptor describes the slots to pull, their previous holders and whether the
+   *                   holders are required to be read-only
+   * @param newMember the new member created by a node addition, which will receive the snapshots
+   * @param snapshotFactory factory that creates snapshots of the requested type
+   * @param snapshotSave if the task is resumed from a disk file, this should be that file,
+   *                     otherwise it should be null
+   */
+  public PullSnapshotTask(PullSnapshotTaskDescriptor descriptor,
+      DataGroupMember newMember, SnapshotFactory snapshotFactory, File snapshotSave) {
+    this.descriptor = descriptor;
     this.newMember = newMember;
-    this.previousHolders = previousHolders;
     this.snapshotFactory = snapshotFactory;
-    this.requireReadOnly = requireReadOnly;
+    this.snapshotSave = snapshotSave;
   }
 
   private boolean pullSnapshot(AtomicReference<Map<Integer, T>> snapshotRef, int nodeIndex)
       throws InterruptedException, TException {
-    Node node = previousHolders.get(nodeIndex);
-    logger.debug("Pulling {} snapshots from {}", slots.size(), node);
+    Node node = descriptor.getPreviousHolders().get(nodeIndex);
+    logger.debug("Pulling {} snapshots from {}", descriptor.getSlots().size(), node);
 
     DataClient client =
         (DataClient) newMember.connectNode(node);
@@ -86,13 +93,13 @@ public class PullSnapshotTask<T extends Snapshot> implements Callable<Map<Intege
     } else {
       synchronized (snapshotRef) {
         client.pullSnapshot(request, new PullSnapshotHandler<>(snapshotRef,
-            node, slots, snapshotFactory));
+            node, descriptor.getSlots(), snapshotFactory));
         snapshotRef.wait(RaftServer.connectionTimeoutInMS);
       }
       Map<Integer, T> result = snapshotRef.get();
       if (result != null) {
         if (logger.isInfoEnabled()) {
-          logger.info("Received a snapshot {} from {}", result, previousHolders.get(nodeIndex));
+          logger.info("Received a snapshot {} from {}", result, descriptor.getPreviousHolders().get(nodeIndex));
         }
         for (Entry<Integer, T> entry : result.entrySet()) {
           try {
@@ -113,26 +120,59 @@ public class PullSnapshotTask<T extends Snapshot> implements Callable<Map<Intege
 
   @Override
   public Map<Integer, T> call() {
+    persistTask();
     request = new PullSnapshotRequest();
-    request.setHeader(header);
-    request.setRequiredSlots(slots);
-    request.setRequireReadOnly(requireReadOnly);
+    request.setHeader(descriptor.getPreviousHolders().getHeader());
+    request.setRequiredSlots(descriptor.getSlots());
+    request.setRequireReadOnly(descriptor.isRequireReadOnly());
     AtomicReference<Map<Integer, T>> snapshotRef = new AtomicReference<>();
     boolean finished = false;
     int nodeIndex = -1;
     while (!finished) {
       try {
         // sequentially pick up a node that may have this slot
-        nodeIndex = (nodeIndex + 1) % previousHolders.size();
+        nodeIndex = (nodeIndex + 1) % descriptor.getPreviousHolders().size();
         finished = pullSnapshot(snapshotRef, nodeIndex);
       } catch (InterruptedException e) {
         Thread.currentThread().interrupt();
-        logger.error("Unexpected interruption when pulling slot {}", slots, e);
+        logger.error("Unexpected interruption when pulling slot {}", descriptor.getSlots(), e);
         finished = true;
       } catch (TException e) {
-        logger.debug("Cannot pull slot {} from {}, retry", slots, previousHolders.get(nodeIndex), e);
+        logger.debug("Cannot pull slot {} from {}, retry", descriptor.getSlots(),
+            descriptor.getPreviousHolders().get(nodeIndex), e);
       }
     }
+    removeTask();
     return snapshotRef.get();
   }
+
+  private void persistTask() {
+    if (snapshotSave != null) {
+      // the task is resumed from disk, do not persist it again
+      return;
+    }
+
+    Random random = new Random();
+    while (true) {
+      String saveName = System.currentTimeMillis() + "_" + random.nextLong() + TASK_SUFFIX;
+      snapshotSave = new File(newMember.getPullSnapshotTaskDir(), saveName);
+      if (snapshotSave.exists()) {
+        continue;
+      }
+      snapshotSave.getParentFile().mkdirs();
+      break;
+    }
+
+    try (DataOutputStream dataOutputStream =
+        new DataOutputStream(new BufferedOutputStream(new FileOutputStream(snapshotSave)))) {
+      descriptor.serialize(dataOutputStream);
+    } catch (IOException e) {
+      logger.error("Cannot save the pulling task: pull {} from {}", descriptor.getSlots(),
+          descriptor.getPreviousHolders(), e);
+    }
+  }
+
+  private void removeTask() {
+    if (!snapshotSave.delete()) {
+      logger.warn("Cannot remove the pull-snapshot-task file {}", snapshotSave);
+    }
+  }
 }
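
For context, a minimal usage sketch of submitting a fresh (non-resumed) task through the
constructor above; "member" (a DataGroupMember), "previousHolders" (a PartitionGroup) and
"pullSnapshotService" (an ExecutorService) are assumed to exist, and the slot numbers are only
illustrative:

    PullSnapshotTaskDescriptor descriptor =
        new PullSnapshotTaskDescriptor(previousHolders, Arrays.asList(1, 2), false);
    // snapshotSave is null because this task is new, not resumed from a .task file
    PullSnapshotTask<FileSnapshot> task =
        new PullSnapshotTask<>(descriptor, member, FileSnapshot::new, null);
    Future<Map<Integer, FileSnapshot>> pulledSnapshots = pullSnapshotService.submit(task);
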
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/PullSnapshotTaskDescriptor.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/PullSnapshotTaskDescriptor.java
new file mode 100644
index 0000000..d7d1be0
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/PullSnapshotTaskDescriptor.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.cluster.log.snapshot;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.iotdb.cluster.partition.PartitionGroup;
+import org.apache.iotdb.cluster.rpc.thrift.Node;
+import org.apache.iotdb.cluster.utils.SerializeUtils;
+
+/**
+ * PullSnapshotTaskDescriptor describes a pull-snapshot task by recording the slots to pull, their
+ * previous owners, and whether the pulling requires the providers to become read-only, so that
+ * the task can be resumed after a system crash.
+ */
+public class PullSnapshotTaskDescriptor {
+  private PartitionGroup previousHolders;
+  private List<Integer> slots;
+
+  // set to true if the previous holder has been removed from the cluster.
+  // This will make the previous holder read-only so that different new
+  // replicas can pull the same snapshot.
+  private boolean requireReadOnly;
+
+  public PullSnapshotTaskDescriptor() {
+  }
+
+  public PullSnapshotTaskDescriptor(PartitionGroup previousOwners,
+      List<Integer> slots, boolean requireReadOnly) {
+    this.previousHolders = previousOwners;
+    this.slots = slots;
+    this.requireReadOnly = requireReadOnly;
+  }
+
+  public PartitionGroup getPreviousHolders() {
+    return previousHolders;
+  }
+
+  public void setPreviousHolders(PartitionGroup previousHolders) {
+    this.previousHolders = previousHolders;
+  }
+
+  public List<Integer> getSlots() {
+    return slots;
+  }
+
+  public void setSlots(List<Integer> slots) {
+    this.slots = slots;
+  }
+
+  public boolean isRequireReadOnly() {
+    return requireReadOnly;
+  }
+
+  public void setRequireReadOnly(boolean requireReadOnly) {
+    this.requireReadOnly = requireReadOnly;
+  }
+
+  public void serialize(DataOutputStream dataOutputStream) throws IOException {
+    dataOutputStream.writeInt(slots.size());
+    for (Integer slot : slots) {
+      dataOutputStream.writeInt(slot);
+    }
+
+    dataOutputStream.writeInt(previousHolders.size());
+    for (Node previousHolder : previousHolders) {
+      SerializeUtils.serialize(previousHolder, dataOutputStream);
+    }
+
+    dataOutputStream.writeBoolean(requireReadOnly);
+  }
+
+  public void deserialize(DataInputStream dataInputStream) throws IOException {
+    int slotSize = dataInputStream.readInt();
+    slots = new ArrayList<>(slotSize);
+    for (int i = 0; i < slotSize; i++) {
+      slots.add(dataInputStream.readInt());
+    }
+
+    int holderSize = dataInputStream.readInt();
+    previousHolders = new PartitionGroup();
+    for (int i = 0; i < holderSize; i++) {
+      Node node = new Node();
+      SerializeUtils.deserialize(node, dataInputStream);
+      previousHolders.add(node);
+    }
+
+    requireReadOnly = dataInputStream.readBoolean();
+  }
+}
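
As a sanity check, a round-trip sketch of the serialization format defined above (slot count and
slots, holder count and serialized holders, then the read-only flag). This is only an illustration
with made-up values, run inside a method that may throw IOException, and it uses in-memory streams
instead of the on-disk .task file:

    Node holder = new Node();
    holder.setIp("127.0.0.1");
    holder.setMetaPort(9003);
    holder.setNodeIdentifier(1);
    holder.setDataPort(40010);
    PartitionGroup previousHolders = new PartitionGroup();
    previousHolders.add(holder);

    PullSnapshotTaskDescriptor descriptor =
        new PullSnapshotTaskDescriptor(previousHolders, Arrays.asList(5, 6), true);
    ByteArrayOutputStream byteStream = new ByteArrayOutputStream();
    try (DataOutputStream out = new DataOutputStream(byteStream)) {
      descriptor.serialize(out);
    }
    PullSnapshotTaskDescriptor restored = new PullSnapshotTaskDescriptor();
    try (DataInputStream in =
        new DataInputStream(new ByteArrayInputStream(byteStream.toByteArray()))) {
      restored.deserialize(in);
    }
    // "restored" now carries the same slots, previous holders and read-only flag
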
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/partition/SlotManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/partition/SlotManager.java
index a2123a2..f172782 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/partition/SlotManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/partition/SlotManager.java
@@ -144,7 +144,7 @@ public class SlotManager {
   }
 
 
-  enum SlotStatus {
+  public enum SlotStatus {
     // the slot has pulled data or does not belong to this member
     NULL,
     // the slot is pulling data and writes into it should be blocked and reads of it should merge
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
index 8237dfc..158361f 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
@@ -19,23 +19,26 @@
 
 package org.apache.iotdb.cluster.server.member;
 
+import java.io.BufferedInputStream;
 import java.io.BufferedOutputStream;
 import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.File;
+import java.io.FileInputStream;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicReference;
 import org.apache.iotdb.cluster.RemoteTsFileResource;
 import org.apache.iotdb.cluster.client.ClientPool;
@@ -53,10 +56,11 @@ import org.apache.iotdb.cluster.log.manage.PartitionedSnapshotLogManager;
 import org.apache.iotdb.cluster.log.snapshot.FileSnapshot;
 import org.apache.iotdb.cluster.log.snapshot.PartitionedSnapshot;
 import org.apache.iotdb.cluster.log.snapshot.PullSnapshotTask;
-import org.apache.iotdb.cluster.log.snapshot.RemoteFileSnapshot;
+import org.apache.iotdb.cluster.log.snapshot.PullSnapshotTaskDescriptor;
 import org.apache.iotdb.cluster.partition.NodeRemovalResult;
 import org.apache.iotdb.cluster.partition.PartitionGroup;
 import org.apache.iotdb.cluster.partition.SlotManager;
+import org.apache.iotdb.cluster.partition.SlotManager.SlotStatus;
 import org.apache.iotdb.cluster.query.RemoteQueryContext;
 import org.apache.iotdb.cluster.query.filter.SlotTsFileFilter;
 import org.apache.iotdb.cluster.query.manage.ClusterQueryManager;
@@ -182,7 +186,7 @@ public class DataGroupMember extends RaftMember implements TSDataService.AsyncIf
   }
 
   /**
-   * Start heartbeat, catch-up and pull snapshot services.
+   * Start the heartbeat, catch-up and pull-snapshot services, and resume all unfinished
+   * pull-snapshot tasks.
    * Calling the method twice does not induce side effects.
    * @throws TTransportException
    */
@@ -194,6 +198,7 @@ public class DataGroupMember extends RaftMember implements TSDataService.AsyncIf
     super.start();
     heartBeatService.submit(new DataHeartbeatThread(this));
     pullSnapshotService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
+    resumePullSnapshotTasks();
   }
 
   /**
@@ -655,16 +660,29 @@ public class DataGroupMember extends RaftMember implements TSDataService.AsyncIf
       setReadOnly();
     }
 
+    List<Integer> requiredSlots = request.getRequiredSlots();
+    for (Integer requiredSlot : requiredSlots) {
+      // wait if the data of the slot is in another node
+      slotManager.waitSlot(requiredSlot);
+    }
+    logger.debug("{}: {} slots are requested", name, requiredSlots.size());
+
+    // If the logs between [currCommitLogIndex, currLastLogIndex] are committed after the
+    // snapshot is generated, they will be invisible to the new slot owner and thus lost forever
+    long currLastLogIndex = logManager.getLastLogIndex();
+    logger.info("{}: Waiting for logs to commit before snapshot, {}/{}", name,
+        logManager.getCommitLogIndex(), currLastLogIndex);
+    while (logManager.getCommitLogIndex() < currLastLogIndex) {
+      try {
+        Thread.sleep(10);
+      } catch (InterruptedException e) {
+        logger.warn("{}: Unexpected interruption when waiting for logs to commit", name, e);
+      }
+    }
+
     // this synchronized should work with the one in AppendEntry when a log is going to commit,
     // which may prevent the newly arrived data from being invisible to the new header.
     synchronized (logManager) {
-      List<Integer> requiredSlots = request.getRequiredSlots();
-      for (Integer requiredSlot : requiredSlots) {
-        // wait if the data of the slot is in another node
-        slotManager.waitSlot(requiredSlot);
-      }
-      logger.debug("{}: {} slots are requested", name, requiredSlots.size());
-
       PullSnapshotResp resp = new PullSnapshotResp();
       Map<Integer, ByteBuffer> resultMap = new HashMap<>();
       try {
@@ -713,7 +731,10 @@ public class DataGroupMember extends RaftMember implements TSDataService.AsyncIf
       for (Entry<Node, List<Integer>> entry : holderSlotsMap.entrySet()) {
         Node node = entry.getKey();
         List<Integer> nodeSlots = entry.getValue();
-        pullFileSnapshot(metaGroupMember.getPartitionTable().getHeaderGroup(node),  nodeSlots, false);
+        PullSnapshotTaskDescriptor taskDescriptor =
+            new PullSnapshotTaskDescriptor(metaGroupMember.getPartitionTable().getHeaderGroup(node),
+            nodeSlots, false);
+        pullFileSnapshot(taskDescriptor, null);
       }
     }
   }
@@ -721,24 +742,73 @@ public class DataGroupMember extends RaftMember implements TSDataService.AsyncIf
   /**
    * Pull FileSnapshots (timeseries schemas and lists of TsFiles) of "nodeSlots" from one of the
    * "prevHolders".
-   * The actual pulling will be performed in a separate thread, and placeholders
-   * (RemoteFileSnapshots) will be placed in those slots of the logManager to prevent the
-   * logManager from taking snapshots before the data has been pulled.
-   * @param prevHolders
-   * @param nodeSlots
-   * @param requireReadOnly set to true if the previous holder has been removed from the cluster.
-   *                       This will make the previous holder read-only so that different new
-   *                        replicas can pull the same snapshot.
+   * The actual pulling will be performed in a separate thread.
+   * @param descriptor describes the slots to pull and their previous holders
+   * @param snapshotSave set to the corresponding disk file if the task is resumed from disk, or
+   *                     set to null otherwise
+   */
+  private void pullFileSnapshot(PullSnapshotTaskDescriptor descriptor, File snapshotSave) {
+    Iterator<Integer> iterator = descriptor.getSlots().iterator();
+    while (iterator.hasNext()) {
+      Integer nodeSlot = iterator.next();
+      SlotStatus status = slotManager.getStatus(nodeSlot);
+      if (status != SlotStatus.NULL) {
+        // the pulling may already be issued during restart, skip it in that case
+        iterator.remove();
+      } else {
+        // mark the slot as pulling to control reads and writes of the pulling slot
+        slotManager.setToPulling(nodeSlot, descriptor.getPreviousHolders().getHeader());
+      }
+    }
+    if (descriptor.getSlots().isEmpty()) {
+      return;
+    }
+
+    pullSnapshotService.submit(new PullSnapshotTask(descriptor, this, FileSnapshot::new,
+        snapshotSave));
+  }
+
+  /**
+   * Restart all unfinished pull-snapshot-tasks of the member.
    */
-  private void pullFileSnapshot(PartitionGroup prevHolders, List<Integer> nodeSlots, boolean requireReadOnly) {
-    Future<Map<Integer, FileSnapshot>> snapshotFuture =
-        pullSnapshotService.submit(new PullSnapshotTask(prevHolders.getHeader(), nodeSlots, this,
-            prevHolders, FileSnapshot::new, requireReadOnly));
-    for (int slot : nodeSlots) {
-      logManager.setSnapshot(new RemoteFileSnapshot(snapshotFuture, slot), slot);
+  public void resumePullSnapshotTasks() {
+    File snapshotTaskDir = new File(getPullSnapshotTaskDir());
+    if (!snapshotTaskDir.exists()) {
+      return;
+    }
+
+    File[] files = snapshotTaskDir.listFiles();
+    if (files != null) {
+      for (File file : files) {
+        if (file.getName().endsWith(PullSnapshotTask.TASK_SUFFIX)) {
+          try (DataInputStream dataInputStream =
+              new DataInputStream(new BufferedInputStream(new FileInputStream(file)))) {
+            PullSnapshotTaskDescriptor descriptor = new PullSnapshotTaskDescriptor();
+            descriptor.deserialize(dataInputStream);
+            pullFileSnapshot(descriptor, file);
+          } catch (IOException e) {
+            logger.error("Cannot resume pull-snapshot-task in file {}", file, e);
+            file.delete();
+          }
+        }
+      }
     }
   }
 
+  /**
+   *
+   * @return the directory that stores the descriptors of ongoing pull-snapshot tasks.
+   */
+  public String getPullSnapshotTaskDir() {
+    return getMemberDir() + "snapshot_task" + File.separator;
+  }
+
+  /**
+   *
+   * @return the path of the directory that is provided exclusively for the member.
+   */
+  public String getMemberDir() {
+    return "raft" + File.separator + getHeader().nodeIdentifier + File.separator;
+  }
 
   public MetaGroupMember getMetaGroupMember() {
     return metaGroupMember;
@@ -1198,7 +1268,9 @@ public class DataGroupMember extends RaftMember implements TSDataService.AsyncIf
       List<Integer> slotsToPull = removalResult.getNewSlotOwners().get(getHeader());
       if (slotsToPull != null) {
         // pull the slots that should be taken over
-        pullFileSnapshot(removalResult.getRemovedGroup(), slotsToPull, true);
+        PullSnapshotTaskDescriptor taskDescriptor = new PullSnapshotTaskDescriptor(removalResult.getRemovedGroup(),
+            slotsToPull, true);
+        pullFileSnapshot(taskDescriptor, null);
       }
     }
   }
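
For orientation, the on-disk layout implied by getMemberDir() and getPullSnapshotTaskDir(), using
a made-up node identifier and task file name (persistTask() in PullSnapshotTask creates the .task
files and removeTask() deletes them once the pull finishes):

    raft/3/                                      <- getMemberDir()
    raft/3/snapshot_task/                        <- getPullSnapshotTaskDir()
    raft/3/snapshot_task/1586772160000_42.task   <- one serialized PullSnapshotTaskDescriptor
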
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/utils/SerializeUtils.java b/cluster/src/main/java/org/apache/iotdb/cluster/utils/SerializeUtils.java
index e26e0e9..274c49a 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/utils/SerializeUtils.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/utils/SerializeUtils.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.cluster.utils;
 
 import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
@@ -100,6 +101,16 @@ public class SerializeUtils {
     node.setDataPort(buffer.getInt());
   }
 
+  public static void deserialize(Node node, DataInputStream stream) throws IOException {
+    int ipLength = stream.readInt();
+    byte[] ipBytes = new byte[ipLength];
+    stream.readFully(ipBytes);
+    node.setIp(new String(ipBytes));
+    node.setMetaPort(stream.readInt());
+    node.setNodeIdentifier(stream.readInt());
+    node.setDataPort(stream.readInt());
+  }
+
   public static void serializeBatchData(BatchData batchData, DataOutputStream outputStream) {
     try {
       int length = batchData.length();
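
For reference, the stream-based deserialize above mirrors the existing ByteBuffer-based overload,
so the wire layout of a Node is assumed to be (inferred from the read order in this file):

    // int    ipLength
    // byte[] ip              (ipLength bytes of the IP string)
    // int    metaPort
    // int    nodeIdentifier
    // int    dataPort
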
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index 06fd039..a85aa58 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -2045,7 +2045,37 @@ public class StorageGroupProcessor {
     return storageGroupName;
   }
 
+  /**
+   * Check whether all data of "tsFileResource" already exists locally by comparing its historical
+   * versions with those recorded for the partition "partitionNum". This is valid only when the
+   * IoTDB instance that generated "tsFileResource" uses the same file-closing policy as the local
+   * one.
+   * If one of the versions in "tsFileResource" equals a version of a working (unclosed) file,
+   * false is also returned, because "tsFileResource" may contain data not yet written to that
+   * file.
+   * @param tsFileResource the incoming file resource to check
+   * @param partitionNum the time partition the file belongs to
+   * @return true if the historicalVersions of "tsFileResource" is a subset of
+   * partitionDirectFileVersions and contains no version of a working file, false otherwise
+   */
   public boolean isFileAlreadyExist(TsFileResource tsFileResource, long partitionNum) {
+    // Consider the case: the local node crashes while it is writing TsFile no.5. When it
+    // restarts, the leader has already proceeded to no.6. When the leader sends no.5 to this
+    // node, the file should be accepted, because the local no.5 is not closed and may be
+    // missing data that the incoming file has.
+    // So if the incoming file contains the version of an unclosed file, it should be accepted.
+    for (TsFileProcessor workSequenceTsFileProcessor : getWorkSequenceTsFileProcessors()) {
+      long workingFileVersion = workSequenceTsFileProcessor.getTsFileResource().getMaxVersion();
+      if (tsFileResource.getHistoricalVersions().contains(workingFileVersion)) {
+        return false;
+      }
+    }
+    for (TsFileProcessor workUnsequenceTsFileProcessor : getWorkUnsequenceTsFileProcessor()) {
+      long workingFileVersion = workUnsequenceTsFileProcessor.getTsFileResource().getMaxVersion();
+      if (tsFileResource.getHistoricalVersions().contains(workingFileVersion)) {
+        return false;
+      }
+    }
+
     return partitionDirectFileVersions.getOrDefault(partitionNum, Collections.emptySet())
         .containsAll(tsFileResource.getHistoricalVersions());
   }