You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ha...@apache.org on 2023/08/21 08:22:27 UTC

[iotdb] 04/05: Revert "[IOTDB-6101] Pipe: Support tsfile cascade transport (#10795) (#10796)"

This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch rc/1.2.1
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit a080020d7abbbf2e888806d7cf39c510bab76641
Author: HTHou <hh...@outlook.com>
AuthorDate: Mon Aug 21 16:21:02 2023 +0800

    Revert "[IOTDB-6101] Pipe: Support tsfile cascade transport  (#10795) (#10796)"
    
    This reverts commit d8050eef07a4d947a8a52b195326b6ed4b78ce2e.
---
 .../listener/PipeInsertionDataNodeListener.java        |  9 ++++++---
 .../planner/plan/node/load/LoadSingleTsFileNode.java   | 11 +++--------
 .../iotdb/db/storageengine/dataregion/DataRegion.java  |  5 -----
 .../dataregion/memtable/TsFileProcessor.java           |  2 --
 .../org/apache/iotdb/db/utils/FileLoaderUtils.java     |  2 --
 .../dataregion/TsFileResourceProgressIndexTest.java    | 18 ------------------
 6 files changed, 9 insertions(+), 38 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/listener/PipeInsertionDataNodeListener.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/listener/PipeInsertionDataNodeListener.java
index b7a7a0c3770..8afa1831566 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/listener/PipeInsertionDataNodeListener.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/listener/PipeInsertionDataNodeListener.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.db.pipe.extractor.realtime.listener;
 
+import org.apache.iotdb.db.pipe.agent.PipeAgent;
 import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeEventFactory;
 import org.apache.iotdb.db.pipe.extractor.realtime.PipeRealtimeDataRegionExtractor;
 import org.apache.iotdb.db.pipe.extractor.realtime.assigner.PipeDataRegionAssigner;
