You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ha...@apache.org on 2023/08/21 08:22:23 UTC

[iotdb] branch rc/1.2.1 created (now fbf2a3e4d23)

This is an automated email from the ASF dual-hosted git repository.

haonan pushed a change to branch rc/1.2.1
in repository https://gitbox.apache.org/repos/asf/iotdb.git


      at fbf2a3e4d23 Revert "[To rel/1.2] Stop generating FilterExpression if we don't specify the time filter in last query session api"

This branch includes the following new commits:

     new b9e793ed85e Revert "[To rel/1.2][IOTDB-6116] Disassociate the IoTConsensus retry logic from the forkjoinPool (#10872) (#10878)"
     new 93853c54db1 Revert "Fix SchemaFileSketchTool is not found"
     new eb5005415e4 Revert "[To rel/1.2] [IOTDB-6112] Fix Limit & Offset push down doesn't take effect while there exist time filter"
     new a080020d7ab Revert "[IOTDB-6101] Pipe: Support tsfile cascade transport  (#10795) (#10796)"
     new fbf2a3e4d23 Revert "[To rel/1.2] Stop generating FilterExpression if we don't specify the time filter in last query session api"

The 5 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[iotdb] 03/05: Revert "[To rel/1.2] [IOTDB-6112] Fix Limit & Offset push down doesn't take effect while there exist time filter"

Posted by ha...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch rc/1.2.1
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit eb5005415e4d9096e854138ac5b7b3f7c7168a31
Author: HTHou <hh...@outlook.com>
AuthorDate: Mon Aug 21 16:19:50 2023 +0800

    Revert "[To rel/1.2] [IOTDB-6112] Fix Limit & Offset push down doesn't take effect while there exist time filter"
    
    This reverts commit 72e09fba536609d421db13104ba740d4729e614a.
---
 .../operator/source/AlignedSeriesScanUtil.java         | 18 ++++++++++--------
 .../read/reader/chunk/MemAlignedPageReader.java        | 14 ++++++--------
 .../tsfile/read/reader/page/AlignedPageReader.java     | 16 +++++++---------
 3 files changed, 23 insertions(+), 25 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/AlignedSeriesScanUtil.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/AlignedSeriesScanUtil.java
index d36de2e4bf7..360211805bb 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/AlignedSeriesScanUtil.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/AlignedSeriesScanUtil.java
@@ -141,11 +141,12 @@ public class AlignedSeriesScanUtil extends SeriesScanUtil {
         && !isFileOverlapped()
         && !firstTimeSeriesMetadata.isModified()) {
       Filter queryFilter = scanOptions.getQueryFilter();
-      Statistics statistics = firstTimeSeriesMetadata.getStatistics();
-      if (queryFilter == null || queryFilter.allSatisfy(statistics)) {
+      if (queryFilter != null) {
+        if (!queryFilter.satisfy(firstTimeSeriesMetadata.getStatistics())) {
+          skipCurrentFile();
+        }
+      } else {
         skipOffsetByTimeSeriesMetadata();
-      } else if (!queryFilter.satisfy(statistics)) {
-        skipCurrentFile();
       }
     }
   }
