You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2020/04/13 10:02:52 UTC
[incubator-iotdb] branch cluster_data_snapshot updated: finish
sending and partial pulling
This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch cluster_data_snapshot
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/cluster_data_snapshot by this push:
new 174a35c finish sending and partial pulling
174a35c is described below
commit 174a35ca962026827bfc7326a2a17f6f2076460c
Author: jt2594838 <jt...@163.com>
AuthorDate: Mon Apr 13 18:02:40 2020 +0800
finish sending and partial pulling
---
.../cluster/log/snapshot/PullSnapshotTask.java | 94 +++++++++++-----
.../log/snapshot/PullSnapshotTaskDescriptor.java | 110 ++++++++++++++++++
.../iotdb/cluster/partition/SlotManager.java | 2 +-
.../cluster/server/member/DataGroupMember.java | 124 ++++++++++++++++-----
.../apache/iotdb/cluster/utils/SerializeUtils.java | 11 ++
.../engine/storagegroup/StorageGroupProcessor.java | 30 +++++
6 files changed, 317 insertions(+), 54 deletions(-)
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/PullSnapshotTask.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/PullSnapshotTask.java
index 2a077b2..64899fc 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/PullSnapshotTask.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/PullSnapshotTask.java
@@ -19,9 +19,16 @@
package org.apache.iotdb.cluster.log.snapshot;
+import java.io.BufferedOutputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOError;
+import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.iotdb.cluster.client.DataClient;
@@ -33,6 +40,7 @@ import org.apache.iotdb.cluster.rpc.thrift.PullSnapshotRequest;
import org.apache.iotdb.cluster.server.RaftServer;
import org.apache.iotdb.cluster.server.handlers.caller.PullSnapshotHandler;
import org.apache.iotdb.cluster.server.member.DataGroupMember;
+import org.apache.iotdb.cluster.utils.SerializeUtils;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -45,38 +53,37 @@ import org.slf4j.LoggerFactory;
public class PullSnapshotTask<T extends Snapshot> implements Callable<Map<Integer,
T>> {
+ public static final String TASK_SUFFIX = ".task";
private static final Logger logger = LoggerFactory.getLogger(PullSnapshotTask.class);
- private List<Integer> slots;
- // the new member created by a node addition
+ private PullSnapshotTaskDescriptor descriptor;
private DataGroupMember newMember;
- // the nodes the may hold the target slot
- private List<Node> previousHolders;
- // the header of the old members
- private Node header;
- // set to true if the previous holder has been removed from the cluster.
- // This will make the previous holder read-only so that different new
- // replicas can pull the same snapshot.
- private boolean requireReadOnly;
private PullSnapshotRequest request;
private SnapshotFactory snapshotFactory;
- public PullSnapshotTask(Node header, List<Integer> slots,
- DataGroupMember newMember, List<Node> previousHolders, SnapshotFactory snapshotFactory,
- boolean requireReadOnly) {
- this.header = header;
- this.slots = slots;
+ private File snapshotSave;
+
+ /**
+ *
+ * @param descriptor
+ * @param newMember
+ * @param snapshotFactory
+ * @param snapshotSave if the task is resumed from a disk file, this should be that file,
+ * otherwise it should be null
+ */
+ public PullSnapshotTask(PullSnapshotTaskDescriptor descriptor,
+ DataGroupMember newMember, SnapshotFactory snapshotFactory, File snapshotSave) {
+ this.descriptor = descriptor;
this.newMember = newMember;
- this.previousHolders = previousHolders;
this.snapshotFactory = snapshotFactory;
- this.requireReadOnly = requireReadOnly;
+ this.snapshotSave = snapshotSave;
}
private boolean pullSnapshot(AtomicReference<Map<Integer, T>> snapshotRef, int nodeIndex)
throws InterruptedException, TException {
- Node node = previousHolders.get(nodeIndex);
- logger.debug("Pulling {} snapshots from {}", slots.size(), node);
+ Node node = descriptor.getPreviousHolders().get(nodeIndex);
+ logger.debug("Pulling {} snapshots from {}", descriptor.getSlots().size(), node);
DataClient client =
(DataClient) newMember.connectNode(node);
@@ -86,13 +93,13 @@ public class PullSnapshotTask<T extends Snapshot> implements Callable<Map<Intege
} else {
synchronized (snapshotRef) {
client.pullSnapshot(request, new PullSnapshotHandler<>(snapshotRef,
- node, slots, snapshotFactory));
+ node, descriptor.getSlots(), snapshotFactory));
snapshotRef.wait(RaftServer.connectionTimeoutInMS);
}
Map<Integer, T> result = snapshotRef.get();
if (result != null) {
if (logger.isInfoEnabled()) {
- logger.info("Received a snapshot {} from {}", result, previousHolders.get(nodeIndex));
+ logger.info("Received a snapshot {} from {}", result, descriptor.getPreviousHolders().get(nodeIndex));
}
for (Entry<Integer, T> entry : result.entrySet()) {
try {
@@ -113,26 +120,59 @@ public class PullSnapshotTask<T extends Snapshot> implements Callable<Map<Intege
@Override
public Map<Integer, T> call() {
+ persistTask();
request = new PullSnapshotRequest();
- request.setHeader(header);
- request.setRequiredSlots(slots);
- request.setRequireReadOnly(requireReadOnly);
+ request.setHeader(descriptor.getPreviousHolders().getHeader());
+ request.setRequiredSlots(descriptor.getSlots());
+ request.setRequireReadOnly(descriptor.isRequireReadOnly());
AtomicReference<Map<Integer, T>> snapshotRef = new AtomicReference<>();
boolean finished = false;
int nodeIndex = -1;
while (!finished) {
try {
// sequentially pick up a node that may have this slot
- nodeIndex = (nodeIndex + 1) % previousHolders.size();
+ nodeIndex = (nodeIndex + 1) % descriptor.getPreviousHolders().size();
finished = pullSnapshot(snapshotRef, nodeIndex);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
- logger.error("Unexpected interruption when pulling slot {}", slots, e);
+ logger.error("Unexpected interruption when pulling slot {}", descriptor.getSlots(), e);
finished = true;
} catch (TException e) {
- logger.debug("Cannot pull slot {} from {}, retry", slots, previousHolders.get(nodeIndex), e);
+ logger.debug("Cannot pull slot {} from {}, retry", descriptor.getSlots(),
+ descriptor.getPreviousHolders().get(nodeIndex), e);
}
}
+ removeTask();
return snapshotRef.get();
}
+
+ private void persistTask() {
+ if (snapshotSave != null) {
+ // the task is resumed from disk, do not persist it again
+ return;
+ }
+ // choose an unused file name; the suffix must match the one used when resuming tasks
+ Random random = new Random();
+ while (true) {
+ String saveName = System.currentTimeMillis() + "_" + random.nextLong() + TASK_SUFFIX;
+ snapshotSave = new File(newMember.getPullSnapshotTaskDir(), saveName);
+ if (snapshotSave.exists()) {
+ continue;
+ }
+ snapshotSave.getParentFile().mkdirs();
+ break;
+ }
+ // store the descriptor in the chosen file; a failure here is only logged
+ try (DataOutputStream dataOutputStream =
+ new DataOutputStream(new BufferedOutputStream(new FileOutputStream(snapshotSave)))) {
+ descriptor.serialize(dataOutputStream);
+ } catch (IOException e) {
+ logger.error("Cannot save the pulling task: pull {} from {}", descriptor.getSlots(),
+ descriptor.getPreviousHolders(), e);
+ }
+ }
+
+ private void removeTask() {
+ snapshotSave.delete();
+ }
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/PullSnapshotTaskDescriptor.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/PullSnapshotTaskDescriptor.java
new file mode 100644
index 0000000..d7d1be0
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/PullSnapshotTaskDescriptor.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.cluster.log.snapshot;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.iotdb.cluster.partition.PartitionGroup;
+import org.apache.iotdb.cluster.rpc.thrift.Node;
+import org.apache.iotdb.cluster.utils.SerializeUtils;
+
+/**
+ * PullSnapshotTaskDescriptor describes a pull-snapshot-task: the slots to pull, the previous
+ * holders, and whether the pulling requires the provider to become read-only. It can be
+ * serialized so that the task can be resumed after the system crashes.
+ */
+public class PullSnapshotTaskDescriptor {
+ private PartitionGroup previousHolders;
+ private List<Integer> slots;
+
+ // set to true if the previous holder has been removed from the cluster.
+ // This will make the previous holder read-only so that different new
+ // replicas can pull the same snapshot.
+ private boolean requireReadOnly;
+
+ public PullSnapshotTaskDescriptor() {
+ }
+
+ public PullSnapshotTaskDescriptor(PartitionGroup previousOwners,
+ List<Integer> slots, boolean requireReadOnly) {
+ this.previousHolders = previousOwners;
+ this.slots = slots;
+ this.requireReadOnly = requireReadOnly;
+ }
+
+ public PartitionGroup getPreviousHolders() {
+ return previousHolders;
+ }
+
+ public void setPreviousHolders(PartitionGroup previousHolders) {
+ this.previousHolders = previousHolders;
+ }
+
+ public List<Integer> getSlots() {
+ return slots;
+ }
+
+ public void setSlots(List<Integer> slots) {
+ this.slots = slots;
+ }
+
+ public boolean isRequireReadOnly() {
+ return requireReadOnly;
+ }
+
+ public void setRequireReadOnly(boolean requireReadOnly) {
+ this.requireReadOnly = requireReadOnly;
+ }
+
+ public void serialize(DataOutputStream dataOutputStream) throws IOException {
+ dataOutputStream.writeInt(slots.size());
+ for (Integer slot : slots) {
+ dataOutputStream.writeInt(slot);
+ }
+
+ dataOutputStream.writeInt(previousHolders.size());
+ for (Node previousHolder : previousHolders) {
+ SerializeUtils.serialize(previousHolder, dataOutputStream);
+ }
+
+ dataOutputStream.writeBoolean(requireReadOnly);
+ }
+
+ public void deserialize(DataInputStream dataInputStream) throws IOException {
+ int slotSize = dataInputStream.readInt();
+ slots = new ArrayList<>(slotSize);
+ for (int i = 0; i < slotSize; i++) {
+ slots.add(dataInputStream.readInt());
+ }
+
+ int holderSize = dataInputStream.readInt();
+ previousHolders = new PartitionGroup();
+ for (int i = 0; i < holderSize; i++) {
+ Node node = new Node();
+ SerializeUtils.deserialize(node, dataInputStream);
+ previousHolders.add(node);
+ }
+
+ requireReadOnly = dataInputStream.readBoolean();
+ }
+}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/partition/SlotManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/partition/SlotManager.java
index a2123a2..f172782 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/partition/SlotManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/partition/SlotManager.java
@@ -144,7 +144,7 @@ public class SlotManager {
}
- enum SlotStatus {
+ public enum SlotStatus {
// the slot has pulled data or does not belong to this member
NULL,
// the slot is pulling data and writes into it should be blocked and reads of it should merge
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
index 8237dfc..158361f 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
@@ -19,23 +19,26 @@
package org.apache.iotdb.cluster.server.member;
+import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.File;
+import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.iotdb.cluster.RemoteTsFileResource;
import org.apache.iotdb.cluster.client.ClientPool;
@@ -53,10 +56,11 @@ import org.apache.iotdb.cluster.log.manage.PartitionedSnapshotLogManager;
import org.apache.iotdb.cluster.log.snapshot.FileSnapshot;
import org.apache.iotdb.cluster.log.snapshot.PartitionedSnapshot;
import org.apache.iotdb.cluster.log.snapshot.PullSnapshotTask;
-import org.apache.iotdb.cluster.log.snapshot.RemoteFileSnapshot;
+import org.apache.iotdb.cluster.log.snapshot.PullSnapshotTaskDescriptor;
import org.apache.iotdb.cluster.partition.NodeRemovalResult;
import org.apache.iotdb.cluster.partition.PartitionGroup;
import org.apache.iotdb.cluster.partition.SlotManager;
+import org.apache.iotdb.cluster.partition.SlotManager.SlotStatus;
import org.apache.iotdb.cluster.query.RemoteQueryContext;
import org.apache.iotdb.cluster.query.filter.SlotTsFileFilter;
import org.apache.iotdb.cluster.query.manage.ClusterQueryManager;
@@ -182,7 +186,7 @@ public class DataGroupMember extends RaftMember implements TSDataService.AsyncIf
}
/**
- * Start heartbeat, catch-up and pull snapshot services.
+ * Start heartbeat, catch-up, pull snapshot services and start all unfinished pull-snapshot-tasks.
* Calling the method twice does not induce side effects.
* @throws TTransportException
*/
@@ -194,6 +198,7 @@ public class DataGroupMember extends RaftMember implements TSDataService.AsyncIf
super.start();
heartBeatService.submit(new DataHeartbeatThread(this));
pullSnapshotService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
+ resumePullSnapshotTasks();
}
/**
@@ -655,16 +660,29 @@ public class DataGroupMember extends RaftMember implements TSDataService.AsyncIf
setReadOnly();
}
+ List<Integer> requiredSlots = request.getRequiredSlots();
+ for (Integer requiredSlot : requiredSlots) {
+ // wait if the data of the slot is in another node
+ slotManager.waitSlot(requiredSlot);
+ }
+ logger.debug("{}: {} slots are requested", name, requiredSlots.size());
+
+ // If the logs between [currCommitLogIndex, currLastLogIndex] are committed after the
+ // snapshot is generated, they will be invisible to the new slot owner and thus lost forever
+ long currLastLogIndex = logManager.getLastLogIndex();
+ logger.info("{}: Waiting for logs to commit before snapshot, {}/{}", name,
+ logManager.getCommitLogIndex(), currLastLogIndex);
+ while (logManager.getCommitLogIndex() < currLastLogIndex) {
+ try {
+ Thread.sleep(10);
+ } catch (InterruptedException e) {
+ logger.warn("{}: Unexpected interruption when waiting for logs to commit", name, e);
+ }
+ }
+
// this synchronized should work with the one in AppendEntry when a log is going to commit,
// which may prevent the newly arrived data from being invisible to the new header.
synchronized (logManager) {
- List<Integer> requiredSlots = request.getRequiredSlots();
- for (Integer requiredSlot : requiredSlots) {
- // wait if the data of the slot is in another node
- slotManager.waitSlot(requiredSlot);
- }
- logger.debug("{}: {} slots are requested", name, requiredSlots.size());
-
PullSnapshotResp resp = new PullSnapshotResp();
Map<Integer, ByteBuffer> resultMap = new HashMap<>();
try {
@@ -713,7 +731,10 @@ public class DataGroupMember extends RaftMember implements TSDataService.AsyncIf
for (Entry<Node, List<Integer>> entry : holderSlotsMap.entrySet()) {
Node node = entry.getKey();
List<Integer> nodeSlots = entry.getValue();
- pullFileSnapshot(metaGroupMember.getPartitionTable().getHeaderGroup(node), nodeSlots, false);
+ PullSnapshotTaskDescriptor taskDescriptor =
+ new PullSnapshotTaskDescriptor(metaGroupMember.getPartitionTable().getHeaderGroup(node),
+ nodeSlots, false);
+ pullFileSnapshot(taskDescriptor, null);
}
}
}
@@ -721,24 +742,73 @@ public class DataGroupMember extends RaftMember implements TSDataService.AsyncIf
/**
* Pull FileSnapshots (timeseries schemas and lists of TsFiles) of "nodeSlots" from one of the
* "prevHolders".
- * The actual pulling will be performed in a separate thread, and placeholders
- * (RemoteFileSnapshots) will be placed in those slots of the logManager to prevent the
- * logManager from taking snapshots before the data has been pulled.
- * @param prevHolders
- * @param nodeSlots
- * @param requireReadOnly set to true if the previous holder has been removed from the cluster.
- * This will make the previous holder read-only so that different new
- * replicas can pull the same snapshot.
+ * The actual pulling will be performed in a separate thread.
+ * @param descriptor
+ * @param snapshotSave set to the corresponding disk file if the task is resumed from disk, or
+ * set to null otherwise
+ */
+ private void pullFileSnapshot(PullSnapshotTaskDescriptor descriptor, File snapshotSave) {
+ Iterator<Integer> iterator = descriptor.getSlots().iterator();
+ while (iterator.hasNext()) {
+ Integer nodeSlot = iterator.next();
+ SlotStatus status = slotManager.getStatus(nodeSlot);
+ if (status != SlotStatus.NULL) {
+ // the pulling may already be issued during restart, skip it in that case
+ iterator.remove();
+ } else {
+ // mark the slot as pulling to control reads and writes of the pulling slot
+ slotManager.setToPulling(nodeSlot, descriptor.getPreviousHolders().getHeader());
+ }
+ }
+ if (descriptor.getSlots().isEmpty()) {
+ return;
+ }
+ // forward snapshotSave so that a resumed task reuses and finally deletes its existing file
+ pullSnapshotService.submit(new PullSnapshotTask(descriptor, this, FileSnapshot::new, snapshotSave));
+ }
+
+ /**
+ * Restart all unfinished pull-snapshot-tasks of the member.
*/
- private void pullFileSnapshot(PartitionGroup prevHolders, List<Integer> nodeSlots, boolean requireReadOnly) {
- Future<Map<Integer, FileSnapshot>> snapshotFuture =
- pullSnapshotService.submit(new PullSnapshotTask(prevHolders.getHeader(), nodeSlots, this,
- prevHolders, FileSnapshot::new, requireReadOnly));
- for (int slot : nodeSlots) {
- logManager.setSnapshot(new RemoteFileSnapshot(snapshotFuture, slot), slot);
+ public void resumePullSnapshotTasks() {
+ File snapshotTaskDir = new File(getPullSnapshotTaskDir());
+ if (!snapshotTaskDir.exists()) {
+ return;
+ }
+
+ File[] files = snapshotTaskDir.listFiles();
+ if (files != null) {
+ for (File file : files) {
+ if (file.getName().endsWith(PullSnapshotTask.TASK_SUFFIX)) {
+ try (DataInputStream dataInputStream =
+ new DataInputStream(new BufferedInputStream(new FileInputStream(file)))) {
+ PullSnapshotTaskDescriptor descriptor = new PullSnapshotTaskDescriptor();
+ descriptor.deserialize(dataInputStream);
+ pullFileSnapshot(descriptor, file);
+ } catch (IOException e) {
+ logger.error("Cannot resume pull-snapshot-task in file {}", file, e);
+ file.delete();
+ }
+ }
+ }
}
}
+ /**
+ *
+ * @return a directory that stores the information of ongoing pulling snapshot tasks.
+ */
+ public String getPullSnapshotTaskDir() {
+ return getMemberDir() + "snapshot_task" + File.separator;
+ }
+
+ /**
+ *
+ * @return the path of the directory that is provided exclusively for the member.
+ */
+ public String getMemberDir() {
+ return "raft" + File.separator + getHeader().nodeIdentifier + File.separator;
+ }
public MetaGroupMember getMetaGroupMember() {
return metaGroupMember;
@@ -1198,7 +1268,9 @@ public class DataGroupMember extends RaftMember implements TSDataService.AsyncIf
List<Integer> slotsToPull = removalResult.getNewSlotOwners().get(getHeader());
if (slotsToPull != null) {
// pull the slots that should be taken over
- pullFileSnapshot(removalResult.getRemovedGroup(), slotsToPull, true);
+ PullSnapshotTaskDescriptor taskDescriptor = new PullSnapshotTaskDescriptor(removalResult.getRemovedGroup(),
+ slotsToPull, true);
+ pullFileSnapshot(taskDescriptor, null);
}
}
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/utils/SerializeUtils.java b/cluster/src/main/java/org/apache/iotdb/cluster/utils/SerializeUtils.java
index e26e0e9..274c49a 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/utils/SerializeUtils.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/utils/SerializeUtils.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.cluster.utils;
import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
@@ -100,6 +101,16 @@ public class SerializeUtils {
node.setDataPort(buffer.getInt());
}
+ public static void deserialize(Node node, DataInputStream stream) throws IOException {
+ int ipLength = stream.readInt(); // the ip is read as a length-prefixed byte array
+ byte[] ipBytes = new byte[ipLength];
+ stream.readFully(ipBytes); // read() may return before the whole array is filled
+ node.setIp(new String(ipBytes));
+ node.setMetaPort(stream.readInt());
+ node.setNodeIdentifier(stream.readInt());
+ node.setDataPort(stream.readInt());
+ }
+
public static void serializeBatchData(BatchData batchData, DataOutputStream outputStream) {
try {
int length = batchData.length();
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index 06fd039..a85aa58 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -2045,7 +2045,37 @@ public class StorageGroupProcessor {
return storageGroupName;
}
+ /**
+ * Check if the data of "tsFileResource" all exist locally by comparing the historical versions
+ * in the partition of "partitionNumber". This is available only when the IoTDB which generated
+ * "tsFileResource" has the same close file policy as the local one.
+ * If one of the versions in "tsFileResource" equals a version of a working file, false is
+ * also returned because "tsFileResource" may have unwritten data of that file.
+ * @param tsFileResource
+ * @param partitionNum
+ * @return true if the historicalVersions of "tsFileResource" is a subset of
+ * partitionDirectFileVersions and does not contain any version of a working file, or false
+ * otherwise
+ */
public boolean isFileAlreadyExist(TsFileResource tsFileResource, long partitionNum) {
+ // consider the case: The local node crashes when it is writing TsFile no.5.
+ // when it restarts, the leader has proceeded to no.6. When the leader sends no.5 to this
+ // node, the file should be accepted as local no.5 is not closed which means there may be
+ // unreceived data in no.5
+ // So if the incoming file contains the version of an unclosed file, it should be accepted
+ for (TsFileProcessor workSequenceTsFileProcessor : getWorkSequenceTsFileProcessors()) {
+ long workingFileVersion = workSequenceTsFileProcessor.getTsFileResource().getMaxVersion();
+ if (tsFileResource.getHistoricalVersions().contains(workingFileVersion)) {
+ return false;
+ }
+ }
+ for (TsFileProcessor workUnsequenceTsFileProcessor : getWorkUnsequenceTsFileProcessor()) {
+ long workingFileVersion = workUnsequenceTsFileProcessor.getTsFileResource().getMaxVersion();
+ if (tsFileResource.getHistoricalVersions().contains(workingFileVersion)) {
+ return false;
+ }
+ }
+
return partitionDirectFileVersions.getOrDefault(partitionNum, Collections.emptySet())
.containsAll(tsFileResource.getHistoricalVersions());
}