@@ -92,9 +93,11 @@ public class PipeInsertionDataNodeListener {
   //////////////////////////// listen to events ////////////////////////////
 
   public void listenToTsFile(String dataRegionId, TsFileResource tsFileResource) {
-    // We don't judge whether listenToTsFileExtractorCount.get() == 0 here on purpose
-    // because extractors may use tsfile events when some exceptions occur in the
-    // insert nodes listening process.
+    // We don't check whether listenToTsFileExtractorCount.get() == 0 here, because
+    // when using SimpleProgressIndex, the tsfile event needs to be assigned to the
+    // extractor even if listenToTsFileExtractorCount.get() == 0 to record the progress.
+
+    PipeAgent.runtime().assignSimpleProgressIndexIfNeeded(tsFileResource);
 
     final PipeDataRegionAssigner assigner = dataRegionId2Assigner.get(dataRegionId);
 
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadSingleTsFileNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadSingleTsFileNode.java
index 4f3506bcb75..d536a357412 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadSingleTsFileNode.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadSingleTsFileNode.java
@@ -24,7 +24,6 @@ import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
 import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.pipe.agent.PipeAgent;
 import org.apache.iotdb.db.queryengine.plan.analyze.Analysis;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
@@ -100,13 +99,9 @@ public class LoadSingleTsFileNode extends WritePlanNode {
       needDecodeTsFile = !isDispatchedToLocal(new HashSet<>(partitionFetcher.apply(slotList)));
     }
 
-    PipeAgent.runtime().assignRecoverProgressIndexForTsFileRecovery(resource);
-
-    // we serialize the resource file even if the tsfile does not need to be decoded
-    // or the resource file is already existed because we need to serialize the
-    // progress index of the tsfile
-    resource.serialize();
-
+    if (!needDecodeTsFile && !resource.resourceFileExists()) {
+      resource.serialize();
+    }
     return needDecodeTsFile;
   }
 
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
index 0011db21534..db5d0168d20 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
@@ -47,7 +47,6 @@ import org.apache.iotdb.db.exception.WriteProcessRejectException;
 import org.apache.iotdb.db.exception.query.OutOfTTLException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.exception.quota.ExceedQuotaException;
-import org.apache.iotdb.db.pipe.extractor.realtime.listener.PipeInsertionDataNodeListener;
 import org.apache.iotdb.db.queryengine.execution.fragment.QueryContext;
 import org.apache.iotdb.db.queryengine.metric.QueryResourceMetricSet;
 import org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.DataNodeSchemaCache;
@@ -2222,9 +2221,6 @@ public class DataRegion implements IDataRegionForQuery {
       }
       loadTsFileToUnSequence(
           tsfileToBeInserted, newTsFileResource, newFilePartitionId, deleteOriginFile);
-
-      PipeInsertionDataNodeListener.getInstance().listenToTsFile(dataRegionId, newTsFileResource);
-
       FileMetrics.getInstance()
           .addFile(
               newTsFileResource.getTsFile().length(),
@@ -2433,7 +2429,6 @@ public class DataRegion implements IDataRegionForQuery {
       } else {
         Files.copy(resourceFileToLoad.toPath(), targetResourceFile.toPath());
       }
-
     } catch (IOException e) {
       logger.error(
           "File renaming failed when loading .resource file. Origin: {}, Target: {}",
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
index dc7086dde70..1ded166cfc7 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
@@ -31,7 +31,6 @@ import org.apache.iotdb.db.exception.TsFileProcessorException;
 import org.apache.iotdb.db.exception.WriteProcessException;
 import org.apache.iotdb.db.exception.WriteProcessRejectException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
-import org.apache.iotdb.db.pipe.agent.PipeAgent;
 import org.apache.iotdb.db.pipe.extractor.realtime.listener.PipeInsertionDataNodeListener;
 import org.apache.iotdb.db.queryengine.execution.fragment.QueryContext;
 import org.apache.iotdb.db.queryengine.metric.QueryExecutionMetricSet;
@@ -891,7 +890,6 @@ public class TsFileProcessor {
       IMemTable tmpMemTable = workMemTable == null ? new NotifyFlushMemTable() : workMemTable;
 
       try {
-        PipeAgent.runtime().assignSimpleProgressIndexIfNeeded(tsFileResource);
         PipeInsertionDataNodeListener.getInstance()
             .listenToTsFile(dataRegionInfo.getDataRegion().getDataRegionId(), tsFileResource);
 
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/FileLoaderUtils.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/FileLoaderUtils.java
index 2f243b2b508..28bb8f82930 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/FileLoaderUtils.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/FileLoaderUtils.java
@@ -21,7 +21,6 @@ package org.apache.iotdb.db.utils;
 
 import org.apache.iotdb.commons.path.AlignedPath;
 import org.apache.iotdb.commons.path.PartialPath;
-import org.apache.iotdb.db.pipe.agent.PipeAgent;
 import org.apache.iotdb.db.queryengine.execution.fragment.QueryContext;
 import org.apache.iotdb.db.queryengine.metric.SeriesScanCostMetricSet;
 import org.apache.iotdb.db.storageengine.buffer.TimeSeriesMetadataCache;
@@ -107,7 +106,6 @@ public class FileLoaderUtils {
       }
     }
     resource.setStatus(TsFileResourceStatus.NORMAL);
-    PipeAgent.runtime().assignRecoverProgressIndexForTsFileRecovery(resource);
     return resource;
   }
 
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/TsFileResourceProgressIndexTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/TsFileResourceProgressIndexTest.java
index 62b9caa2f35..c5339c1b738 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/TsFileResourceProgressIndexTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/TsFileResourceProgressIndexTest.java
@@ -21,9 +21,6 @@ package org.apache.iotdb.db.storageengine.dataregion;
 
 import org.apache.iotdb.commons.consensus.index.ProgressIndex;
 import org.apache.iotdb.commons.consensus.index.ProgressIndexType;
-import org.apache.iotdb.commons.consensus.index.impl.HybridProgressIndex;
-import org.apache.iotdb.commons.consensus.index.impl.RecoverProgressIndex;
-import org.apache.iotdb.commons.consensus.index.impl.SimpleProgressIndex;
 import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
 import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus;
 import org.apache.iotdb.db.storageengine.dataregion.tsfile.generator.TsFileNameGenerator;
@@ -90,21 +87,6 @@ public class TsFileResourceProgressIndexTest {
 
   @Test
   public void testProgressIndexRecorder() {
-    HybridProgressIndex hybridProgressIndex = new HybridProgressIndex();
-    hybridProgressIndex.updateToMinimumIsAfterProgressIndex(new SimpleProgressIndex(3, 4));
-    hybridProgressIndex.updateToMinimumIsAfterProgressIndex(new SimpleProgressIndex(6, 6));
-    hybridProgressIndex.updateToMinimumIsAfterProgressIndex(
-        new RecoverProgressIndex(1, new SimpleProgressIndex(1, 2)));
-    hybridProgressIndex.updateToMinimumIsAfterProgressIndex(
-        new RecoverProgressIndex(1, new SimpleProgressIndex(1, 3)));
-    hybridProgressIndex.updateToMinimumIsAfterProgressIndex(
-        new RecoverProgressIndex(2, new SimpleProgressIndex(4, 3)));
-    hybridProgressIndex.updateToMinimumIsAfterProgressIndex(
-        new RecoverProgressIndex(3, new SimpleProgressIndex(5, 5)));
-    Assert.assertTrue(hybridProgressIndex.isAfter(new SimpleProgressIndex(6, 5)));
-    Assert.assertTrue(
-        hybridProgressIndex.isAfter(new RecoverProgressIndex(3, new SimpleProgressIndex(5, 4))));
-
     Assert.assertTrue(
         new MockProgressIndex(0).isAfter(tsFileResource.getMaxProgressIndexAfterClose()));