You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2023/05/29 17:02:18 UTC
[iotdb] branch master updated: [IOTDB-5920] Pipe collector modes: TsFile only, log only, realtime only, historical only (#9942)
This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new bd1222d180e [IOTDB-5920] Pipe collector modes: TsFile only, log only, realtime only, historical only (#9942)
bd1222d180e is described below
commit bd1222d180ed8bc539ebdd380c92e12b17bf12b9
Author: yschengzi <87...@users.noreply.github.com>
AuthorDate: Tue May 30 01:02:12 2023 +0800
[IOTDB-5920] Pipe collector modes: TsFile only, log only, realtime only, historical only (#9942)
Co-authored-by: Steve Yurong Su <ro...@apache.org>
---
.../api/customizer/PipeParameterValidator.java | 27 ++++
.../db/pipe/config/PipeCollectorConstant.java | 10 ++
.../core/collector/IoTDBDataRegionCollector.java | 147 ++++++++++++++++++---
.../PipeHistoricalDataRegionCollector.java} | 15 +--
.../PipeHistoricalDataRegionFakeCollector.java | 56 ++++++++
.../PipeHistoricalDataRegionTsFileCollector.java | 42 +++++-
.../realtime/PipeRealtimeDataRegionCollector.java | 12 +-
.../PipeRealtimeDataRegionFakeCollector.java | 70 ++++++++++
.../PipeRealtimeDataRegionHybridCollector.java | 14 +-
.../PipeRealtimeDataRegionLogCollector.java | 117 ++++++++++++++++
.../PipeRealtimeDataRegionTsFileCollector.java | 118 +++++++++++++++++
.../listener/PipeInsertionDataNodeListener.java | 31 ++++-
.../collector/CachedSchemaPatternMatcherTest.java | 13 +-
.../core/collector/PipeRealtimeCollectTest.java | 2 +
14 files changed, 628 insertions(+), 46 deletions(-)
diff --git a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/PipeParameterValidator.java b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/PipeParameterValidator.java
index fa0632104b7..2ba9fffa612 100644
--- a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/PipeParameterValidator.java
+++ b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/PipeParameterValidator.java
@@ -22,6 +22,8 @@ package org.apache.iotdb.pipe.api.customizer;
import org.apache.iotdb.pipe.api.exception.PipeAttributeNotProvidedException;
import org.apache.iotdb.pipe.api.exception.PipeParameterNotValidException;
+import java.util.Arrays;
+
public class PipeParameterValidator {
private final PipeParameters parameters;
@@ -49,6 +51,31 @@ public class PipeParameterValidator {
return this;
}
+ public PipeParameterValidator validateAttributeValueRange(
+ String key, boolean canBeOptional, String... optionalValues)
+ throws PipeAttributeNotProvidedException {
+ if (!parameters.hasAttribute(key)) {
+ if (!canBeOptional) {
+ throw new PipeAttributeNotProvidedException(String.format("%s should be set.", key));
+ }
+ return this;
+ }
+
+ final String actualValue = parameters.getString(key);
+ for (String optionalValue : optionalValues) {
+ if (actualValue.equals(optionalValue)) {
+ return this;
+ }
+ }
+
+ if (canBeOptional) {
+ return this;
+ }
+
+ throw new PipeAttributeNotProvidedException(
+ String.format("%s should be one of %s", key, Arrays.toString(optionalValues)));
+ }
+
/**
* Validates the input parameters according to the validation rule given by the user.
*
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/config/PipeCollectorConstant.java b/server/src/main/java/org/apache/iotdb/db/pipe/config/PipeCollectorConstant.java
index 06485b8cc2e..105d45f8484 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/config/PipeCollectorConstant.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/config/PipeCollectorConstant.java
@@ -28,6 +28,16 @@ public class PipeCollectorConstant {
public static final String DATA_REGION_KEY = "collector.data-region";
+ public static final String COLLECTOR_HISTORY_ENABLE_KEY = "collector.history.enable";
+ public static final String COLLECTOR_HISTORY_START_TIME = "collector.history.start-time";
+ public static final String COLLECTOR_HISTORY_END_TIME = "collector.history.end-time";
+
+ public static final String COLLECTOR_REALTIME_ENABLE = "collector.realtime.enable";
+ public static final String COLLECTOR_REALTIME_MODE = "collector.realtime.mode";
+ public static final String COLLECTOR_REALTIME_MODE_HYBRID = "hybrid";
+ public static final String COLLECTOR_REALTIME_MODE_FILE = "file";
+ public static final String COLLECTOR_REALTIME_MODE_LOG = "log";
+
private PipeCollectorConstant() {
throw new IllegalStateException("Utility class");
}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/IoTDBDataRegionCollector.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/IoTDBDataRegionCollector.java
index db5ece66fd7..b26c880b028 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/IoTDBDataRegionCollector.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/IoTDBDataRegionCollector.java
@@ -23,25 +23,47 @@ import org.apache.iotdb.commons.consensus.DataRegionId;
import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
import org.apache.iotdb.db.engine.StorageEngine;
import org.apache.iotdb.db.pipe.config.PipeCollectorConstant;
+import org.apache.iotdb.db.pipe.core.collector.historical.PipeHistoricalDataRegionCollector;
+import org.apache.iotdb.db.pipe.core.collector.historical.PipeHistoricalDataRegionFakeCollector;
import org.apache.iotdb.db.pipe.core.collector.historical.PipeHistoricalDataRegionTsFileCollector;
import org.apache.iotdb.db.pipe.core.collector.realtime.PipeRealtimeDataRegionCollector;
+import org.apache.iotdb.db.pipe.core.collector.realtime.PipeRealtimeDataRegionFakeCollector;
import org.apache.iotdb.db.pipe.core.collector.realtime.PipeRealtimeDataRegionHybridCollector;
+import org.apache.iotdb.db.pipe.core.collector.realtime.PipeRealtimeDataRegionLogCollector;
+import org.apache.iotdb.db.pipe.core.collector.realtime.PipeRealtimeDataRegionTsFileCollector;
import org.apache.iotdb.db.pipe.task.queue.ListenableUnboundedBlockingPendingQueue;
import org.apache.iotdb.pipe.api.PipeCollector;
import org.apache.iotdb.pipe.api.customizer.PipeParameterValidator;
import org.apache.iotdb.pipe.api.customizer.PipeParameters;
import org.apache.iotdb.pipe.api.customizer.collector.PipeCollectorRuntimeConfiguration;
import org.apache.iotdb.pipe.api.event.Event;
+import org.apache.iotdb.pipe.api.exception.PipeManagementException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.apache.iotdb.db.pipe.config.PipeCollectorConstant.COLLECTOR_HISTORY_ENABLE_KEY;
+import static org.apache.iotdb.db.pipe.config.PipeCollectorConstant.COLLECTOR_REALTIME_ENABLE;
+import static org.apache.iotdb.db.pipe.config.PipeCollectorConstant.COLLECTOR_REALTIME_MODE;
+import static org.apache.iotdb.db.pipe.config.PipeCollectorConstant.COLLECTOR_REALTIME_MODE_FILE;
+import static org.apache.iotdb.db.pipe.config.PipeCollectorConstant.COLLECTOR_REALTIME_MODE_HYBRID;
+import static org.apache.iotdb.db.pipe.config.PipeCollectorConstant.COLLECTOR_REALTIME_MODE_LOG;
public class IoTDBDataRegionCollector implements PipeCollector {
+ private static final Logger LOGGER = LoggerFactory.getLogger(IoTDBDataRegionCollector.class);
+
private final AtomicBoolean hasBeenStarted;
- private final PipeRealtimeDataRegionCollector realtimeCollector;
+ private final ListenableUnboundedBlockingPendingQueue<Event> collectorPendingQueue;
+ private final PipeTaskMeta pipeTaskMeta;
+
// TODO: support pattern in historical collector
- private final PipeHistoricalDataRegionTsFileCollector historicalCollector;
+ private PipeHistoricalDataRegionCollector historicalCollector;
+ private PipeRealtimeDataRegionCollector realtimeCollector;
private int dataRegionId;
@@ -49,27 +71,102 @@ public class IoTDBDataRegionCollector implements PipeCollector {
PipeTaskMeta pipeTaskMeta,
ListenableUnboundedBlockingPendingQueue<Event> collectorPendingQueue) {
hasBeenStarted = new AtomicBoolean(false);
+
+ this.pipeTaskMeta = pipeTaskMeta;
+ this.collectorPendingQueue = collectorPendingQueue;
+
+ historicalCollector = new PipeHistoricalDataRegionTsFileCollector(pipeTaskMeta);
realtimeCollector =
new PipeRealtimeDataRegionHybridCollector(pipeTaskMeta, collectorPendingQueue);
- historicalCollector = new PipeHistoricalDataRegionTsFileCollector(pipeTaskMeta);
}
@Override
public void validate(PipeParameterValidator validator) throws Exception {
validator.validateRequiredAttribute(PipeCollectorConstant.DATA_REGION_KEY);
- // TODO: require more attributes
- realtimeCollector.validate(validator);
+ // validate collector.history.enable and collector.realtime.enable
+ validator
+ .validateAttributeValueRange(
+ COLLECTOR_HISTORY_ENABLE_KEY, true, Boolean.TRUE.toString(), Boolean.FALSE.toString())
+ .validateAttributeValueRange(
+ COLLECTOR_REALTIME_ENABLE, true, Boolean.TRUE.toString(), Boolean.FALSE.toString())
+ .validate(
+ args -> (boolean) args[0] || (boolean) args[1],
+ String.format(
+ "Should not set both %s and %s to false.",
+ COLLECTOR_HISTORY_ENABLE_KEY, COLLECTOR_REALTIME_ENABLE),
+ validator.getParameters().getBooleanOrDefault(COLLECTOR_HISTORY_ENABLE_KEY, true),
+ validator.getParameters().getBooleanOrDefault(COLLECTOR_REALTIME_ENABLE, true));
+
+ // validate collector.realtime.mode
+ if (validator.getParameters().getBooleanOrDefault(COLLECTOR_REALTIME_ENABLE, true)) {
+ validator.validateAttributeValueRange(
+ COLLECTOR_REALTIME_MODE,
+ true,
+ COLLECTOR_REALTIME_MODE_HYBRID,
+ COLLECTOR_REALTIME_MODE_FILE,
+ COLLECTOR_REALTIME_MODE_LOG);
+ }
+
+ constructHistoricalCollector(validator.getParameters());
+ constructRealtimeCollector(validator.getParameters());
+
historicalCollector.validate(validator);
+ realtimeCollector.validate(validator);
+ }
+
+ private void constructHistoricalCollector(PipeParameters parameters) {
+ // enable historical collector by default
+ historicalCollector =
+ parameters.getBooleanOrDefault(COLLECTOR_HISTORY_ENABLE_KEY, true)
+ ? new PipeHistoricalDataRegionTsFileCollector(pipeTaskMeta)
+ : new PipeHistoricalDataRegionFakeCollector();
+ }
+
+ private void constructRealtimeCollector(PipeParameters parameters) {
+ // enable realtime collector by default
+ if (!parameters.getBooleanOrDefault(COLLECTOR_REALTIME_ENABLE, true)) {
+ realtimeCollector = new PipeRealtimeDataRegionFakeCollector(pipeTaskMeta);
+ return;
+ }
+
+ // use hybrid mode by default
+ if (!parameters.hasAttribute(COLLECTOR_REALTIME_MODE)) {
+ realtimeCollector =
+ new PipeRealtimeDataRegionHybridCollector(pipeTaskMeta, collectorPendingQueue);
+ return;
+ }
+
+ switch (parameters.getString(COLLECTOR_REALTIME_MODE)) {
+ case COLLECTOR_REALTIME_MODE_FILE:
+ realtimeCollector =
+ new PipeRealtimeDataRegionTsFileCollector(pipeTaskMeta, collectorPendingQueue);
+ break;
+ case COLLECTOR_REALTIME_MODE_LOG:
+ realtimeCollector =
+ new PipeRealtimeDataRegionLogCollector(pipeTaskMeta, collectorPendingQueue);
+ break;
+ case COLLECTOR_REALTIME_MODE_HYBRID:
+ realtimeCollector =
+ new PipeRealtimeDataRegionHybridCollector(pipeTaskMeta, collectorPendingQueue);
+ break;
+ default:
+ realtimeCollector =
+ new PipeRealtimeDataRegionHybridCollector(pipeTaskMeta, collectorPendingQueue);
+ LOGGER.warn(
+ String.format(
+ "Unsupported collector realtime mode: %s, create a hybrid collector.",
+ parameters.getString(COLLECTOR_REALTIME_MODE)));
+ }
}
@Override
- public void customize(
- PipeParameters parameters, PipeCollectorRuntimeConfiguration configuration) {
+ public void customize(PipeParameters parameters, PipeCollectorRuntimeConfiguration configuration)
+ throws Exception {
dataRegionId = parameters.getInt(PipeCollectorConstant.DATA_REGION_KEY);
- realtimeCollector.customize(parameters, configuration);
historicalCollector.customize(parameters, configuration);
+ realtimeCollector.customize(parameters, configuration);
}
@Override
@@ -79,6 +176,8 @@ public class IoTDBDataRegionCollector implements PipeCollector {
}
hasBeenStarted.set(true);
+ final AtomicReference<Exception> exceptionHolder = new AtomicReference<>(null);
+ final DataRegionId dataRegionId = new DataRegionId(this.dataRegionId);
while (true) {
// try to start collectors in the data region ...
// first try to run if data region exists, then try to run if data region does not exist.
@@ -86,29 +185,47 @@ public class IoTDBDataRegionCollector implements PipeCollector {
// runIfPresent and runIfAbsent operations. in this case, we need to retry.
if (StorageEngine.getInstance()
.runIfPresent(
- new DataRegionId(dataRegionId),
+ dataRegionId,
(dataRegion -> {
dataRegion.writeLock(
String.format(
"Pipe: starting %s", IoTDBDataRegionCollector.class.getName()));
try {
- startHistoricalCollectorAndRealtimeCollector();
+ startHistoricalCollectorAndRealtimeCollector(exceptionHolder);
} finally {
dataRegion.writeUnlock();
}
}))
|| StorageEngine.getInstance()
.runIfAbsent(
- new DataRegionId(dataRegionId),
- this::startHistoricalCollectorAndRealtimeCollector)) {
+ dataRegionId,
+ () -> startHistoricalCollectorAndRealtimeCollector(exceptionHolder))) {
+ rethrowExceptionIfAny(exceptionHolder);
return;
}
+ rethrowExceptionIfAny(exceptionHolder);
}
}
- public void startHistoricalCollectorAndRealtimeCollector() {
- historicalCollector.start();
- realtimeCollector.start();
+ private void startHistoricalCollectorAndRealtimeCollector(
+ AtomicReference<Exception> exceptionHolder) {
+ try {
+ historicalCollector.start();
+ realtimeCollector.start();
+ } catch (Exception e) {
+ exceptionHolder.set(e);
+ LOGGER.warn(
+ String.format(
+ "Start historical collector %s and realtime collector %s error.",
+ historicalCollector, realtimeCollector),
+ e);
+ }
+ }
+
+ private void rethrowExceptionIfAny(AtomicReference<Exception> exceptionHolder) {
+ if (exceptionHolder.get() != null) {
+ throw new PipeManagementException("failed to start collectors.", exceptionHolder.get());
+ }
}
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/config/PipeCollectorConstant.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/historical/PipeHistoricalDataRegionCollector.java
similarity index 64%
copy from server/src/main/java/org/apache/iotdb/db/pipe/config/PipeCollectorConstant.java
copy to server/src/main/java/org/apache/iotdb/db/pipe/core/collector/historical/PipeHistoricalDataRegionCollector.java
index 06485b8cc2e..b5eb71c8df0 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/config/PipeCollectorConstant.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/historical/PipeHistoricalDataRegionCollector.java
@@ -17,18 +17,11 @@
* under the License.
*/
-package org.apache.iotdb.db.pipe.config;
+package org.apache.iotdb.db.pipe.core.collector.historical;
-public class PipeCollectorConstant {
+import org.apache.iotdb.pipe.api.PipeCollector;
- public static final String COLLECTOR_KEY = "collector";
+public abstract class PipeHistoricalDataRegionCollector implements PipeCollector {
- public static final String COLLECTOR_PATTERN_KEY = "collector.pattern";
- public static final String COLLECTOR_PATTERN_DEFAULT_VALUE = "root";
-
- public static final String DATA_REGION_KEY = "collector.data-region";
-
- private PipeCollectorConstant() {
- throw new IllegalStateException("Utility class");
- }
+ public abstract boolean hasConsumedAll();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/historical/PipeHistoricalDataRegionFakeCollector.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/historical/PipeHistoricalDataRegionFakeCollector.java
new file mode 100644
index 00000000000..fb61a9b4cc1
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/historical/PipeHistoricalDataRegionFakeCollector.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.core.collector.historical;
+
+import org.apache.iotdb.pipe.api.customizer.PipeParameterValidator;
+import org.apache.iotdb.pipe.api.customizer.PipeParameters;
+import org.apache.iotdb.pipe.api.customizer.collector.PipeCollectorRuntimeConfiguration;
+import org.apache.iotdb.pipe.api.event.Event;
+
+public class PipeHistoricalDataRegionFakeCollector extends PipeHistoricalDataRegionCollector {
+
+ @Override
+ public void validate(PipeParameterValidator validator) {}
+
+ @Override
+ public void customize(
+ PipeParameters parameters, PipeCollectorRuntimeConfiguration configuration) {}
+
+ @Override
+ public void start() {}
+
+ @Override
+ public Event supply() {
+ return null;
+ }
+
+ @Override
+ public boolean hasConsumedAll() {
+ return true;
+ }
+
+ @Override
+ public void close() {}
+
+ @Override
+ public String toString() {
+ return "PipeHistoricalDataRegionFakeCollector{}";
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/historical/PipeHistoricalDataRegionTsFileCollector.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/historical/PipeHistoricalDataRegionTsFileCollector.java
index 9e796bddc77..90041ed1d59 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/historical/PipeHistoricalDataRegionTsFileCollector.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/historical/PipeHistoricalDataRegionTsFileCollector.java
@@ -25,25 +25,30 @@ import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
import org.apache.iotdb.db.engine.StorageEngine;
import org.apache.iotdb.db.engine.storagegroup.DataRegion;
import org.apache.iotdb.db.engine.storagegroup.TsFileManager;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.pipe.config.PipeCollectorConstant;
import org.apache.iotdb.db.pipe.core.event.impl.PipeTsFileInsertionEvent;
-import org.apache.iotdb.pipe.api.PipeCollector;
+import org.apache.iotdb.db.utils.DateTimeUtils;
import org.apache.iotdb.pipe.api.customizer.PipeParameterValidator;
import org.apache.iotdb.pipe.api.customizer.PipeParameters;
import org.apache.iotdb.pipe.api.customizer.collector.PipeCollectorRuntimeConfiguration;
import org.apache.iotdb.pipe.api.event.Event;
+import java.time.ZoneId;
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.stream.Collectors;
-public class PipeHistoricalDataRegionTsFileCollector implements PipeCollector {
+public class PipeHistoricalDataRegionTsFileCollector extends PipeHistoricalDataRegionCollector {
private final PipeTaskMeta pipeTaskMeta;
private final ProgressIndex startIndex;
private int dataRegionId;
+ private long collectStartTime;
+ private long collectEndTime;
+
private Queue<PipeTsFileInsertionEvent> pendingQueue;
public PipeHistoricalDataRegionTsFileCollector(PipeTaskMeta pipeTaskMeta) {
@@ -60,6 +65,18 @@ public class PipeHistoricalDataRegionTsFileCollector implements PipeCollector {
public void customize(
PipeParameters parameters, PipeCollectorRuntimeConfiguration configuration) {
dataRegionId = parameters.getInt(PipeCollectorConstant.DATA_REGION_KEY);
+ collectStartTime =
+ parameters.hasAttribute(PipeCollectorConstant.COLLECTOR_HISTORY_START_TIME)
+ ? DateTimeUtils.convertDatetimeStrToLong(
+ parameters.getString(PipeCollectorConstant.COLLECTOR_HISTORY_START_TIME),
+ ZoneId.systemDefault())
+ : Long.MIN_VALUE;
+ collectEndTime =
+ parameters.hasAttribute(PipeCollectorConstant.COLLECTOR_HISTORY_END_TIME)
+ ? DateTimeUtils.convertDatetimeStrToLong(
+ parameters.getString(PipeCollectorConstant.COLLECTOR_HISTORY_END_TIME),
+ ZoneId.systemDefault())
+ : Long.MAX_VALUE;
}
@Override
@@ -81,18 +98,24 @@ public class PipeHistoricalDataRegionTsFileCollector implements PipeCollector {
pendingQueue = new ArrayDeque<>(tsFileManager.size(true) + tsFileManager.size(false));
pendingQueue.addAll(
tsFileManager.getTsFileList(true).stream()
- .filter(resource -> !startIndex.isAfter(resource.getMaxProgressIndexAfterClose()))
+ .filter(
+ resource ->
+ !startIndex.isAfter(resource.getMaxProgressIndexAfterClose())
+ && isTsFileResourceOverlappedWithTimeRange(resource))
.map(resource -> new PipeTsFileInsertionEvent(resource, pipeTaskMeta))
.collect(Collectors.toList()));
pendingQueue.addAll(
tsFileManager.getTsFileList(false).stream()
- .filter(resource -> !startIndex.isAfter(resource.getMaxProgressIndexAfterClose()))
+ .filter(
+ resource ->
+ !startIndex.isAfter(resource.getMaxProgressIndexAfterClose())
+ && isTsFileResourceOverlappedWithTimeRange(resource))
.map(resource -> new PipeTsFileInsertionEvent(resource, pipeTaskMeta))
.collect(Collectors.toList()));
pendingQueue.forEach(
- event -> {
- event.increaseReferenceCount(PipeHistoricalDataRegionTsFileCollector.class.getName());
- });
+ event ->
+ event.increaseReferenceCount(
+ PipeHistoricalDataRegionTsFileCollector.class.getName()));
} finally {
tsFileManager.readUnlock();
}
@@ -101,6 +124,11 @@ public class PipeHistoricalDataRegionTsFileCollector implements PipeCollector {
}
}
+ private boolean isTsFileResourceOverlappedWithTimeRange(TsFileResource resource) {
+ return !(resource.getFileEndTime() < collectStartTime
+ || collectEndTime < resource.getFileStartTime());
+ }
+
@Override
public Event supply() {
if (pendingQueue == null) {
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionCollector.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionCollector.java
index fd790b718cd..46d81b05ed0 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionCollector.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionCollector.java
@@ -45,8 +45,8 @@ public abstract class PipeRealtimeDataRegionCollector implements PipeCollector {
}
@Override
- public void customize(
- PipeParameters parameters, PipeCollectorRuntimeConfiguration configuration) {
+ public void customize(PipeParameters parameters, PipeCollectorRuntimeConfiguration configuration)
+ throws Exception {
pattern =
parameters.getStringOrDefault(
PipeCollectorConstant.COLLECTOR_PATTERN_KEY,
@@ -55,18 +55,22 @@ public abstract class PipeRealtimeDataRegionCollector implements PipeCollector {
}
@Override
- public void start() {
+ public void start() throws Exception {
PipeInsertionDataNodeListener.getInstance().startListenAndAssign(dataRegionId, this);
}
@Override
- public void close() {
+ public void close() throws Exception {
PipeInsertionDataNodeListener.getInstance().stopListenAndAssign(dataRegionId, this);
}
/** @param event the event from the storage engine */
public abstract void collect(PipeRealtimeCollectEvent event);
+ public abstract boolean isNeedListenToTsFile();
+
+ public abstract boolean isNeedListenToInsertNode();
+
public final String getPattern() {
return pattern;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionFakeCollector.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionFakeCollector.java
new file mode 100644
index 00000000000..a559472f290
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionFakeCollector.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.core.collector.realtime;
+
+import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
+import org.apache.iotdb.db.pipe.core.event.realtime.PipeRealtimeCollectEvent;
+import org.apache.iotdb.pipe.api.customizer.PipeParameterValidator;
+import org.apache.iotdb.pipe.api.customizer.PipeParameters;
+import org.apache.iotdb.pipe.api.customizer.collector.PipeCollectorRuntimeConfiguration;
+import org.apache.iotdb.pipe.api.event.Event;
+
+public class PipeRealtimeDataRegionFakeCollector extends PipeRealtimeDataRegionCollector {
+
+ public PipeRealtimeDataRegionFakeCollector(PipeTaskMeta pipeTaskMeta) {
+ super(pipeTaskMeta);
+ }
+
+ @Override
+ public void validate(PipeParameterValidator validator) {}
+
+ @Override
+ public void customize(
+ PipeParameters parameters, PipeCollectorRuntimeConfiguration configuration) {}
+
+ @Override
+ public void start() {}
+
+ @Override
+ public Event supply() {
+ return null;
+ }
+
+ @Override
+ public void collect(PipeRealtimeCollectEvent event) {}
+
+ @Override
+ public boolean isNeedListenToTsFile() {
+ return false;
+ }
+
+ @Override
+ public boolean isNeedListenToInsertNode() {
+ return false;
+ }
+
+ @Override
+ public void close() {}
+
+ @Override
+ public String toString() {
+ return "PipeRealtimeDataRegionFakeCollector{}";
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionHybridCollector.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionHybridCollector.java
index 302594b1d02..abb3a109c2c 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionHybridCollector.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionHybridCollector.java
@@ -66,6 +66,16 @@ public class PipeRealtimeDataRegionHybridCollector extends PipeRealtimeDataRegio
}
}
+ @Override
+ public boolean isNeedListenToTsFile() {
+ return true;
+ }
+
+ @Override
+ public boolean isNeedListenToInsertNode() {
+ return true;
+ }
+
private void collectTabletInsertion(PipeRealtimeCollectEvent event) {
if (isApproachingCapacity()) {
event.getTsFileEpoch().migrateState(this, state -> TsFileEpoch.State.USING_TSFILE);
@@ -120,7 +130,7 @@ public class PipeRealtimeDataRegionHybridCollector extends PipeRealtimeDataRegio
} else {
throw new UnsupportedOperationException(
String.format(
- "Unsupported event type %s for Hybrid Realtime Collector %s",
+ "Unsupported event type %s for Hybrid Realtime Collector %s to supply.",
eventToSupply.getClass(), this));
}
@@ -195,7 +205,7 @@ public class PipeRealtimeDataRegionHybridCollector extends PipeRealtimeDataRegio
}
@Override
- public void close() {
+ public void close() throws Exception {
super.close();
pendingQueue.clear();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionLogCollector.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionLogCollector.java
new file mode 100644
index 00000000000..2d5150bcd80
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionLogCollector.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.core.collector.realtime;
+
+import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
+import org.apache.iotdb.db.pipe.agent.PipeAgent;
+import org.apache.iotdb.db.pipe.core.event.realtime.PipeRealtimeCollectEvent;
+import org.apache.iotdb.db.pipe.core.event.realtime.TsFileEpoch;
+import org.apache.iotdb.db.pipe.task.queue.ListenableUnboundedBlockingPendingQueue;
+import org.apache.iotdb.pipe.api.event.Event;
+import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
+import org.apache.iotdb.pipe.api.exception.PipeRuntimeNonCriticalException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class PipeRealtimeDataRegionLogCollector extends PipeRealtimeDataRegionCollector {
+
+ private static final Logger LOGGER =
+ LoggerFactory.getLogger(PipeRealtimeDataRegionLogCollector.class);
+
+ // TODO: memory control
+ // This queue is used to store pending events collected by the method collect(). The method
+ // supply() will poll events from this queue and send them to the next pipe plugin.
+ private final ListenableUnboundedBlockingPendingQueue<Event> pendingQueue;
+
+ public PipeRealtimeDataRegionLogCollector(
+ PipeTaskMeta pipeTaskMeta, ListenableUnboundedBlockingPendingQueue<Event> pendingQueue) {
+ super(pipeTaskMeta);
+ this.pendingQueue = pendingQueue;
+ }
+
+ @Override
+ public void collect(PipeRealtimeCollectEvent event) {
+ event.getTsFileEpoch().migrateState(this, state -> TsFileEpoch.State.USING_TABLET);
+
+ if (!(event.getEvent() instanceof TabletInsertionEvent)) {
+ return;
+ }
+
+ if (!pendingQueue.offer(event)) {
+ LOGGER.warn(
+ String.format(
+ "Pending Queue of Log Realtime Collector %s has reached capacity, discard Tablet Event %s, current state %s",
+ this, event, event.getTsFileEpoch().getState(this)));
+ // this would not happen, but just in case.
+ // ListenableUnblockingPendingQueue is unbounded, so it should never reach capacity.
+ // TODO: memory control when elements in queue are too many.
+ }
+ }
+
+ @Override
+ public boolean isNeedListenToTsFile() {
+ return false;
+ }
+
+ @Override
+ public boolean isNeedListenToInsertNode() {
+ return true;
+ }
+
+ @Override
+ public Event supply() {
+ PipeRealtimeCollectEvent collectEvent = (PipeRealtimeCollectEvent) pendingQueue.poll();
+
+ while (collectEvent != null) {
+ Event suppliedEvent = null;
+
+ if (collectEvent.increaseReferenceCount(PipeRealtimeDataRegionLogCollector.class.getName())) {
+ suppliedEvent = collectEvent.getEvent();
+ } else {
+ // if the event's reference count can not be increased, it means the data represented by
+ // this event is not reliable anymore. the data has been lost. we simply discard this event
+ // and report the exception to PipeRuntimeAgent.
+ final String errorMessage =
+ String.format(
+ "Tablet Event %s can not be supplied because the reference count can not be increased, "
+ + "the data represented by this event is lost",
+ collectEvent.getEvent());
+ PipeAgent.runtime().report(pipeTaskMeta, new PipeRuntimeNonCriticalException(errorMessage));
+ }
+
+ collectEvent.decreaseReferenceCount(PipeRealtimeDataRegionLogCollector.class.getName());
+ if (suppliedEvent != null) {
+ return suppliedEvent;
+ }
+
+ collectEvent = (PipeRealtimeCollectEvent) pendingQueue.poll();
+ }
+
+ // means the pending queue is empty.
+ return null;
+ }
+
+ @Override
+ public void close() throws Exception {
+ super.close();
+ pendingQueue.clear();
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionTsFileCollector.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionTsFileCollector.java
new file mode 100644
index 00000000000..4b2849f0243
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionTsFileCollector.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.core.collector.realtime;
+
+import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
+import org.apache.iotdb.db.pipe.agent.PipeAgent;
+import org.apache.iotdb.db.pipe.core.event.realtime.PipeRealtimeCollectEvent;
+import org.apache.iotdb.db.pipe.core.event.realtime.TsFileEpoch;
+import org.apache.iotdb.db.pipe.task.queue.ListenableUnboundedBlockingPendingQueue;
+import org.apache.iotdb.pipe.api.event.Event;
+import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
+import org.apache.iotdb.pipe.api.exception.PipeRuntimeNonCriticalException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Realtime data region collector that only forwards events wrapping a {@link
+ * TsFileInsertionEvent}. Any other realtime event passed to {@link
+ * #collect(PipeRealtimeCollectEvent)} is dropped after its TsFile epoch state has been migrated.
+ */
+public class PipeRealtimeDataRegionTsFileCollector extends PipeRealtimeDataRegionCollector {
+
+  private static final Logger LOGGER =
+      LoggerFactory.getLogger(PipeRealtimeDataRegionTsFileCollector.class);
+
+  // TODO: memory control
+  // Buffers the events accepted by collect(). supply() polls events from this queue and hands
+  // them to the next pipe plugin.
+  private final ListenableUnboundedBlockingPendingQueue<Event> pendingQueue;
+
+  /**
+   * @param pipeTaskMeta forwarded to the super constructor
+   * @param pendingQueue queue used to buffer collected events until they are supplied
+   */
+  public PipeRealtimeDataRegionTsFileCollector(
+      PipeTaskMeta pipeTaskMeta, ListenableUnboundedBlockingPendingQueue<Event> pendingQueue) {
+    super(pipeTaskMeta);
+    this.pendingQueue = pendingQueue;
+  }
+
+  /**
+   * Migrates the state of the event's TsFile epoch to {@code USING_TSFILE} for this collector and
+   * enqueues the event if it wraps a {@link TsFileInsertionEvent}.
+   *
+   * @param event the realtime event to collect; events that do not wrap a TsFile insertion are
+   *     ignored
+   */
+  @Override
+  public void collect(PipeRealtimeCollectEvent event) {
+    // The state is migrated for every event, including the ones that are dropped right below.
+    event.getTsFileEpoch().migrateState(this, state -> TsFileEpoch.State.USING_TSFILE);
+
+    if (!(event.getEvent() instanceof TsFileInsertionEvent)) {
+      return;
+    }
+
+    if (!pendingQueue.offer(event)) {
+      // This is not expected to happen: ListenableUnboundedBlockingPendingQueue is unbounded, so
+      // it should never reach capacity. Log it just in case.
+      // TODO: memory control when elements in queue are too many.
+      LOGGER.warn(
+          "Pending Queue of TsFile Realtime Collector {} has reached capacity, discard TsFile Event {}, current state {}",
+          this,
+          event,
+          event.getTsFileEpoch().getState(this));
+    }
+  }
+
+  /**
+   * Always returns {@code true} for this collector.
+   *
+   * @return {@code true}
+   */
+  @Override
+  public boolean isNeedListenToTsFile() {
+    return true;
+  }
+
+  /**
+   * Always returns {@code false} for this collector.
+   *
+   * @return {@code false}
+   */
+  @Override
+  public boolean isNeedListenToInsertNode() {
+    return false;
+  }
+
+  /**
+   * Polls the pending queue until an event can be handed out.
+   *
+   * <p>For every polled event the reference count is increased under this collector's class name.
+   * When that succeeds, the wrapped event becomes the candidate result; when it fails, a
+   * non-critical exception is reported to the pipe runtime agent and the event is discarded. In
+   * both cases {@code decreaseReferenceCount} is then called on the polled event with the same
+   * holder name.
+   *
+   * @return the wrapped event of the first usable queue element, or {@code null} once the pending
+   *     queue is empty
+   */
+  @Override
+  public Event supply() {
+    final String holderName = PipeRealtimeDataRegionTsFileCollector.class.getName();
+
+    PipeRealtimeCollectEvent collectEvent = (PipeRealtimeCollectEvent) pendingQueue.poll();
+
+    while (collectEvent != null) {
+      Event suppliedEvent = null;
+
+      if (collectEvent.increaseReferenceCount(holderName)) {
+        suppliedEvent = collectEvent.getEvent();
+      } else {
+        // If the event's reference count can not be increased, the data represented by this
+        // event is not reliable anymore: it has been lost. Discard the event and report the
+        // problem to the pipe runtime agent.
+        final String errorMessage =
+            String.format(
+                "TsFile Event %s can not be supplied because the reference count can not be increased, "
+                    + "the data represented by this event is lost",
+                collectEvent.getEvent());
+        PipeAgent.runtime().report(pipeTaskMeta, new PipeRuntimeNonCriticalException(errorMessage));
+      }
+
+      // Called for every polled event, whether or not the increase above succeeded.
+      collectEvent.decreaseReferenceCount(holderName);
+      if (suppliedEvent != null) {
+        return suppliedEvent;
+      }
+
+      collectEvent = (PipeRealtimeCollectEvent) pendingQueue.poll();
+    }
+
+    // The pending queue is empty.
+    return null;
+  }
+
+  /**
+   * Closes the parent collector first and then removes every event still waiting in the pending
+   * queue.
+   *
+   * <p>NOTE(review): the queued events are dropped via {@code clear()} without
+   * {@code decreaseReferenceCount} being called on them here - confirm that no reference is
+   * leaked.
+   *
+   * @throws Exception if {@code super.close()} throws; in that case the queue is not cleared
+   */
+  @Override
+  public void close() throws Exception {
+    super.close();
+    pendingQueue.clear();
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/listener/PipeInsertionDataNodeListener.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/listener/PipeInsertionDataNodeListener.java
index 68b2c5ca010..ae9cce9a498 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/listener/PipeInsertionDataNodeListener.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/listener/PipeInsertionDataNodeListener.java
@@ -29,6 +29,7 @@ import org.apache.iotdb.db.wal.utils.WALEntryHandler;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicInteger;
/**
* PipeInsertionEventListener is a singleton in each data node.
@@ -46,6 +47,9 @@ public class PipeInsertionDataNodeListener {
private final ConcurrentMap<String, PipeDataRegionAssigner> dataRegionId2Assigner =
new ConcurrentHashMap<>();
+ private final AtomicInteger listenToTsFileCollectorCount = new AtomicInteger(0);
+ private final AtomicInteger listenToInsertNodeCollectorCount = new AtomicInteger(0);
+
//////////////////////////// start & stop ////////////////////////////
public synchronized void startListenAndAssign(
@@ -53,6 +57,13 @@ public class PipeInsertionDataNodeListener {
dataRegionId2Assigner
.computeIfAbsent(dataRegionId, o -> new PipeDataRegionAssigner())
.startAssignTo(collector);
+
+ if (collector.isNeedListenToTsFile()) {
+ listenToTsFileCollectorCount.incrementAndGet();
+ }
+ if (collector.isNeedListenToInsertNode()) {
+ listenToInsertNodeCollectorCount.incrementAndGet();
+ }
}
public synchronized void stopListenAndAssign(
@@ -64,6 +75,13 @@ public class PipeInsertionDataNodeListener {
assigner.stopAssignTo(collector);
+ if (collector.isNeedListenToTsFile()) {
+ listenToTsFileCollectorCount.decrementAndGet();
+ }
+ if (collector.isNeedListenToInsertNode()) {
+ listenToInsertNodeCollectorCount.decrementAndGet();
+ }
+
if (assigner.notMoreCollectorNeededToBeAssigned()) {
// the removed assigner will is the same as the one referenced by the variable `assigner`
dataRegionId2Assigner.remove(dataRegionId);
@@ -74,12 +92,11 @@ public class PipeInsertionDataNodeListener {
//////////////////////////// listen to events ////////////////////////////
- // TODO: listen to the tsfile synced from the other cluster
- // TODO: check whether the method is called on the right place. what is the meaning of the
- // variable shouldClose before calling this method?
- // TODO: maximum the efficiency of the method when there is no pipe in the system, avoid
- // dataRegionId2Assigner.get(dataRegionId);
public void listenToTsFile(String dataRegionId, TsFileResource tsFileResource) {
+ // We don't check whether listenToTsFileCollectorCount.get() == 0 here, because
+ // when using SimpleProgressIndex, the tsfile event needs to be assigned to the
+ // collector even if listenToTsFileCollectorCount.get() == 0, in order to record the progress.
+
PipeAgent.runtime().assignSimpleProgressIndexIfNeeded(tsFileResource);
final PipeDataRegionAssigner assigner = dataRegionId2Assigner.get(dataRegionId);
@@ -97,6 +114,10 @@ public class PipeInsertionDataNodeListener {
WALEntryHandler walEntryHandler,
InsertNode insertNode,
TsFileResource tsFileResource) {
+ if (listenToInsertNodeCollectorCount.get() == 0) {
+ return;
+ }
+
final PipeDataRegionAssigner assigner = dataRegionId2Assigner.get(dataRegionId);
// only events from registered data region will be collected
diff --git a/server/src/test/java/org/apache/iotdb/db/pipe/core/collector/CachedSchemaPatternMatcherTest.java b/server/src/test/java/org/apache/iotdb/db/pipe/core/collector/CachedSchemaPatternMatcherTest.java
index 198761eee3b..6deaed9343d 100644
--- a/server/src/test/java/org/apache/iotdb/db/pipe/core/collector/CachedSchemaPatternMatcherTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/pipe/core/collector/CachedSchemaPatternMatcherTest.java
@@ -38,7 +38,6 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
@@ -64,7 +63,7 @@ public class CachedSchemaPatternMatcherTest {
}
@Test
- public void testCachedMatcher() throws ExecutionException, InterruptedException {
+ public void testCachedMatcher() throws Exception {
PipeRealtimeDataRegionCollector databaseCollector =
new PipeRealtimeDataRegionFakeCollector(null);
databaseCollector.customize(
@@ -181,5 +180,15 @@ public class CachedSchemaPatternMatcherTest {
});
Assert.assertTrue(match[0]);
}
+
+ /** Always returns {@code true} for this test collector. */
+ @Override
+ public boolean isNeedListenToTsFile() {
+ return true;
+ }
+
+ /** Always returns {@code true} for this test collector. */
+ @Override
+ public boolean isNeedListenToInsertNode() {
+ return true;
+ }
}
}
diff --git a/server/src/test/java/org/apache/iotdb/db/pipe/core/collector/PipeRealtimeCollectTest.java b/server/src/test/java/org/apache/iotdb/db/pipe/core/collector/PipeRealtimeCollectTest.java
index 02ef1150a84..27d28114007 100644
--- a/server/src/test/java/org/apache/iotdb/db/pipe/core/collector/PipeRealtimeCollectTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/pipe/core/collector/PipeRealtimeCollectTest.java
@@ -232,6 +232,8 @@ public class PipeRealtimeCollectTest {
throw new RuntimeException(e);
}
});
+ } catch (Exception e) {
+ throw new RuntimeException(e);
}
}