You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2023/08/30 16:25:41 UTC

[iotdb] branch rel/1.2 updated: Pipe: Fix start-time and end-time parameters not working when extracting history data (#11001) (#11002)

This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch rel/1.2
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/rel/1.2 by this push:
     new 4b867eb55ab Pipe: Fix start-time and end-time parameters not working when extracting history data (#11001) (#11002)
4b867eb55ab is described below

commit 4b867eb55ab6e872eb2e9673b313ac00457f0479
Author: 马子坤 <55...@users.noreply.github.com>
AuthorDate: Thu Aug 31 00:22:14 2023 +0800

    Pipe: Fix start-time and end-time parameters not working when extracting history data (#11001) (#11002)
    
    (cherry picked from commit 35736cc678211ec1f1e1f51ae6b855789302ad9d)
---
 .../db/pipe/connector/protocol/airgap/IoTDBAirGapConnector.java  | 4 ++--
 .../protocol/thrift/async/IoTDBThriftAsyncConnector.java         | 4 ++--
 .../connector/protocol/thrift/sync/IoTDBThriftSyncConnector.java | 4 ++--
 .../main/java/org/apache/iotdb/db/pipe/event/EnrichedEvent.java  | 9 +++++----
 .../db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java | 2 +-
 .../db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java    | 3 +++
 6 files changed, 15 insertions(+), 11 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBAirGapConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBAirGapConnector.java
index 854205588f6..0637cda4a4f 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBAirGapConnector.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBAirGapConnector.java
@@ -186,7 +186,7 @@ public class IoTDBAirGapConnector extends IoTDBConnector {
       return;
     }
 
-    if (((EnrichedEvent) tabletInsertionEvent).shouldParsePattern()) {
+    if (((EnrichedEvent) tabletInsertionEvent).shouldParsePatternOrTime()) {
       if (tabletInsertionEvent instanceof PipeInsertNodeTabletInsertionEvent) {
         transfer(
             ((PipeInsertNodeTabletInsertionEvent) tabletInsertionEvent).parseEventWithPattern());
@@ -226,7 +226,7 @@ public class IoTDBAirGapConnector extends IoTDBConnector {
       return;
     }
 
-    if (((EnrichedEvent) tsFileInsertionEvent).shouldParsePattern()) {
+    if (((EnrichedEvent) tsFileInsertionEvent).shouldParsePatternOrTime()) {
       for (final TabletInsertionEvent event : tsFileInsertionEvent.toTabletInsertionEvents()) {
         transfer(event);
       }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBThriftAsyncConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBThriftAsyncConnector.java
index d983da389bd..cb2c68ab680 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBThriftAsyncConnector.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBThriftAsyncConnector.java
@@ -150,7 +150,7 @@ public class IoTDBThriftAsyncConnector extends IoTDBConnector {
       return;
     }
 
-    if (((EnrichedEvent) tabletInsertionEvent).shouldParsePattern()) {
+    if (((EnrichedEvent) tabletInsertionEvent).shouldParsePatternOrTime()) {
       if (tabletInsertionEvent instanceof PipeInsertNodeTabletInsertionEvent) {
         transfer(
             ((PipeInsertNodeTabletInsertionEvent) tabletInsertionEvent).parseEventWithPattern());
@@ -291,7 +291,7 @@ public class IoTDBThriftAsyncConnector extends IoTDBConnector {
       return;
     }
 
-    if (((EnrichedEvent) tsFileInsertionEvent).shouldParsePattern()) {
+    if (((EnrichedEvent) tsFileInsertionEvent).shouldParsePatternOrTime()) {
       for (final TabletInsertionEvent event : tsFileInsertionEvent.toTabletInsertionEvents()) {
         transfer(event);
       }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBThriftSyncConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBThriftSyncConnector.java
index f75bc7c612a..dfecd2170b3 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBThriftSyncConnector.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBThriftSyncConnector.java
@@ -185,7 +185,7 @@ public class IoTDBThriftSyncConnector extends IoTDBConnector {
       return;
     }
 
-    if (((EnrichedEvent) tabletInsertionEvent).shouldParsePattern()) {
+    if (((EnrichedEvent) tabletInsertionEvent).shouldParsePatternOrTime()) {
       if (tabletInsertionEvent instanceof PipeInsertNodeTabletInsertionEvent) {
         transfer(
             ((PipeInsertNodeTabletInsertionEvent) tabletInsertionEvent).parseEventWithPattern());
@@ -231,7 +231,7 @@ public class IoTDBThriftSyncConnector extends IoTDBConnector {
       return;
     }
 
-    if (((EnrichedEvent) tsFileInsertionEvent).shouldParsePattern()) {
+    if (((EnrichedEvent) tsFileInsertionEvent).shouldParsePatternOrTime()) {
       for (final TabletInsertionEvent event : tsFileInsertionEvent.toTabletInsertionEvents()) {
         transfer(event);
       }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/EnrichedEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/EnrichedEvent.java
index 5c2651e5847..269d21f6a5d 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/EnrichedEvent.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/EnrichedEvent.java
@@ -39,13 +39,14 @@ public abstract class EnrichedEvent implements Event {
   protected final PipeTaskMeta pipeTaskMeta;
 
   private final String pattern;
-  private final boolean isPatternParsed;
+  protected boolean isPatternAndTimeParsed;
 
   protected EnrichedEvent(PipeTaskMeta pipeTaskMeta, String pattern) {
     referenceCount = new AtomicInteger(0);
     this.pipeTaskMeta = pipeTaskMeta;
     this.pattern = pattern;
-    isPatternParsed = getPattern().equals(PipeExtractorConstant.EXTRACTOR_PATTERN_DEFAULT_VALUE);
+    isPatternAndTimeParsed =
+        getPattern().equals(PipeExtractorConstant.EXTRACTOR_PATTERN_DEFAULT_VALUE);
   }
 
   /**
@@ -130,8 +131,8 @@ public abstract class EnrichedEvent implements Event {
     return pattern == null ? PipeExtractorConstant.EXTRACTOR_PATTERN_DEFAULT_VALUE : pattern;
   }
 
-  public boolean shouldParsePattern() {
-    return !isPatternParsed;
+  public boolean shouldParsePatternOrTime() {
+    return !isPatternAndTimeParsed;
   }
 
   public abstract EnrichedEvent shallowCopySelfAndBindPipeTaskMetaForProgressReport(
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
index 5f30d3f934e..bd36d7eba56 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
@@ -140,7 +140,7 @@ public class PipeRawTabletInsertionEvent extends EnrichedEvent implements Tablet
   }
 
   public Tablet convertToTablet() {
-    if (!shouldParsePattern()) {
+    if (!shouldParsePatternOrTime()) {
       return tablet;
     }
 
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
index d1d1adf1d4e..4c7d7f6fb0e 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
@@ -69,6 +69,9 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent implements TsFileIns
 
     this.startTime = startTime;
     this.endTime = endTime;
+    if (hasTimeFilter()) {
+      this.isPatternAndTimeParsed = false;
+    }
 
     this.resource = resource;
     tsFile = resource.getTsFile();