You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ta...@apache.org on 2023/01/11 10:46:48 UTC

[iotdb] branch jira5324 created (now 8b40073a5c)

This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a change to branch jira5324
in repository https://gitbox.apache.org/repos/asf/iotdb.git


      at 8b40073a5c finish

This branch includes the following new commits:

     new 8b40073a5c finish

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[iotdb] 01/01: finish

Posted by ta...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a commit to branch jira5324
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 8b40073a5cd0ec44bc9b6886bef87d06958f48f9
Author: OneSizeFitQuorum <ta...@apache.org>
AuthorDate: Wed Jan 11 18:46:32 2023 +0800

    finish
    
    Signed-off-by: OneSizeFitQuorum <ta...@apache.org>
---
 .../apache/iotdb/consensus/iot/IoTConsensus.java   |  1 +
 .../consensus/iot/IoTConsensusServerImpl.java      | 48 +++++++++++------
 .../consensus/iot/IoTConsensusServerMetrics.java   |  2 +-
 .../consensus/iot/logdispatcher/LogDispatcher.java |  2 +-
 .../service/IoTConsensusRPCServiceProcessor.java   |  2 +-
 .../apache/iotdb/consensus/iot/ReplicateTest.java  | 32 ++++++------
 .../test/java/org/apache/iotdb/db/auth/ATest.java  | 60 ++++++++++++++++++++++
 7 files changed, 113 insertions(+), 34 deletions(-)

diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java b/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java
index 5135d384a2..5cfe53459d 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java
@@ -262,6 +262,7 @@ public class IoTConsensus implements IConsensus {
 
       // step 2: notify all the other Peers to build the sync connection to newPeer
       logger.info("[IoTConsensus] notify current peers to build sync log...");
+      impl.checkAndLockSafeDeletedSearchIndex();
       impl.notifyPeersToBuildSyncLogChannel(peer);
 
       // step 3: take snapshot
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java b/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
index 1d668c281a..fae9c043b1 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
@@ -98,7 +98,7 @@ public class IoTConsensusServerImpl {
   private final Condition stateMachineCondition = stateMachineLock.newCondition();
   private final String storageDir;
   private final List<Peer> configuration;
-  private final AtomicLong index;
+  private final AtomicLong searchIndex;
   private final LogDispatcher logDispatcher;
   private final IoTConsensusConfig config;
   private final ConsensusReqReader reader;
@@ -132,11 +132,8 @@ public class IoTConsensusServerImpl {
     this.logDispatcher = new LogDispatcher(this, clientManager);
     reader = (ConsensusReqReader) stateMachine.read(new GetConsensusReqReaderPlan());
     long currentSearchIndex = reader.getCurrentSearchIndex();
-    if (1 == configuration.size()) {
-      // only one configuration means single replica.
-      reader.setSafelyDeletedSearchIndex(Long.MAX_VALUE);
-    }
-    this.index = new AtomicLong(currentSearchIndex);
+    checkAndUpdateSafeDeletedSearchIndex();
+    this.searchIndex = new AtomicLong(currentSearchIndex);
     this.consensusGroupId = thisNode.getGroupId().toString();
     this.metrics = new IoTConsensusServerMetrics(this);
   }
@@ -180,7 +177,7 @@ public class IoTConsensusServerImpl {
       if (needBlockWrite()) {
         logger.info(
             "[Throttle Down] index:{}, safeIndex:{}",
-            getIndex(),
+            getSearchIndex(),
             getCurrentSafelyDeletedSearchIndex());
         try {
           boolean timeout =
@@ -240,9 +237,9 @@ public class IoTConsensusServerImpl {
         // is not expected and will slow down the preparation speed for batch.
         // So we need to use the lock to ensure the `offer()` and `incrementAndGet()` are
         // in one transaction.
-        synchronized (index) {
+        synchronized (searchIndex) {
           logDispatcher.offer(indexedConsensusRequest);
-          index.incrementAndGet();
+          searchIndex.incrementAndGet();
         }
         // statistic the time of offering request into queue
         MetricService.getInstance()
@@ -456,7 +453,7 @@ public class IoTConsensusServerImpl {
       if (peer.equals(thisNode)) {
         // use searchIndex for thisNode as the initialSyncIndex because targetPeer will load the
         // snapshot produced by thisNode
-        buildSyncLogChannel(targetPeer, index.get());
+        buildSyncLogChannel(targetPeer, searchIndex.get());
       } else {
         // use RPC to tell other peers to build sync log channel to target peer
         try (SyncIoTConsensusServiceClient client =
@@ -592,6 +589,7 @@ public class IoTConsensusServerImpl {
       logger.info("[IoTConsensus] log dispatcher to {} removed and cleanup", targetPeer);
       // step 2, update configuration
       configuration.remove(targetPeer);
+      checkAndUpdateSafeDeletedSearchIndex();
       // step 3, persist configuration
       persistConfigurationUpdate();
       logger.info("[IoTConsensus] configuration updated to {}", this.configuration);
@@ -668,7 +666,7 @@ public class IoTConsensusServerImpl {
 
   public IndexedConsensusRequest buildIndexedConsensusRequestForLocalRequest(
       IConsensusRequest request) {
-    return new IndexedConsensusRequest(index.get() + 1, Collections.singletonList(request));
+    return new IndexedConsensusRequest(searchIndex.get() + 1, Collections.singletonList(request));
   }
 
   public IndexedConsensusRequest buildIndexedConsensusRequestForRemoteRequest(
@@ -682,7 +680,7 @@ public class IoTConsensusServerImpl {
    * single copies, the current index is selected
    */
   public long getCurrentSafelyDeletedSearchIndex() {
-    return logDispatcher.getMinSyncIndex().orElseGet(index::get);
+    return logDispatcher.getMinSyncIndex().orElseGet(searchIndex::get);
   }
 
   public String getStorageDir() {
@@ -697,8 +695,8 @@ public class IoTConsensusServerImpl {
     return configuration;
   }
 
-  public long getIndex() {
-    return index.get();
+  public long getSearchIndex() {
+    return searchIndex.get();
   }
 
   public IoTConsensusConfig getConfig() {
@@ -723,7 +721,7 @@ public class IoTConsensusServerImpl {
   }
 
   public AtomicLong getIndexObject() {
-    return index;
+    return searchIndex;
   }
 
   public boolean isReadOnly() {
@@ -768,4 +766,24 @@ public class IoTConsensusServerImpl {
       }
     }
   }
+
+  /**
+   * We should set safelyDeletedSearchIndex to searchIndex before addPeer to avoid potential data
+   * loss
+   */
+  public void checkAndLockSafeDeletedSearchIndex() {
+    if (configuration.size() == 1) {
+      reader.setSafelyDeletedSearchIndex(searchIndex.get());
+    }
+  }
+
+  /**
+   * If the configuration contains only one peer (single replica), we can set
+   * safelyDeletedSearchIndex to Long.MAX_VALUE
+   */
+  public void checkAndUpdateSafeDeletedSearchIndex() {
+    if (configuration.size() == 1) {
+      reader.setSafelyDeletedSearchIndex(Long.MAX_VALUE);
+    }
+  }
 }
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerMetrics.java b/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerMetrics.java
index 88b25486b7..1e36e20b70 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerMetrics.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerMetrics.java
@@ -41,7 +41,7 @@ public class IoTConsensusServerMetrics implements IMetricSet {
             Metric.IOT_CONSENSUS.toString(),
             MetricLevel.IMPORTANT,
             impl,
-            IoTConsensusServerImpl::getIndex,
+            IoTConsensusServerImpl::getSearchIndex,
             Tag.NAME.toString(),
             "ioTConsensusServerImpl",
             Tag.REGION.toString(),
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java b/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java
index 752a392ed1..93de711b76 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java
@@ -344,7 +344,7 @@ public class LogDispatcher {
       long startIndex = syncStatus.getNextSendingIndex();
       long maxIndex;
       synchronized (impl.getIndexObject()) {
-        maxIndex = impl.getIndex() + 1;
+        maxIndex = impl.getSearchIndex() + 1;
         logger.debug(
             "{}: startIndex: {}, maxIndex: {}, pendingEntries size: {}, bufferedEntries size: {}",
             impl.getThisNode().getGroupId(),
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCServiceProcessor.java b/consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCServiceProcessor.java
index f966528cdc..6b24f596a2 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCServiceProcessor.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCServiceProcessor.java
@@ -236,7 +236,7 @@ public class IoTConsensusRPCServiceProcessor implements IoTConsensusIService.Asy
       resultHandler.onComplete(new TWaitSyncLogCompleteRes(true, 0, 0));
       return;
     }
-    long searchIndex = impl.getIndex();
+    long searchIndex = impl.getSearchIndex();
     long safeIndex = impl.getCurrentSafelyDeletedSearchIndex();
     resultHandler.onComplete(
         new TWaitSyncLogCompleteRes(searchIndex == safeIndex, searchIndex, safeIndex));
diff --git a/consensus/src/test/java/org/apache/iotdb/consensus/iot/ReplicateTest.java b/consensus/src/test/java/org/apache/iotdb/consensus/iot/ReplicateTest.java
index e1675e7994..a36db6e3ea 100644
--- a/consensus/src/test/java/org/apache/iotdb/consensus/iot/ReplicateTest.java
+++ b/consensus/src/test/java/org/apache/iotdb/consensus/iot/ReplicateTest.java
@@ -120,17 +120,17 @@ public class ReplicateTest {
     servers.get(1).createPeer(group.getGroupId(), group.getPeers());
     servers.get(2).createPeer(group.getGroupId(), group.getPeers());
 
-    Assert.assertEquals(0, servers.get(0).getImpl(gid).getIndex());
-    Assert.assertEquals(0, servers.get(1).getImpl(gid).getIndex());
-    Assert.assertEquals(0, servers.get(2).getImpl(gid).getIndex());
+    Assert.assertEquals(0, servers.get(0).getImpl(gid).getSearchIndex());
+    Assert.assertEquals(0, servers.get(1).getImpl(gid).getSearchIndex());
+    Assert.assertEquals(0, servers.get(2).getImpl(gid).getSearchIndex());
 
     for (int i = 0; i < CHECK_POINT_GAP; i++) {
       servers.get(0).write(gid, new TestEntry(i, peers.get(0)));
       servers.get(1).write(gid, new TestEntry(i, peers.get(1)));
       servers.get(2).write(gid, new TestEntry(i, peers.get(2)));
-      Assert.assertEquals(i + 1, servers.get(0).getImpl(gid).getIndex());
-      Assert.assertEquals(i + 1, servers.get(1).getImpl(gid).getIndex());
-      Assert.assertEquals(i + 1, servers.get(2).getImpl(gid).getIndex());
+      Assert.assertEquals(i + 1, servers.get(0).getImpl(gid).getSearchIndex());
+      Assert.assertEquals(i + 1, servers.get(1).getImpl(gid).getSearchIndex());
+      Assert.assertEquals(i + 1, servers.get(2).getImpl(gid).getSearchIndex());
     }
 
     for (int i = 0; i < 3; i++) {
@@ -163,9 +163,9 @@ public class ReplicateTest {
     Assert.assertEquals(peers, servers.get(1).getImpl(gid).getConfiguration());
     Assert.assertEquals(peers, servers.get(2).getImpl(gid).getConfiguration());
 
-    Assert.assertEquals(CHECK_POINT_GAP, servers.get(0).getImpl(gid).getIndex());
-    Assert.assertEquals(CHECK_POINT_GAP, servers.get(1).getImpl(gid).getIndex());
-    Assert.assertEquals(CHECK_POINT_GAP, servers.get(2).getImpl(gid).getIndex());
+    Assert.assertEquals(CHECK_POINT_GAP, servers.get(0).getImpl(gid).getSearchIndex());
+    Assert.assertEquals(CHECK_POINT_GAP, servers.get(1).getImpl(gid).getSearchIndex());
+    Assert.assertEquals(CHECK_POINT_GAP, servers.get(2).getImpl(gid).getSearchIndex());
 
     for (int i = 0; i < 3; i++) {
       long start = System.currentTimeMillis();
@@ -197,14 +197,14 @@ public class ReplicateTest {
     servers.get(0).createPeer(group.getGroupId(), group.getPeers());
     servers.get(1).createPeer(group.getGroupId(), group.getPeers());
 
-    Assert.assertEquals(0, servers.get(0).getImpl(gid).getIndex());
-    Assert.assertEquals(0, servers.get(1).getImpl(gid).getIndex());
+    Assert.assertEquals(0, servers.get(0).getImpl(gid).getSearchIndex());
+    Assert.assertEquals(0, servers.get(1).getImpl(gid).getSearchIndex());
 
     for (int i = 0; i < CHECK_POINT_GAP; i++) {
       servers.get(0).write(gid, new TestEntry(i, peers.get(0)));
       servers.get(1).write(gid, new TestEntry(i, peers.get(1)));
-      Assert.assertEquals(i + 1, servers.get(0).getImpl(gid).getIndex());
-      Assert.assertEquals(i + 1, servers.get(1).getImpl(gid).getIndex());
+      Assert.assertEquals(i + 1, servers.get(0).getImpl(gid).getSearchIndex());
+      Assert.assertEquals(i + 1, servers.get(1).getImpl(gid).getSearchIndex());
     }
 
     Assert.assertEquals(0, servers.get(0).getImpl(gid).getCurrentSafelyDeletedSearchIndex());
@@ -219,9 +219,9 @@ public class ReplicateTest {
     Assert.assertEquals(peers, servers.get(1).getImpl(gid).getConfiguration());
     Assert.assertEquals(peers, servers.get(2).getImpl(gid).getConfiguration());
 
-    Assert.assertEquals(CHECK_POINT_GAP, servers.get(0).getImpl(gid).getIndex());
-    Assert.assertEquals(CHECK_POINT_GAP, servers.get(1).getImpl(gid).getIndex());
-    Assert.assertEquals(0, servers.get(2).getImpl(gid).getIndex());
+    Assert.assertEquals(CHECK_POINT_GAP, servers.get(0).getImpl(gid).getSearchIndex());
+    Assert.assertEquals(CHECK_POINT_GAP, servers.get(1).getImpl(gid).getSearchIndex());
+    Assert.assertEquals(0, servers.get(2).getImpl(gid).getSearchIndex());
 
     for (int i = 0; i < 2; i++) {
       long start = System.currentTimeMillis();
diff --git a/server/src/test/java/org/apache/iotdb/db/auth/ATest.java b/server/src/test/java/org/apache/iotdb/db/auth/ATest.java
new file mode 100644
index 0000000000..1994a53176
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/auth/ATest.java
@@ -0,0 +1,60 @@
+package org.apache.iotdb.db.auth;
+
+import org.junit.Test;
+import org.testcontainers.shaded.com.google.common.base.Stopwatch;
+
+import java.time.Instant;
+import java.util.Calendar;
+import java.util.Date;
+
+public class ATest {
+  @Test
+  public void testSystemCurrentTime() {
+    final Stopwatch stopwatch = Stopwatch.createStarted();
+    for (int i = 0; i < 1_0000_000; i++) {
+      System.currentTimeMillis();
+    }
+    stopwatch.stop();
+    System.out.println("System.currentTimeMillis(): " + stopwatch);
+  }
+
+  @Test
+  public void testDateTime() {
+    final Stopwatch stopwatch = Stopwatch.createStarted();
+    for (int i = 0; i < 1_0000_000; i++) {
+      (new Date()).getTime();
+    }
+    stopwatch.stop();
+    System.out.println("(new Date()).getTime(): " + stopwatch);
+  }
+
+  @Test
+  public void testCalendarTime() {
+    final Stopwatch stopwatch = Stopwatch.createStarted();
+    for (int i = 0; i < 1_0000_000; i++) {
+      Calendar.getInstance().getTimeInMillis();
+    }
+    stopwatch.stop();
+    System.out.println("Calendar.getInstance().getTimeInMillis(): " + stopwatch);
+  }
+
+  @Test
+  public void testInstantNow() {
+    final Stopwatch stopwatch = Stopwatch.createStarted();
+    for (int i = 0; i < 1_0000_000; i++) {
+      Instant.now();
+    }
+    stopwatch.stop();
+    System.out.println("Instant.now(): " + stopwatch);
+  }
+
+  @Test
+  public void testNanoTimeNow() {
+    final Stopwatch stopwatch = Stopwatch.createStarted();
+    for (int i = 0; i < 1_0000_000; i++) {
+      System.nanoTime();
+    }
+    stopwatch.stop();
+    System.out.println("nanoTime(): " + stopwatch);
+  }
+}