You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2024/03/18 13:17:07 UTC
(iotdb) 01/02: ThrowingExceptionProcessor
This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch throw-exception-connector
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 91529162ed81570bb6330feaa9ba969367b1fea2
Author: Steve Yurong Su <ro...@apache.org>
AuthorDate: Mon Mar 18 20:31:59 2024 +0800
ThrowingExceptionProcessor
---
.../PipeDataRegionProcessorConstructor.java | 4 +
.../pipe/plugin/builtin/BuiltinPipePlugin.java | 3 +
.../throwing/ThrowingExceptionProcessor.java | 99 ++++++++++++++++++++++
3 files changed, 106 insertions(+)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionProcessorConstructor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionProcessorConstructor.java
index ba49f55ff32..8c926dd06e3 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionProcessorConstructor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionProcessorConstructor.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.pipe.agent.plugin.dataregion;
import org.apache.iotdb.commons.pipe.agent.plugin.PipeProcessorConstructor;
import org.apache.iotdb.commons.pipe.plugin.builtin.BuiltinPipePlugin;
import org.apache.iotdb.commons.pipe.plugin.builtin.processor.donothing.DoNothingProcessor;
+import org.apache.iotdb.commons.pipe.plugin.builtin.processor.throwing.ThrowingExceptionProcessor;
import org.apache.iotdb.commons.pipe.plugin.meta.DataNodePipePluginMetaKeeper;
import org.apache.iotdb.db.pipe.processor.downsampling.sdt.SwingingDoorTrendingSamplingProcessor;
import org.apache.iotdb.db.pipe.processor.downsampling.tumbling.TumblingTimeSamplingProcessor;
@@ -42,5 +43,8 @@ class PipeDataRegionProcessorConstructor extends PipeProcessorConstructor {
pluginConstructors.put(
BuiltinPipePlugin.SDT_SAMPLING_PROCESSOR.getPipePluginName(),
SwingingDoorTrendingSamplingProcessor::new);
+ pluginConstructors.put(
+ BuiltinPipePlugin.THROWING_EXCEPTION_PROCESSOR.getPipePluginName(),
+ ThrowingExceptionProcessor::new);
}
}
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/BuiltinPipePlugin.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/BuiltinPipePlugin.java
index c1ae3a75f2b..8f309e4285f 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/BuiltinPipePlugin.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/BuiltinPipePlugin.java
@@ -34,6 +34,7 @@ import org.apache.iotdb.commons.pipe.plugin.builtin.extractor.iotdb.IoTDBExtract
import org.apache.iotdb.commons.pipe.plugin.builtin.processor.donothing.DoNothingProcessor;
import org.apache.iotdb.commons.pipe.plugin.builtin.processor.downsampling.SwingingDoorTrendingSamplingProcessor;
import org.apache.iotdb.commons.pipe.plugin.builtin.processor.downsampling.TumblingTimeSamplingProcessor;
+import org.apache.iotdb.commons.pipe.plugin.builtin.processor.throwing.ThrowingExceptionProcessor;
import java.util.Arrays;
import java.util.Collections;
@@ -54,6 +55,7 @@ public enum BuiltinPipePlugin {
TUMBLING_TIME_SAMPLING_PROCESSOR(
"tumbling-time-sampling-processor", TumblingTimeSamplingProcessor.class),
SDT_SAMPLING_PROCESSOR("sdt-sampling-processor", SwingingDoorTrendingSamplingProcessor.class),
+ THROWING_EXCEPTION_PROCESSOR("throwing-exception-processor", ThrowingExceptionProcessor.class),
// connectors
DO_NOTHING_CONNECTOR("do-nothing-connector", DoNothingConnector.class),
@@ -113,6 +115,7 @@ public enum BuiltinPipePlugin {
// Processors
TUMBLING_TIME_SAMPLING_PROCESSOR.getPipePluginName().toUpperCase(),
SDT_SAMPLING_PROCESSOR.getPipePluginName().toUpperCase(),
+ THROWING_EXCEPTION_PROCESSOR.getPipePluginName().toUpperCase(),
// Connectors
DO_NOTHING_CONNECTOR.getPipePluginName().toUpperCase(),
IOTDB_THRIFT_CONNECTOR.getPipePluginName().toUpperCase(),
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/processor/throwing/ThrowingExceptionProcessor.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/processor/throwing/ThrowingExceptionProcessor.java
new file mode 100644
index 00000000000..69b75be47f8
--- /dev/null
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/processor/throwing/ThrowingExceptionProcessor.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.pipe.plugin.builtin.processor.throwing;
+
+import org.apache.iotdb.pipe.api.PipeProcessor;
+import org.apache.iotdb.pipe.api.collector.EventCollector;
+import org.apache.iotdb.pipe.api.customizer.configuration.PipeProcessorRuntimeConfiguration;
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
+import org.apache.iotdb.pipe.api.event.Event;
+import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
+import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
+
+import java.util.Arrays;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+public class ThrowingExceptionProcessor implements PipeProcessor {
+
+ private boolean throwInCustomize = false;
+ private boolean throwInProcessTabletInsertionEvent = false;
+ private boolean throwInProcessTsFileInsertionEvent = false;
+ private boolean throwInProcessEvent = false;
+ private boolean throwInClose = false;
+
+ @Override
+ public void validate(PipeParameterValidator validator) throws Exception {
+ final Set<String> throwingStages =
+ Arrays.stream(
+ validator.getParameters().getStringOrDefault("stages", "").toLowerCase().split(","))
+ .collect(Collectors.toSet());
+
+ final boolean throwInValidate = throwingStages.contains("validate");
+ if (throwInValidate) {
+ throw new Exception("Throwing exception in validate");
+ }
+
+ throwInCustomize = throwingStages.contains("customize");
+ throwInProcessTabletInsertionEvent = throwingStages.contains("process-tablet-insertion-event");
+ throwInProcessTsFileInsertionEvent = throwingStages.contains("process-tsfile-insertion-event");
+ throwInProcessEvent = throwingStages.contains("process-event");
+ throwInClose = throwingStages.contains("close");
+ }
+
+ @Override
+ public void customize(PipeParameters parameters, PipeProcessorRuntimeConfiguration configuration)
+ throws Exception {
+ if (throwInCustomize) {
+ throw new Exception("Throwing exception in customize");
+ }
+ }
+
+ @Override
+ public void process(TabletInsertionEvent tabletInsertionEvent, EventCollector eventCollector)
+ throws Exception {
+ if (throwInProcessTabletInsertionEvent) {
+ throw new Exception("Throwing exception in process(TabletInsertionEvent, EventCollector)");
+ }
+ }
+
+ @Override
+ public void process(TsFileInsertionEvent tsFileInsertionEvent, EventCollector eventCollector)
+ throws Exception {
+ if (throwInProcessTsFileInsertionEvent) {
+ throw new Exception("Throwing exception in process(TsFileInsertionEvent, EventCollector)");
+ }
+ }
+
+ @Override
+ public void process(Event event, EventCollector eventCollector) throws Exception {
+ if (throwInProcessEvent) {
+ throw new Exception("Throwing exception in process(Event, EventCollector)");
+ }
+ }
+
+ @Override
+ public void close() throws Exception {
+ if (throwInClose) {
+ throw new Exception("Throwing exception in close");
+ }
+ }
+}