You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2023/05/30 18:58:06 UTC

[iotdb] branch master updated: [IOTDB-5936] Pipe: correct the behaviour of the historical data collector in realtime only mode (#9987)

This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new fc2746e7a58 [IOTDB-5936] Pipe: correct the behaviour of the historical data collector in realtime only mode (#9987)
fc2746e7a58 is described below

commit fc2746e7a58ea7b0713406e0e6fb311694bbf918
Author: yschengzi <87...@users.noreply.github.com>
AuthorDate: Wed May 31 02:57:58 2023 +0800

    [IOTDB-5936] Pipe: correct the behaviour of the historical data collector in realtime only mode (#9987)
    
    The historical data collector now also starts in "realtime only" mode. After a restart or a leader switch, it must still collect data that was created after the pipe was created but before the pipe (re)started. To support this, the historical data collector now compares the generation time of each TsFile with the creation time of the pipe.
    
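    Now, in "realtime only" mode, the historical data collector seals all working TsFiles in the data region when it is customized. As a result, any TsFile created afterwards has a generation time later than the pipe's creation time, so TsFiles holding pre-pipe data can be told apart from those holding post-pipe data.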
    
    ---------
    
    Co-authored-by: Steve Yurong Su <ro...@apache.org>
---
 .../iotdb/db/pipe/agent/task/PipeTaskAgent.java    |   2 +-
 .../core/collector/IoTDBDataRegionCollector.java   |  22 +++--
 .../PipeHistoricalDataRegionTsFileCollector.java   | 103 +++++++++++++++++----
 .../org/apache/iotdb/db/pipe/task/PipeBuilder.java |  12 +--
 .../apache/iotdb/db/pipe/task/PipeTaskBuilder.java |  46 +++------
 .../db/pipe/task/stage/PipeTaskCollectorStage.java |   4 +-
 6 files changed, 117 insertions(+), 72 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java
index 10e8679ac83..d0596127340 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java
@@ -510,7 +510,7 @@ public class PipeTaskAgent {
       PipeTaskMeta pipeTaskMeta) {
     if (pipeTaskMeta.getLeaderDataNodeId() == CONFIG.getDataNodeId()) {
       final PipeTask pipeTask =
-          new PipeTaskBuilder(consensusGroupId, pipeTaskMeta, pipeStaticMeta).build();
+          new PipeTaskBuilder(pipeStaticMeta, consensusGroupId, pipeTaskMeta).build();
       pipeTask.create();
       pipeTaskManager.addPipeTask(pipeStaticMeta, consensusGroupId, pipeTask);
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/IoTDBDataRegionCollector.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/IoTDBDataRegionCollector.java
index b26c880b028..37a9c5101a9 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/IoTDBDataRegionCollector.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/IoTDBDataRegionCollector.java
@@ -24,7 +24,6 @@ import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
 import org.apache.iotdb.db.engine.StorageEngine;
 import org.apache.iotdb.db.pipe.config.PipeCollectorConstant;
 import org.apache.iotdb.db.pipe.core.collector.historical.PipeHistoricalDataRegionCollector;
-import org.apache.iotdb.db.pipe.core.collector.historical.PipeHistoricalDataRegionFakeCollector;
 import org.apache.iotdb.db.pipe.core.collector.historical.PipeHistoricalDataRegionTsFileCollector;
 import org.apache.iotdb.db.pipe.core.collector.realtime.PipeRealtimeDataRegionCollector;
 import org.apache.iotdb.db.pipe.core.collector.realtime.PipeRealtimeDataRegionFakeCollector;
@@ -58,8 +57,9 @@ public class IoTDBDataRegionCollector implements PipeCollector {
 
   private final AtomicBoolean hasBeenStarted;
 
-  private final ListenableUnboundedBlockingPendingQueue<Event> collectorPendingQueue;
   private final PipeTaskMeta pipeTaskMeta;
+  private final long creationTime;
+  private final ListenableUnboundedBlockingPendingQueue<Event> collectorPendingQueue;
 
   // TODO: support pattern in historical collector
   private PipeHistoricalDataRegionCollector historicalCollector;
@@ -69,15 +69,13 @@ public class IoTDBDataRegionCollector implements PipeCollector {
 
   public IoTDBDataRegionCollector(
       PipeTaskMeta pipeTaskMeta,
+      long creationTime,
       ListenableUnboundedBlockingPendingQueue<Event> collectorPendingQueue) {
-    hasBeenStarted = new AtomicBoolean(false);
+    this.hasBeenStarted = new AtomicBoolean(false);
 
     this.pipeTaskMeta = pipeTaskMeta;
+    this.creationTime = creationTime;
     this.collectorPendingQueue = collectorPendingQueue;
-
-    historicalCollector = new PipeHistoricalDataRegionTsFileCollector(pipeTaskMeta);
-    realtimeCollector =
-        new PipeRealtimeDataRegionHybridCollector(pipeTaskMeta, collectorPendingQueue);
   }
 
   @Override
@@ -119,8 +117,14 @@ public class IoTDBDataRegionCollector implements PipeCollector {
     // enable historical collector by default
     historicalCollector =
         parameters.getBooleanOrDefault(COLLECTOR_HISTORY_ENABLE_KEY, true)
-            ? new PipeHistoricalDataRegionTsFileCollector(pipeTaskMeta)
-            : new PipeHistoricalDataRegionFakeCollector();
+            ? new PipeHistoricalDataRegionTsFileCollector(pipeTaskMeta, Long.MIN_VALUE)
+            // We define the realtime data as the data generated after the creation time
+            // of the pipe from the user's perspective. But we still need to use
+            // PipeHistoricalDataRegionCollector to collect the realtime data generated between the
+            // creation time of the pipe and the time when the pipe starts, because those data
+            // cannot be captured by PipeRealtimeDataRegionCollector, and should be collected by
+            // PipeHistoricalDataRegionCollector from an implementation perspective.
+            : new PipeHistoricalDataRegionTsFileCollector(pipeTaskMeta, creationTime);
   }
 
   private void constructRealtimeCollector(PipeParameters parameters) {
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/historical/PipeHistoricalDataRegionTsFileCollector.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/historical/PipeHistoricalDataRegionTsFileCollector.java
index 90041ed1d59..c1157d96979 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/historical/PipeHistoricalDataRegionTsFileCollector.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/historical/PipeHistoricalDataRegionTsFileCollector.java
@@ -25,8 +25,8 @@ import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
 import org.apache.iotdb.db.engine.StorageEngine;
 import org.apache.iotdb.db.engine.storagegroup.DataRegion;
 import org.apache.iotdb.db.engine.storagegroup.TsFileManager;
+import org.apache.iotdb.db.engine.storagegroup.TsFileNameGenerator;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
-import org.apache.iotdb.db.pipe.config.PipeCollectorConstant;
 import org.apache.iotdb.db.pipe.core.event.impl.PipeTsFileInsertionEvent;
 import org.apache.iotdb.db.utils.DateTimeUtils;
 import org.apache.iotdb.pipe.api.customizer.PipeParameterValidator;
@@ -34,49 +34,98 @@ import org.apache.iotdb.pipe.api.customizer.PipeParameters;
 import org.apache.iotdb.pipe.api.customizer.collector.PipeCollectorRuntimeConfiguration;
 import org.apache.iotdb.pipe.api.event.Event;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
 import java.time.ZoneId;
 import java.util.ArrayDeque;
 import java.util.Queue;
 import java.util.stream.Collectors;
 
+import static org.apache.iotdb.db.pipe.config.PipeCollectorConstant.COLLECTOR_HISTORY_END_TIME;
+import static org.apache.iotdb.db.pipe.config.PipeCollectorConstant.COLLECTOR_HISTORY_START_TIME;
+import static org.apache.iotdb.db.pipe.config.PipeCollectorConstant.DATA_REGION_KEY;
+
 public class PipeHistoricalDataRegionTsFileCollector extends PipeHistoricalDataRegionCollector {
 
+  private static final Logger LOGGER =
+      LoggerFactory.getLogger(PipeHistoricalDataRegionTsFileCollector.class);
+
   private final PipeTaskMeta pipeTaskMeta;
   private final ProgressIndex startIndex;
 
   private int dataRegionId;
 
-  private long collectStartTime;
-  private long collectEndTime;
+  private final long historicalDataCollectionTimeLowerBound;
+  private long historicalDataCollectionStartTime;
+  private long historicalDataCollectionEndTime;
 
   private Queue<PipeTsFileInsertionEvent> pendingQueue;
 
-  public PipeHistoricalDataRegionTsFileCollector(PipeTaskMeta pipeTaskMeta) {
+  public PipeHistoricalDataRegionTsFileCollector(
+      PipeTaskMeta pipeTaskMeta, long historicalDataCollectionTimeLowerBound) {
     this.pipeTaskMeta = pipeTaskMeta;
     this.startIndex = pipeTaskMeta.getProgressIndex();
+
+    this.historicalDataCollectionTimeLowerBound = historicalDataCollectionTimeLowerBound;
   }
 
   @Override
   public void validate(PipeParameterValidator validator) throws Exception {
-    validator.validateRequiredAttribute(PipeCollectorConstant.DATA_REGION_KEY);
+    validator.validateRequiredAttribute(DATA_REGION_KEY);
   }
 
   @Override
   public void customize(
       PipeParameters parameters, PipeCollectorRuntimeConfiguration configuration) {
-    dataRegionId = parameters.getInt(PipeCollectorConstant.DATA_REGION_KEY);
-    collectStartTime =
-        parameters.hasAttribute(PipeCollectorConstant.COLLECTOR_HISTORY_START_TIME)
+    dataRegionId = parameters.getInt(DATA_REGION_KEY);
+    historicalDataCollectionStartTime =
+        parameters.hasAttribute(COLLECTOR_HISTORY_START_TIME)
             ? DateTimeUtils.convertDatetimeStrToLong(
-                parameters.getString(PipeCollectorConstant.COLLECTOR_HISTORY_START_TIME),
-                ZoneId.systemDefault())
+                parameters.getString(COLLECTOR_HISTORY_START_TIME), ZoneId.systemDefault())
             : Long.MIN_VALUE;
-    collectEndTime =
-        parameters.hasAttribute(PipeCollectorConstant.COLLECTOR_HISTORY_END_TIME)
+    historicalDataCollectionEndTime =
+        parameters.hasAttribute(COLLECTOR_HISTORY_END_TIME)
             ? DateTimeUtils.convertDatetimeStrToLong(
-                parameters.getString(PipeCollectorConstant.COLLECTOR_HISTORY_END_TIME),
-                ZoneId.systemDefault())
+                parameters.getString(COLLECTOR_HISTORY_END_TIME), ZoneId.systemDefault())
             : Long.MAX_VALUE;
+
+    // Only invoke flushDataRegionAllTsFiles() when the pipe runs in the realtime only mode.
+    // realtime only mode -> (historicalDataCollectionTimeLowerBound != Long.MIN_VALUE)
+    //
+    // Ensure that all data in the data region is flushed to disk before collecting data.
+    // This ensures the generation time of all newly generated TsFiles (realtime data) after the
+    // invocation of flushDataRegionAllTsFiles() is later than the creationTime of the pipe
+    // (historicalDataCollectionTimeLowerBound).
+    //
+    // Note that: the generation time of the TsFile is the time when the TsFile is created, not
+    // the time when the data is flushed to the TsFile.
+    //
+    // Then we can use the generation time of the TsFile to determine whether the data in the
+    // TsFile should be collected by comparing the generation time of the TsFile with the
+    // historicalDataCollectionTimeLowerBound when starting the pipe in realtime only mode.
+    //
+    // If we don't invoke flushDataRegionAllTsFiles() in the realtime only mode, the data generated
+    // between the creation time of the pipe and the time when the pipe starts will be lost.
+    if (historicalDataCollectionTimeLowerBound != Long.MIN_VALUE) {
+      flushDataRegionAllTsFiles();
+    }
+  }
+
+  private void flushDataRegionAllTsFiles() {
+    final DataRegion dataRegion =
+        StorageEngine.getInstance().getDataRegion(new DataRegionId(dataRegionId));
+    if (dataRegion == null) {
+      return;
+    }
+
+    dataRegion.writeLock("Pipe: create historical TsFile collector");
+    try {
+      dataRegion.syncCloseAllWorkingTsFileProcessors();
+    } finally {
+      dataRegion.writeUnlock();
+    }
   }
 
   @Override
@@ -88,7 +137,7 @@ public class PipeHistoricalDataRegionTsFileCollector extends PipeHistoricalDataR
       return;
     }
 
-    dataRegion.writeLock("Pipe: collect historical TsFile");
+    dataRegion.writeLock("Pipe: start to collect historical TsFile");
     try {
       dataRegion.syncCloseAllWorkingTsFileProcessors();
 
@@ -101,7 +150,8 @@ public class PipeHistoricalDataRegionTsFileCollector extends PipeHistoricalDataR
                 .filter(
                     resource ->
                         !startIndex.isAfter(resource.getMaxProgressIndexAfterClose())
-                            && isTsFileResourceOverlappedWithTimeRange(resource))
+                            && isTsFileResourceOverlappedWithTimeRange(resource)
+                            && isTsFileGeneratedAfterCollectionTimeLowerBound(resource))
                 .map(resource -> new PipeTsFileInsertionEvent(resource, pipeTaskMeta))
                 .collect(Collectors.toList()));
         pendingQueue.addAll(
@@ -109,7 +159,8 @@ public class PipeHistoricalDataRegionTsFileCollector extends PipeHistoricalDataR
                 .filter(
                     resource ->
                         !startIndex.isAfter(resource.getMaxProgressIndexAfterClose())
-                            && isTsFileResourceOverlappedWithTimeRange(resource))
+                            && isTsFileResourceOverlappedWithTimeRange(resource)
+                            && isTsFileGeneratedAfterCollectionTimeLowerBound(resource))
                 .map(resource -> new PipeTsFileInsertionEvent(resource, pipeTaskMeta))
                 .collect(Collectors.toList()));
         pendingQueue.forEach(
@@ -125,8 +176,22 @@ public class PipeHistoricalDataRegionTsFileCollector extends PipeHistoricalDataR
   }
 
   private boolean isTsFileResourceOverlappedWithTimeRange(TsFileResource resource) {
-    return !(resource.getFileEndTime() < collectStartTime
-        || collectEndTime < resource.getFileStartTime());
+    return !(resource.getFileEndTime() < historicalDataCollectionStartTime
+        || historicalDataCollectionEndTime < resource.getFileStartTime());
+  }
+
+  private boolean isTsFileGeneratedAfterCollectionTimeLowerBound(TsFileResource resource) {
+    try {
+      return historicalDataCollectionTimeLowerBound
+          <= TsFileNameGenerator.getTsFileName(resource.getTsFile().getName()).getTime();
+    } catch (IOException e) {
+      LOGGER.warn(
+          String.format("failed to get the generation time of TsFile %s", resource.getTsFilePath()),
+          e);
+      // If failed to get the generation time of the TsFile, we will collect the data in the TsFile
+      // anyway.
+      return true;
+    }
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/PipeBuilder.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/PipeBuilder.java
index 481eb0e5ad7..3bd469907f5 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/task/PipeBuilder.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/PipeBuilder.java
@@ -26,7 +26,6 @@ import org.apache.iotdb.commons.pipe.task.meta.PipeStaticMeta;
 import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.pipe.api.customizer.PipeParameters;
 
 import java.util.HashMap;
 import java.util.Map;
@@ -42,10 +41,6 @@ public class PipeBuilder {
 
   public Map<TConsensusGroupId, PipeTask> build() {
     final PipeStaticMeta pipeStaticMeta = pipeMeta.getStaticMeta();
-    final String pipeName = pipeStaticMeta.getPipeName();
-    final PipeParameters collectorParameters = pipeStaticMeta.getCollectorParameters();
-    final PipeParameters processorParameters = pipeStaticMeta.getProcessorParameters();
-    final PipeParameters connectorParameters = pipeStaticMeta.getConnectorParameters();
 
     final Map<TConsensusGroupId, PipeTask> consensusGroupIdToPipeTaskMap = new HashMap<>();
 
@@ -57,12 +52,9 @@ public class PipeBuilder {
         consensusGroupIdToPipeTaskMap.put(
             consensusGroupIdToPipeTaskMeta.getKey(),
             new PipeTaskBuilder(
-                    pipeName,
+                    pipeStaticMeta,
                     consensusGroupIdToPipeTaskMeta.getKey(),
-                    consensusGroupIdToPipeTaskMeta.getValue(),
-                    collectorParameters,
-                    processorParameters,
-                    connectorParameters)
+                    consensusGroupIdToPipeTaskMeta.getValue())
                 .build());
       }
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/PipeTaskBuilder.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/PipeTaskBuilder.java
index 274b50e1a98..1886f489c41 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/task/PipeTaskBuilder.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/PipeTaskBuilder.java
@@ -25,41 +25,18 @@ import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
 import org.apache.iotdb.db.pipe.task.stage.PipeTaskCollectorStage;
 import org.apache.iotdb.db.pipe.task.stage.PipeTaskConnectorStage;
 import org.apache.iotdb.db.pipe.task.stage.PipeTaskProcessorStage;
-import org.apache.iotdb.pipe.api.customizer.PipeParameters;
 
 public class PipeTaskBuilder {
 
-  private final String pipeName;
+  private final PipeStaticMeta pipeStaticMeta;
   private final TConsensusGroupId dataRegionId;
   private final PipeTaskMeta pipeTaskMeta;
-  private final PipeParameters pipeCollectorParameters;
-  private final PipeParameters pipeProcessorParameters;
-  private final PipeParameters pipeConnectorParameters;
 
-  PipeTaskBuilder(
-      String pipeName,
-      TConsensusGroupId dataRegionId,
-      PipeTaskMeta pipeTaskMeta,
-      PipeParameters pipeCollectorParameters,
-      PipeParameters pipeProcessorParameters,
-      PipeParameters pipeConnectorParameters) {
-    this.pipeName = pipeName;
+  public PipeTaskBuilder(
+      PipeStaticMeta pipeStaticMeta, TConsensusGroupId dataRegionId, PipeTaskMeta pipeTaskMeta) {
+    this.pipeStaticMeta = pipeStaticMeta;
     this.dataRegionId = dataRegionId;
     this.pipeTaskMeta = pipeTaskMeta;
-    this.pipeCollectorParameters = pipeCollectorParameters;
-    this.pipeProcessorParameters = pipeProcessorParameters;
-    this.pipeConnectorParameters = pipeConnectorParameters;
-  }
-
-  public PipeTaskBuilder(
-      TConsensusGroupId dataRegionId, PipeTaskMeta pipeTaskMeta, PipeStaticMeta pipeStaticMeta) {
-    this(
-        pipeStaticMeta.getPipeName(),
-        dataRegionId,
-        pipeTaskMeta,
-        pipeStaticMeta.getCollectorParameters(),
-        pipeStaticMeta.getProcessorParameters(),
-        pipeStaticMeta.getConnectorParameters());
   }
 
   public PipeTask build() {
@@ -67,21 +44,26 @@ public class PipeTaskBuilder {
 
     // we first build the collector and connector, then build the processor.
     final PipeTaskCollectorStage collectorStage =
-        new PipeTaskCollectorStage(dataRegionId, pipeTaskMeta, pipeCollectorParameters);
+        new PipeTaskCollectorStage(
+            dataRegionId,
+            pipeTaskMeta,
+            pipeStaticMeta.getCreationTime(),
+            pipeStaticMeta.getCollectorParameters());
     final PipeTaskConnectorStage connectorStage =
-        new PipeTaskConnectorStage(pipeConnectorParameters, pipeTaskMeta);
+        new PipeTaskConnectorStage(pipeStaticMeta.getConnectorParameters(), pipeTaskMeta);
 
     // the processor connects the collector and connector.
     final PipeTaskProcessorStage processorStage =
         new PipeTaskProcessorStage(
-            pipeName,
+            pipeStaticMeta.getPipeName(),
             dataRegionId,
             pipeTaskMeta,
             collectorStage.getEventSupplier(),
             collectorStage.getCollectorPendingQueue(),
-            pipeProcessorParameters,
+            pipeStaticMeta.getProcessorParameters(),
             connectorStage.getPipeConnectorPendingQueue());
 
-    return new PipeTask(pipeName, dataRegionId, collectorStage, processorStage, connectorStage);
+    return new PipeTask(
+        pipeStaticMeta.getPipeName(), dataRegionId, collectorStage, processorStage, connectorStage);
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskCollectorStage.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskCollectorStage.java
index 8146a3a3f28..60488c6d253 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskCollectorStage.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskCollectorStage.java
@@ -58,6 +58,7 @@ public class PipeTaskCollectorStage extends PipeTaskStage {
   public PipeTaskCollectorStage(
       TConsensusGroupId dataRegionId,
       PipeTaskMeta pipeTaskMeta,
+      long creationTime,
       PipeParameters collectorParameters) {
     // TODO: avoid if-else, use reflection to create collector all the time
     if (collectorParameters
@@ -77,7 +78,8 @@ public class PipeTaskCollectorStage extends PipeTaskStage {
           .put(PipeCollectorConstant.DATA_REGION_KEY, String.valueOf(dataRegionId.getId()));
 
       collectorPendingQueue = new ListenableUnboundedBlockingPendingQueue<>();
-      this.pipeCollector = new IoTDBDataRegionCollector(pipeTaskMeta, collectorPendingQueue);
+      this.pipeCollector =
+          new IoTDBDataRegionCollector(pipeTaskMeta, creationTime, collectorPendingQueue);
     } else {
       this.collectorParameters = collectorParameters;