@@ -177,11 +178,12 @@ public class AlignedSeriesScanUtil extends SeriesScanUtil {
   protected void filterFirstChunkMetadata() throws IOException {
     if (firstChunkMetadata != null && !isChunkOverlapped() && !firstChunkMetadata.isModified()) {
       Filter queryFilter = scanOptions.getQueryFilter();
-      Statistics statistics = firstChunkMetadata.getStatistics();
-      if (queryFilter == null || queryFilter.allSatisfy(statistics)) {
+      if (queryFilter != null) {
+        if (!queryFilter.satisfy(firstChunkMetadata.getStatistics())) {
+          skipCurrentChunk();
+        }
+      } else {
         skipOffsetByChunkMetadata();
-      } else if (!queryFilter.satisfy(statistics)) {
-        skipCurrentChunk();
       }
     }
   }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/chunk/MemAlignedPageReader.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/chunk/MemAlignedPageReader.java
index b1bba47f129..6bce32c0a08 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/chunk/MemAlignedPageReader.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/chunk/MemAlignedPageReader.java
@@ -99,16 +99,17 @@ public class MemAlignedPageReader implements IPageReader, IAlignedPageReader {
   }
 
   private boolean pageSatisfy() {
-    Statistics<? extends Serializable> statistics = getStatistics();
-    if (valueFilter == null || valueFilter.allSatisfy(statistics)) {
+    if (valueFilter != null) {
+      return valueFilter.satisfy(getStatistics());
+    } else {
       // For aligned series, When we only read some measurements under an aligned device, if the
       // values of these queried measurements at a timestamp are all null, the timestamp will not be
       // selected.
       // NOTE: if we change the read semantic in the future for aligned series, we need to remove
       // this check here.
       long rowCount = getTimeStatistics().getCount();
-      for (Statistics<? extends Serializable> vStatistics : getValueStatisticsList()) {
-        if (vStatistics == null || vStatistics.hasNullValue(rowCount)) {
+      for (Statistics<? extends Serializable> statistics : getValueStatisticsList()) {
+        if (statistics == null || statistics.hasNullValue(rowCount)) {
           return true;
         }
       }
@@ -117,12 +118,9 @@ public class MemAlignedPageReader implements IPageReader, IAlignedPageReader {
       if (paginationController.hasCurOffset(rowCount)) {
         paginationController.consumeOffset(rowCount);
         return false;
-      } else {
-        return true;
       }
-    } else {
-      return valueFilter.satisfy(statistics);
     }
+    return true;
   }
 
   @Override
diff --git a/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/AlignedPageReader.java b/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/AlignedPageReader.java
index 0a93e92f423..8064db9748f 100644
--- a/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/AlignedPageReader.java
+++ b/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/AlignedPageReader.java
@@ -118,16 +118,18 @@ public class AlignedPageReader implements IPageReader, IAlignedPageReader {
   }
 
   private boolean pageSatisfy() {
-    Statistics statistics = getStatistics();
-    if (filter == null || filter.allSatisfy(statistics)) {
+    if (filter != null) {
+      // TODO accept valueStatisticsList to filter
+      return filter.satisfy(getStatistics());
+    } else {
       // For aligned series, When we only query some measurements under an aligned device, if the
       // values of these queried measurements at a timestamp are all null, the timestamp will not be
       // selected.
       // NOTE: if we change the query semantic in the future for aligned series, we need to remove
       // this check here.
       long rowCount = getTimeStatistics().getCount();
-      for (Statistics vStatistics : getValueStatisticsList()) {
-        if (vStatistics == null || vStatistics.hasNullValue(rowCount)) {
+      for (Statistics statistics : getValueStatisticsList()) {
+        if (statistics == null || statistics.hasNullValue(rowCount)) {
           return true;
         }
       }
@@ -136,13 +138,9 @@ public class AlignedPageReader implements IPageReader, IAlignedPageReader {
       if (paginationController.hasCurOffset(rowCount)) {
         paginationController.consumeOffset(rowCount);
         return false;
-      } else {
-        return true;
       }
-    } else {
-      // TODO accept valueStatisticsList to filter
-      return filter.satisfy(statistics);
     }
+    return true;
   }
 
   @Override


[iotdb] 02/05: Revert "Fix SchemaFileSketchTool is not found"

Posted by ha...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch rc/1.2.1
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 93853c54db1e4129adf23801e392d9a95887877b
Author: HTHou <hh...@outlook.com>
AuthorDate: Mon Aug 21 16:19:25 2023 +0800

    Revert "Fix SchemaFileSketchTool is not found"
    
    This reverts commit 8c5761f7d86c82a6b945983bbc6cb7ce2d5b64d3.
---
 .../datanode/src/assembly/resources/tools/schema/print-pb-tree-file.sh  | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/iotdb-core/datanode/src/assembly/resources/tools/schema/print-pb-tree-file.sh b/iotdb-core/datanode/src/assembly/resources/tools/schema/print-pb-tree-file.sh
index c5d895b1747..1f7df6a1070 100644
--- a/iotdb-core/datanode/src/assembly/resources/tools/schema/print-pb-tree-file.sh
+++ b/iotdb-core/datanode/src/assembly/resources/tools/schema/print-pb-tree-file.sh
@@ -45,7 +45,7 @@ for f in ${IOTDB_HOME}/lib/*.jar; do
   CLASSPATH=${CLASSPATH}":"$f
 done
 
-MAIN_CLASS=org.apache.iotdb.db.tools.schema.PBTreeFileSketchTool
+MAIN_CLASS=org.apache.iotdb.db.tools.schema.SchemaFileSketchTool
 
 "$JAVA" -cp "$CLASSPATH" "$MAIN_CLASS" "$@"
 exit $?
\ No newline at end of file


[iotdb] 05/05: Revert "[To rel/1.2] Stop generating FilterExpression if we don't specify the time filter in last query session api"

Posted by ha...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch rc/1.2.1
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit fbf2a3e4d2382c5336289062f41d6ce1c1d3adf4
Author: HTHou <hh...@outlook.com>
AuthorDate: Mon Aug 21 16:21:26 2023 +0800

    Revert "[To rel/1.2] Stop generating FilterExpression if we don't specify the time filter in last query session api"
    
    This reverts commit d4f745365d086a88f32796398428a87d1d2073a1.
---
 .../protocol/thrift/impl/ClientRPCServiceImpl.java  |  2 +-
 .../queryengine/plan/parser/StatementGenerator.java | 21 +++++++++------------
 2 files changed, 10 insertions(+), 13 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java
index 25e7d3db2c6..64f147c4977 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java
@@ -952,7 +952,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
       paths.add(req.deviceId + "." + sensor);
     }
     TSLastDataQueryReq tsLastDataQueryReq =
-        new TSLastDataQueryReq(req.sessionId, paths, Long.MIN_VALUE, req.statementId);
+        new TSLastDataQueryReq(req.sessionId, paths, 0, req.statementId);
     tsLastDataQueryReq.setFetchSize(req.fetchSize);
     tsLastDataQueryReq.setEnableRedirectQuery(req.enableRedirectQuery);
     tsLastDataQueryReq.setLegalPathNodes(req.legalPathNodes);
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/StatementGenerator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/StatementGenerator.java
index 74cb4fb8aa2..8a2e98d3be2 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/StatementGenerator.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/StatementGenerator.java
@@ -197,21 +197,18 @@ public class StatementGenerator {
         new ResultColumn(
             new TimeSeriesOperand(new PartialPath("", false)), ResultColumn.ColumnType.RAW));
 
-    QueryStatement lastQueryStatement = new QueryStatement();
-
-    if (lastDataQueryReq.getTime() != Long.MIN_VALUE) {
-      // set query filter
-      WhereCondition whereCondition = new WhereCondition();
-      GreaterEqualExpression predicate =
-          new GreaterEqualExpression(
-              new TimestampOperand(),
-              new ConstantOperand(TSDataType.INT64, Long.toString(lastDataQueryReq.getTime())));
-      whereCondition.setPredicate(predicate);
-      lastQueryStatement.setWhereCondition(whereCondition);
-    }
+    // set query filter
+    WhereCondition whereCondition = new WhereCondition();
+    GreaterEqualExpression predicate =
+        new GreaterEqualExpression(
+            new TimestampOperand(),
+            new ConstantOperand(TSDataType.INT64, Long.toString(lastDataQueryReq.getTime())));
+    whereCondition.setPredicate(predicate);
 
+    QueryStatement lastQueryStatement = new QueryStatement();
     lastQueryStatement.setSelectComponent(selectComponent);
     lastQueryStatement.setFromComponent(fromComponent);
+    lastQueryStatement.setWhereCondition(whereCondition);
     PERFORMANCE_OVERVIEW_METRICS.recordParseCost(System.nanoTime() - startTime);
 
     return lastQueryStatement;


[iotdb] 01/05: Revert "[To rel/1.2][IOTDB-6116] Disassociate the IoTConsensus retry logic from the forkjoinPool (#10872) (#10878)"

Posted by ha...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch rc/1.2.1
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit b9e793ed85ed58ca577cf23a1f7ee76ba1a6bc0d
Author: HTHou <hh...@outlook.com>
AuthorDate: Mon Aug 21 16:18:46 2023 +0800

    Revert "[To rel/1.2][IOTDB-6116] Disassociate the IoTConsensus retry logic from the forkjoinPool (#10872) (#10878)"
    
    This reverts commit 1d13886a01a3cf84cd9d034446b9dff7131488ac.
---
 .../apache/iotdb/consensus/iot/IoTConsensus.java   | 17 -------
 .../consensus/iot/IoTConsensusServerImpl.java      |  8 ----
 .../consensus/iot/client/DispatchLogHandler.java   | 52 +++++++++++-----------
 .../consensus/iot/logdispatcher/LogDispatcher.java |  4 --
 .../iotdb/consensus/ratis/RatisConsensus.java      |  7 ++-
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java |  2 +-
 .../iotdb/commons/concurrent/ThreadName.java       |  6 +--
 7 files changed, 37 insertions(+), 59 deletions(-)

diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java
index 3afc4590c7b..047d557ad55 100644
--- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java
+++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java
@@ -22,8 +22,6 @@ package org.apache.iotdb.consensus.iot;
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.client.IClientManager;
-import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
-import org.apache.iotdb.commons.concurrent.ThreadName;
 import org.apache.iotdb.commons.consensus.ConsensusGroupId;
 import org.apache.iotdb.commons.exception.StartupException;
 import org.apache.iotdb.commons.service.RegisterManager;
@@ -66,8 +64,6 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 public class IoTConsensus implements IConsensus {
@@ -85,7 +81,6 @@ public class IoTConsensus implements IConsensus {
   private final IoTConsensusConfig config;
   private final IClientManager<TEndPoint, AsyncIoTConsensusServiceClient> clientManager;
   private final IClientManager<TEndPoint, SyncIoTConsensusServiceClient> syncClientManager;
-  private final ScheduledExecutorService retryService;
 
   public IoTConsensus(ConsensusConfig config, Registry registry) {
     this.thisNode = config.getThisNodeEndPoint();
@@ -102,9 +97,6 @@ public class IoTConsensus implements IConsensus {
         new IClientManager.Factory<TEndPoint, SyncIoTConsensusServiceClient>()
             .createClientManager(
                 new SyncIoTConsensusServiceClientPoolFactory(config.getIoTConsensusConfig()));
-    this.retryService =
-        IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(
-            ThreadName.LOG_DISPATCHER_RETRY_EXECUTOR.getName());
     // init IoTConsensus memory manager
     IoTConsensusMemoryManager.getInstance()
         .init(
@@ -141,7 +133,6 @@ public class IoTConsensus implements IConsensus {
                   new Peer(consensusGroupId, thisNodeId, thisNode),
                   new ArrayList<>(),
                   registry.apply(consensusGroupId),
-                  retryService,
                   clientManager,
                   syncClientManager,
                   config);
@@ -158,13 +149,6 @@ public class IoTConsensus implements IConsensus {
     clientManager.close();
     syncClientManager.close();
     registerManager.deregisterAll();
-    retryService.shutdown();
-    try {
-      retryService.awaitTermination(5, TimeUnit.SECONDS);
-    } catch (InterruptedException e) {
-      logger.warn("{}: interrupted when shutting down add Executor with exception {}", this, e);
-      Thread.currentThread().interrupt();
-    }
   }
 
   @Override
@@ -230,7 +214,6 @@ public class IoTConsensus implements IConsensus {
                   new Peer(groupId, thisNodeId, thisNode),
                   peers,
                   registry.apply(groupId),
-                  retryService,
                   clientManager,
                   syncClientManager,
                   config);
diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
index 92f3584705f..0b76a690dba 100644
--- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
+++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
@@ -80,7 +80,6 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.PriorityQueue;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.Condition;
@@ -113,14 +112,12 @@ public class IoTConsensusServerImpl {
   private final IClientManager<TEndPoint, SyncIoTConsensusServiceClient> syncClientManager;
   private final IoTConsensusServerMetrics ioTConsensusServerMetrics;
   private final String consensusGroupId;
-  private final ScheduledExecutorService retryService;
 
   public IoTConsensusServerImpl(
       String storageDir,
       Peer thisNode,
       List<Peer> configuration,
       IStateMachine stateMachine,
-      ScheduledExecutorService retryService,
       IClientManager<TEndPoint, AsyncIoTConsensusServiceClient> clientManager,
       IClientManager<TEndPoint, SyncIoTConsensusServiceClient> syncClientManager,
       IoTConsensusConfig config) {
@@ -136,7 +133,6 @@ public class IoTConsensusServerImpl {
     } else {
       persistConfiguration();
     }
-    this.retryService = retryService;
     this.config = config;
     this.consensusGroupId = thisNode.getGroupId().toString();
     consensusReqReader = (ConsensusReqReader) stateMachine.read(new GetConsensusReqReaderPlan());
@@ -736,10 +732,6 @@ public class IoTConsensusServerImpl {
     return searchIndex;
   }
 
-  public ScheduledExecutorService getRetryService() {
-    return retryService;
-  }
-
   public boolean isReadOnly() {
     return stateMachine.isReadOnly();
   }
diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/DispatchLogHandler.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/DispatchLogHandler.java
index f69ea0c2d76..94ba349c6d6 100644
--- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/DispatchLogHandler.java
+++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/DispatchLogHandler.java
@@ -29,7 +29,7 @@ import org.apache.thrift.async.AsyncMethodCallback;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.CompletableFuture;
 
 public class DispatchLogHandler implements AsyncMethodCallback<TSyncLogEntriesRes> {
 
@@ -88,29 +88,31 @@ public class DispatchLogHandler implements AsyncMethodCallback<TSyncLogEntriesRe
   }
 
   private void sleepCorrespondingTimeAndRetryAsynchronous() {
-    long sleepTime =
-        Math.min(
-            (long)
-                (thread.getConfig().getReplication().getBasicRetryWaitTimeMs()
-                    * Math.pow(2, retryCount)),
-            thread.getConfig().getReplication().getMaxRetryWaitTimeMs());
-    thread
-        .getImpl()
-        .getRetryService()
-        .schedule(
-            () -> {
-              if (thread.isStopped()) {
-                logger.debug(
-                    "LogDispatcherThread {} has been stopped, "
-                        + "we will not retrying this Batch {} after {} times",
-                    thread.getPeer(),
-                    batch,
-                    retryCount);
-              } else {
-                thread.sendBatchAsync(batch, this);
-              }
-            },
-            sleepTime,
-            TimeUnit.MILLISECONDS);
+    // TODO handle forever retry
+    CompletableFuture.runAsync(
+        () -> {
+          try {
+            long defaultSleepTime =
+                (long)
+                    (thread.getConfig().getReplication().getBasicRetryWaitTimeMs()
+                        * Math.pow(2, retryCount));
+            Thread.sleep(
+                Math.min(
+                    defaultSleepTime, thread.getConfig().getReplication().getMaxRetryWaitTimeMs()));
+          } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            logger.warn("Unexpected interruption during retry pending batch");
+          }
+          if (thread.isStopped()) {
+            logger.debug(
+                "LogDispatcherThread {} has been stopped, "
+                    + "we will not retrying this Batch {} after {} times",
+                thread.getPeer(),
+                batch,
+                retryCount);
+          } else {
+            thread.sendBatchAsync(batch, this);
+          }
+        });
   }
 }
diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java
index 30a2cc2d22c..ef88d5a8163 100644
--- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java
+++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java
@@ -299,10 +299,6 @@ public class LogDispatcher {
       return stopped;
     }
 
-    public IoTConsensusServerImpl getImpl() {
-      return impl;
-    }
-
     @Override
     public void run() {
       logger.info("{}: Dispatcher for {} starts", impl.getThisNode(), peer);
diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
index d73964c7002..055db6c0bb1 100644
--- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
+++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
@@ -93,6 +93,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -125,6 +126,7 @@ class RatisConsensus implements IConsensus {
   /** TODO make it configurable */
   private static final int DEFAULT_WAIT_LEADER_READY_TIMEOUT = (int) TimeUnit.SECONDS.toMillis(20);
 
+  private final ExecutorService addExecutor;
   private final ScheduledExecutorService diskGuardian;
   private final long triggerSnapshotThreshold;
 
@@ -154,6 +156,7 @@ class RatisConsensus implements IConsensus {
     this.ratisMetricSet = new RatisMetricSet();
 
     this.triggerSnapshotThreshold = this.config.getImpl().getTriggerSnapshotFileSize();
+    addExecutor = IoTDBThreadPoolFactory.newCachedThreadPool(ThreadName.RATIS_ADD.getName());
     diskGuardian =
         IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(
             ThreadName.RATIS_BG_DISK_GUARDIAN.getName());
@@ -186,8 +189,10 @@ class RatisConsensus implements IConsensus {
 
   @Override
   public void stop() throws IOException {
+    addExecutor.shutdown();
     diskGuardian.shutdown();
     try {
+      addExecutor.awaitTermination(5, TimeUnit.SECONDS);
       diskGuardian.awaitTermination(5, TimeUnit.SECONDS);
     } catch (InterruptedException e) {
       logger.warn("{}: interrupted when shutting down add Executor with exception {}", this, e);
@@ -195,8 +200,8 @@ class RatisConsensus implements IConsensus {
     } finally {
       clientManager.close();
       server.close();
-      MetricService.getInstance().removeMetricSet(this.ratisMetricSet);
     }
+    MetricService.getInstance().removeMetricSet(this.ratisMetricSet);
   }
 
   private boolean shouldRetry(RaftClientReply reply) {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index c85043f011c..f05c19bfa48 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -1071,7 +1071,7 @@ public class IoTDBConfig {
   // IoTConsensus Config
   private int maxLogEntriesNumPerBatch = 1024;
   private int maxSizePerBatch = 16 * 1024 * 1024;
-  private int maxPendingBatchesNum = 5;
+  private int maxPendingBatchesNum = 12;
   private double maxMemoryRatioForQueue = 0.6;
 
   /** Pipe related */
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
index 3d26b9e8ee7..38968b66659 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
@@ -103,13 +103,13 @@ public enum ThreadName {
   IOT_CONSENSUS_RPC_PROCESSOR("IoTConsensusRPC-Processor"),
   ASYNC_DATANODE_IOT_CONSENSUS_CLIENT_POOL("AsyncDataNodeIoTConsensusServiceClientPool"),
   LOG_DISPATCHER("LogDispatcher"),
-  LOG_DISPATCHER_RETRY_EXECUTOR("LogDispatcherRetryExecutor"),
   // -------------------------- Ratis --------------------------
   // NOTICE: The thread name of ratis cannot be edited here!
   // We list the thread name here just for distinguishing what module the thread belongs to.
   RAFT_SERVER_PROXY_EXECUTOR("\\d+-impl-thread"),
   RAFT_SERVER_EXECUTOR("\\d+-server-thread"),
   RAFT_SERVER_CLIENT_EXECUTOR("\\d+-client-thread"),
+  RATIS_ADD("Ratis-Add"),
   SEGMENT_RAFT_WORKER("SegmentedRaftLogWorker"),
   STATE_MACHINE_UPDATER("StateMachineUpdater"),
   FOLLOWER_STATE("FollowerState"),
@@ -235,8 +235,7 @@ public enum ThreadName {
               IOT_CONSENSUS_RPC_SERVICE,
               IOT_CONSENSUS_RPC_PROCESSOR,
               ASYNC_DATANODE_IOT_CONSENSUS_CLIENT_POOL,
-              LOG_DISPATCHER,
-              LOG_DISPATCHER_RETRY_EXECUTOR));
+              LOG_DISPATCHER));
 
   private static final Set<ThreadName> ratisThreadNames =
       new HashSet<>(
@@ -244,6 +243,7 @@ public enum ThreadName {
               RAFT_SERVER_PROXY_EXECUTOR,
               RAFT_SERVER_EXECUTOR,
               RAFT_SERVER_CLIENT_EXECUTOR,
+              RATIS_ADD,
               SEGMENT_RAFT_WORKER,
               STATE_MACHINE_UPDATER,
               FOLLOWER_STATE,


[iotdb] 04/05: Revert "[IOTDB-6101] Pipe: Support tsfile cascade transport (#10795) (#10796)"

Posted by ha...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch rc/1.2.1
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit a080020d7abbbf2e888806d7cf39c510bab76641
Author: HTHou <hh...@outlook.com>
AuthorDate: Mon Aug 21 16:21:02 2023 +0800

    Revert "[IOTDB-6101] Pipe: Support tsfile cascade transport  (#10795) (#10796)"
    
    This reverts commit d8050eef07a4d947a8a52b195326b6ed4b78ce2e.
---
 .../listener/PipeInsertionDataNodeListener.java        |  9 ++++++---
 .../planner/plan/node/load/LoadSingleTsFileNode.java   | 11 +++--------
 .../iotdb/db/storageengine/dataregion/DataRegion.java  |  5 -----
 .../dataregion/memtable/TsFileProcessor.java           |  2 --
 .../org/apache/iotdb/db/utils/FileLoaderUtils.java     |  2 --
 .../dataregion/TsFileResourceProgressIndexTest.java    | 18 ------------------
 6 files changed, 9 insertions(+), 38 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/listener/PipeInsertionDataNodeListener.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/listener/PipeInsertionDataNodeListener.java
index b7a7a0c3770..8afa1831566 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/listener/PipeInsertionDataNodeListener.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/listener/PipeInsertionDataNodeListener.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.db.pipe.extractor.realtime.listener;
 
+import org.apache.iotdb.db.pipe.agent.PipeAgent;
 import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeEventFactory;
 import org.apache.iotdb.db.pipe.extractor.realtime.PipeRealtimeDataRegionExtractor;
 import org.apache.iotdb.db.pipe.extractor.realtime.assigner.PipeDataRegionAssigner;
@@ -92,9 +93,11 @@ public class PipeInsertionDataNodeListener {
   //////////////////////////// listen to events ////////////////////////////
 
   public void listenToTsFile(String dataRegionId, TsFileResource tsFileResource) {
-    // We don't judge whether listenToTsFileExtractorCount.get() == 0 here on purpose
-    // because extractors may use tsfile events when some exceptions occur in the
-    // insert nodes listening process.
+    // we don't judge whether listenToTsFileExtractorCount.get() == 0 here, because
+    // when using SimpleProgressIndex, the tsfile event needs to be assigned to the
+    // extractor even if listenToTsFileExtractorCount.get() == 0 to record the progress
+
+    PipeAgent.runtime().assignSimpleProgressIndexIfNeeded(tsFileResource);
 
     final PipeDataRegionAssigner assigner = dataRegionId2Assigner.get(dataRegionId);
 
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadSingleTsFileNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadSingleTsFileNode.java
index 4f3506bcb75..d536a357412 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadSingleTsFileNode.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadSingleTsFileNode.java
@@ -24,7 +24,6 @@ import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
 import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.pipe.agent.PipeAgent;
 import org.apache.iotdb.db.queryengine.plan.analyze.Analysis;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
@@ -100,13 +99,9 @@ public class LoadSingleTsFileNode extends WritePlanNode {
       needDecodeTsFile = !isDispatchedToLocal(new HashSet<>(partitionFetcher.apply(slotList)));
     }
 
-    PipeAgent.runtime().assignRecoverProgressIndexForTsFileRecovery(resource);
-
-    // we serialize the resource file even if the tsfile does not need to be decoded
-    // or the resource file is already existed because we need to serialize the
-    // progress index of the tsfile
-    resource.serialize();
-
+    if (!needDecodeTsFile && !resource.resourceFileExists()) {
+      resource.serialize();
+    }
     return needDecodeTsFile;
   }
 
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
index 0011db21534..db5d0168d20 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
@@ -47,7 +47,6 @@ import org.apache.iotdb.db.exception.WriteProcessRejectException;
 import org.apache.iotdb.db.exception.query.OutOfTTLException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.exception.quota.ExceedQuotaException;
-import org.apache.iotdb.db.pipe.extractor.realtime.listener.PipeInsertionDataNodeListener;
 import org.apache.iotdb.db.queryengine.execution.fragment.QueryContext;
 import org.apache.iotdb.db.queryengine.metric.QueryResourceMetricSet;
 import org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.DataNodeSchemaCache;
@@ -2222,9 +2221,6 @@ public class DataRegion implements IDataRegionForQuery {
       }
       loadTsFileToUnSequence(
           tsfileToBeInserted, newTsFileResource, newFilePartitionId, deleteOriginFile);
-
-      PipeInsertionDataNodeListener.getInstance().listenToTsFile(dataRegionId, newTsFileResource);
-
       FileMetrics.getInstance()
           .addFile(
               newTsFileResource.getTsFile().length(),
@@ -2433,7 +2429,6 @@ public class DataRegion implements IDataRegionForQuery {
       } else {
         Files.copy(resourceFileToLoad.toPath(), targetResourceFile.toPath());
       }
-
     } catch (IOException e) {
       logger.error(
           "File renaming failed when loading .resource file. Origin: {}, Target: {}",
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
index dc7086dde70..1ded166cfc7 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
@@ -31,7 +31,6 @@ import org.apache.iotdb.db.exception.TsFileProcessorException;
 import org.apache.iotdb.db.exception.WriteProcessException;
 import org.apache.iotdb.db.exception.WriteProcessRejectException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
-import org.apache.iotdb.db.pipe.agent.PipeAgent;
 import org.apache.iotdb.db.pipe.extractor.realtime.listener.PipeInsertionDataNodeListener;
 import org.apache.iotdb.db.queryengine.execution.fragment.QueryContext;
 import org.apache.iotdb.db.queryengine.metric.QueryExecutionMetricSet;
@@ -891,7 +890,6 @@ public class TsFileProcessor {
       IMemTable tmpMemTable = workMemTable == null ? new NotifyFlushMemTable() : workMemTable;
 
       try {
-        PipeAgent.runtime().assignSimpleProgressIndexIfNeeded(tsFileResource);
         PipeInsertionDataNodeListener.getInstance()
             .listenToTsFile(dataRegionInfo.getDataRegion().getDataRegionId(), tsFileResource);
 
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/FileLoaderUtils.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/FileLoaderUtils.java
index 2f243b2b508..28bb8f82930 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/FileLoaderUtils.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/FileLoaderUtils.java
@@ -21,7 +21,6 @@ package org.apache.iotdb.db.utils;
 
 import org.apache.iotdb.commons.path.AlignedPath;
 import org.apache.iotdb.commons.path.PartialPath;
-import org.apache.iotdb.db.pipe.agent.PipeAgent;
 import org.apache.iotdb.db.queryengine.execution.fragment.QueryContext;
 import org.apache.iotdb.db.queryengine.metric.SeriesScanCostMetricSet;
 import org.apache.iotdb.db.storageengine.buffer.TimeSeriesMetadataCache;
@@ -107,7 +106,6 @@ public class FileLoaderUtils {
       }
     }
     resource.setStatus(TsFileResourceStatus.NORMAL);
-    PipeAgent.runtime().assignRecoverProgressIndexForTsFileRecovery(resource);
     return resource;
   }
 
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/TsFileResourceProgressIndexTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/TsFileResourceProgressIndexTest.java
index 62b9caa2f35..c5339c1b738 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/TsFileResourceProgressIndexTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/TsFileResourceProgressIndexTest.java
@@ -21,9 +21,6 @@ package org.apache.iotdb.db.storageengine.dataregion;
 
 import org.apache.iotdb.commons.consensus.index.ProgressIndex;
 import org.apache.iotdb.commons.consensus.index.ProgressIndexType;
-import org.apache.iotdb.commons.consensus.index.impl.HybridProgressIndex;
-import org.apache.iotdb.commons.consensus.index.impl.RecoverProgressIndex;
-import org.apache.iotdb.commons.consensus.index.impl.SimpleProgressIndex;
 import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
 import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus;
 import org.apache.iotdb.db.storageengine.dataregion.tsfile.generator.TsFileNameGenerator;
@@ -90,21 +87,6 @@ public class TsFileResourceProgressIndexTest {
 
   @Test
   public void testProgressIndexRecorder() {
-    HybridProgressIndex hybridProgressIndex = new HybridProgressIndex();
-    hybridProgressIndex.updateToMinimumIsAfterProgressIndex(new SimpleProgressIndex(3, 4));
-    hybridProgressIndex.updateToMinimumIsAfterProgressIndex(new SimpleProgressIndex(6, 6));
-    hybridProgressIndex.updateToMinimumIsAfterProgressIndex(
-        new RecoverProgressIndex(1, new SimpleProgressIndex(1, 2)));
-    hybridProgressIndex.updateToMinimumIsAfterProgressIndex(
-        new RecoverProgressIndex(1, new SimpleProgressIndex(1, 3)));
-    hybridProgressIndex.updateToMinimumIsAfterProgressIndex(
-        new RecoverProgressIndex(2, new SimpleProgressIndex(4, 3)));
-    hybridProgressIndex.updateToMinimumIsAfterProgressIndex(
-        new RecoverProgressIndex(3, new SimpleProgressIndex(5, 5)));
-    Assert.assertTrue(hybridProgressIndex.isAfter(new SimpleProgressIndex(6, 5)));
-    Assert.assertTrue(
-        hybridProgressIndex.isAfter(new RecoverProgressIndex(3, new SimpleProgressIndex(5, 4))));
-
     Assert.assertTrue(
         new MockProgressIndex(0).isAfter(tsFileResource.getMaxProgressIndexAfterClose()));