You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ta...@apache.org on 2023/01/12 08:44:17 UTC
[iotdb] 01/01: [IOTDB-5400] IoTConsensus loses data when replica number change from 1 to several (#8837)
This is an automated email from the ASF dual-hosted git repository.
tanxinyu pushed a commit to branch jira5400_cp
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 4c6bfa96cddfd20eaef4491ddb74c3c03a37d317
Author: Potato <ta...@apache.org>
AuthorDate: Thu Jan 12 16:31:50 2023 +0800
[IOTDB-5400] IoTConsensus loses data when replica number change from 1 to several (#8837)
---
.../apache/iotdb/consensus/iot/IoTConsensus.java | 1 +
.../consensus/iot/IoTConsensusServerImpl.java | 32 ++++++++++++++--------
.../consensus/iot/IoTConsensusServerMetrics.java | 2 +-
.../consensus/iot/logdispatcher/LogDispatcher.java | 2 +-
.../service/IoTConsensusRPCServiceProcessor.java | 2 +-
.../apache/iotdb/consensus/iot/ReplicateTest.java | 32 +++++++++++-----------
6 files changed, 41 insertions(+), 30 deletions(-)
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java b/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java
index 5135d384a2..5cfe53459d 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java
@@ -262,6 +262,7 @@ public class IoTConsensus implements IConsensus {
// step 2: notify all the other Peers to build the sync connection to newPeer
logger.info("[IoTConsensus] notify current peers to build sync log...");
+ impl.checkAndLockSafeDeletedSearchIndex();
impl.notifyPeersToBuildSyncLogChannel(peer);
// step 3: take snapshot
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java b/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
index 1d668c281a..298a8caa12 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
@@ -98,7 +98,7 @@ public class IoTConsensusServerImpl {
private final Condition stateMachineCondition = stateMachineLock.newCondition();
private final String storageDir;
private final List<Peer> configuration;
- private final AtomicLong index;
+ private final AtomicLong searchIndex;
private final LogDispatcher logDispatcher;
private final IoTConsensusConfig config;
private final ConsensusReqReader reader;
@@ -136,7 +136,7 @@ public class IoTConsensusServerImpl {
// only one configuration means single replica.
reader.setSafelyDeletedSearchIndex(Long.MAX_VALUE);
}
- this.index = new AtomicLong(currentSearchIndex);
+ this.searchIndex = new AtomicLong(currentSearchIndex);
this.consensusGroupId = thisNode.getGroupId().toString();
this.metrics = new IoTConsensusServerMetrics(this);
}
@@ -180,7 +180,7 @@ public class IoTConsensusServerImpl {
if (needBlockWrite()) {
logger.info(
"[Throttle Down] index:{}, safeIndex:{}",
- getIndex(),
+ getSearchIndex(),
getCurrentSafelyDeletedSearchIndex());
try {
boolean timeout =
@@ -240,9 +240,9 @@ public class IoTConsensusServerImpl {
// is not expected and will slow down the preparation speed for batch.
// So we need to use the lock to ensure the `offer()` and `incrementAndGet()` are
// in one transaction.
- synchronized (index) {
+ synchronized (searchIndex) {
logDispatcher.offer(indexedConsensusRequest);
- index.incrementAndGet();
+ searchIndex.incrementAndGet();
}
// statistic the time of offering request into queue
MetricService.getInstance()
@@ -456,7 +456,7 @@ public class IoTConsensusServerImpl {
if (peer.equals(thisNode)) {
// use searchIndex for thisNode as the initialSyncIndex because targetPeer will load the
// snapshot produced by thisNode
- buildSyncLogChannel(targetPeer, index.get());
+ buildSyncLogChannel(targetPeer, searchIndex.get());
} else {
// use RPC to tell other peers to build sync log channel to target peer
try (SyncIoTConsensusServiceClient client =
@@ -668,7 +668,7 @@ public class IoTConsensusServerImpl {
public IndexedConsensusRequest buildIndexedConsensusRequestForLocalRequest(
IConsensusRequest request) {
- return new IndexedConsensusRequest(index.get() + 1, Collections.singletonList(request));
+ return new IndexedConsensusRequest(searchIndex.get() + 1, Collections.singletonList(request));
}
public IndexedConsensusRequest buildIndexedConsensusRequestForRemoteRequest(
@@ -682,7 +682,7 @@ public class IoTConsensusServerImpl {
* single copies, the current index is selected
*/
public long getCurrentSafelyDeletedSearchIndex() {
- return logDispatcher.getMinSyncIndex().orElseGet(index::get);
+ return logDispatcher.getMinSyncIndex().orElseGet(searchIndex::get);
}
public String getStorageDir() {
@@ -697,8 +697,8 @@ public class IoTConsensusServerImpl {
return configuration;
}
- public long getIndex() {
- return index.get();
+ public long getSearchIndex() {
+ return searchIndex.get();
}
public IoTConsensusConfig getConfig() {
@@ -723,7 +723,7 @@ public class IoTConsensusServerImpl {
}
public AtomicLong getIndexObject() {
- return index;
+ return searchIndex;
}
public boolean isReadOnly() {
@@ -768,4 +768,14 @@ public class IoTConsensusServerImpl {
}
}
}
+
+ /**
+ * We should set safelyDeletedSearchIndex to searchIndex before addPeer to avoid potential data
+ * loss.
+ */
+ public void checkAndLockSafeDeletedSearchIndex() {
+ if (configuration.size() == 1) {
+ reader.setSafelyDeletedSearchIndex(searchIndex.get());
+ }
+ }
}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerMetrics.java b/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerMetrics.java
index 88b25486b7..1e36e20b70 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerMetrics.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerMetrics.java
@@ -41,7 +41,7 @@ public class IoTConsensusServerMetrics implements IMetricSet {
Metric.IOT_CONSENSUS.toString(),
MetricLevel.IMPORTANT,
impl,
- IoTConsensusServerImpl::getIndex,
+ IoTConsensusServerImpl::getSearchIndex,
Tag.NAME.toString(),
"ioTConsensusServerImpl",
Tag.REGION.toString(),
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java b/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java
index 752a392ed1..93de711b76 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java
@@ -344,7 +344,7 @@ public class LogDispatcher {
long startIndex = syncStatus.getNextSendingIndex();
long maxIndex;
synchronized (impl.getIndexObject()) {
- maxIndex = impl.getIndex() + 1;
+ maxIndex = impl.getSearchIndex() + 1;
logger.debug(
"{}: startIndex: {}, maxIndex: {}, pendingEntries size: {}, bufferedEntries size: {}",
impl.getThisNode().getGroupId(),
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCServiceProcessor.java b/consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCServiceProcessor.java
index f966528cdc..6b24f596a2 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCServiceProcessor.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCServiceProcessor.java
@@ -236,7 +236,7 @@ public class IoTConsensusRPCServiceProcessor implements IoTConsensusIService.Asy
resultHandler.onComplete(new TWaitSyncLogCompleteRes(true, 0, 0));
return;
}
- long searchIndex = impl.getIndex();
+ long searchIndex = impl.getSearchIndex();
long safeIndex = impl.getCurrentSafelyDeletedSearchIndex();
resultHandler.onComplete(
new TWaitSyncLogCompleteRes(searchIndex == safeIndex, searchIndex, safeIndex));
diff --git a/consensus/src/test/java/org/apache/iotdb/consensus/iot/ReplicateTest.java b/consensus/src/test/java/org/apache/iotdb/consensus/iot/ReplicateTest.java
index e1675e7994..a36db6e3ea 100644
--- a/consensus/src/test/java/org/apache/iotdb/consensus/iot/ReplicateTest.java
+++ b/consensus/src/test/java/org/apache/iotdb/consensus/iot/ReplicateTest.java
@@ -120,17 +120,17 @@ public class ReplicateTest {
servers.get(1).createPeer(group.getGroupId(), group.getPeers());
servers.get(2).createPeer(group.getGroupId(), group.getPeers());
- Assert.assertEquals(0, servers.get(0).getImpl(gid).getIndex());
- Assert.assertEquals(0, servers.get(1).getImpl(gid).getIndex());
- Assert.assertEquals(0, servers.get(2).getImpl(gid).getIndex());
+ Assert.assertEquals(0, servers.get(0).getImpl(gid).getSearchIndex());
+ Assert.assertEquals(0, servers.get(1).getImpl(gid).getSearchIndex());
+ Assert.assertEquals(0, servers.get(2).getImpl(gid).getSearchIndex());
for (int i = 0; i < CHECK_POINT_GAP; i++) {
servers.get(0).write(gid, new TestEntry(i, peers.get(0)));
servers.get(1).write(gid, new TestEntry(i, peers.get(1)));
servers.get(2).write(gid, new TestEntry(i, peers.get(2)));
- Assert.assertEquals(i + 1, servers.get(0).getImpl(gid).getIndex());
- Assert.assertEquals(i + 1, servers.get(1).getImpl(gid).getIndex());
- Assert.assertEquals(i + 1, servers.get(2).getImpl(gid).getIndex());
+ Assert.assertEquals(i + 1, servers.get(0).getImpl(gid).getSearchIndex());
+ Assert.assertEquals(i + 1, servers.get(1).getImpl(gid).getSearchIndex());
+ Assert.assertEquals(i + 1, servers.get(2).getImpl(gid).getSearchIndex());
}
for (int i = 0; i < 3; i++) {
@@ -163,9 +163,9 @@ public class ReplicateTest {
Assert.assertEquals(peers, servers.get(1).getImpl(gid).getConfiguration());
Assert.assertEquals(peers, servers.get(2).getImpl(gid).getConfiguration());
- Assert.assertEquals(CHECK_POINT_GAP, servers.get(0).getImpl(gid).getIndex());
- Assert.assertEquals(CHECK_POINT_GAP, servers.get(1).getImpl(gid).getIndex());
- Assert.assertEquals(CHECK_POINT_GAP, servers.get(2).getImpl(gid).getIndex());
+ Assert.assertEquals(CHECK_POINT_GAP, servers.get(0).getImpl(gid).getSearchIndex());
+ Assert.assertEquals(CHECK_POINT_GAP, servers.get(1).getImpl(gid).getSearchIndex());
+ Assert.assertEquals(CHECK_POINT_GAP, servers.get(2).getImpl(gid).getSearchIndex());
for (int i = 0; i < 3; i++) {
long start = System.currentTimeMillis();
@@ -197,14 +197,14 @@ public class ReplicateTest {
servers.get(0).createPeer(group.getGroupId(), group.getPeers());
servers.get(1).createPeer(group.getGroupId(), group.getPeers());
- Assert.assertEquals(0, servers.get(0).getImpl(gid).getIndex());
- Assert.assertEquals(0, servers.get(1).getImpl(gid).getIndex());
+ Assert.assertEquals(0, servers.get(0).getImpl(gid).getSearchIndex());
+ Assert.assertEquals(0, servers.get(1).getImpl(gid).getSearchIndex());
for (int i = 0; i < CHECK_POINT_GAP; i++) {
servers.get(0).write(gid, new TestEntry(i, peers.get(0)));
servers.get(1).write(gid, new TestEntry(i, peers.get(1)));
- Assert.assertEquals(i + 1, servers.get(0).getImpl(gid).getIndex());
- Assert.assertEquals(i + 1, servers.get(1).getImpl(gid).getIndex());
+ Assert.assertEquals(i + 1, servers.get(0).getImpl(gid).getSearchIndex());
+ Assert.assertEquals(i + 1, servers.get(1).getImpl(gid).getSearchIndex());
}
Assert.assertEquals(0, servers.get(0).getImpl(gid).getCurrentSafelyDeletedSearchIndex());
@@ -219,9 +219,9 @@ public class ReplicateTest {
Assert.assertEquals(peers, servers.get(1).getImpl(gid).getConfiguration());
Assert.assertEquals(peers, servers.get(2).getImpl(gid).getConfiguration());
- Assert.assertEquals(CHECK_POINT_GAP, servers.get(0).getImpl(gid).getIndex());
- Assert.assertEquals(CHECK_POINT_GAP, servers.get(1).getImpl(gid).getIndex());
- Assert.assertEquals(0, servers.get(2).getImpl(gid).getIndex());
+ Assert.assertEquals(CHECK_POINT_GAP, servers.get(0).getImpl(gid).getSearchIndex());
+ Assert.assertEquals(CHECK_POINT_GAP, servers.get(1).getImpl(gid).getSearchIndex());
+ Assert.assertEquals(0, servers.get(2).getImpl(gid).getSearchIndex());
for (int i = 0; i < 2; i++) {
long start = System.currentTimeMillis();