You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2024/03/18 13:17:07 UTC

(iotdb) 01/02: ThrowingExceptionProcessor

This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch throw-exception-connector
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 91529162ed81570bb6330feaa9ba969367b1fea2
Author: Steve Yurong Su <ro...@apache.org>
AuthorDate: Mon Mar 18 20:31:59 2024 +0800

    ThrowingExceptionProcessor
---
 .../PipeDataRegionProcessorConstructor.java        |  4 +
 .../pipe/plugin/builtin/BuiltinPipePlugin.java     |  3 +
 .../throwing/ThrowingExceptionProcessor.java       | 99 ++++++++++++++++++++++
 3 files changed, 106 insertions(+)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionProcessorConstructor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionProcessorConstructor.java
index ba49f55ff32..8c926dd06e3 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionProcessorConstructor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionProcessorConstructor.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.pipe.agent.plugin.dataregion;
 import org.apache.iotdb.commons.pipe.agent.plugin.PipeProcessorConstructor;
 import org.apache.iotdb.commons.pipe.plugin.builtin.BuiltinPipePlugin;
 import org.apache.iotdb.commons.pipe.plugin.builtin.processor.donothing.DoNothingProcessor;
+import org.apache.iotdb.commons.pipe.plugin.builtin.processor.throwing.ThrowingExceptionProcessor;
 import org.apache.iotdb.commons.pipe.plugin.meta.DataNodePipePluginMetaKeeper;
 import org.apache.iotdb.db.pipe.processor.downsampling.sdt.SwingingDoorTrendingSamplingProcessor;
 import org.apache.iotdb.db.pipe.processor.downsampling.tumbling.TumblingTimeSamplingProcessor;
@@ -42,5 +43,8 @@ class PipeDataRegionProcessorConstructor extends PipeProcessorConstructor {
     pluginConstructors.put(
         BuiltinPipePlugin.SDT_SAMPLING_PROCESSOR.getPipePluginName(),
         SwingingDoorTrendingSamplingProcessor::new);
+    pluginConstructors.put(
+        BuiltinPipePlugin.THROWING_EXCEPTION_PROCESSOR.getPipePluginName(),
+        ThrowingExceptionProcessor::new);
   }
 }
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/BuiltinPipePlugin.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/BuiltinPipePlugin.java
index c1ae3a75f2b..8f309e4285f 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/BuiltinPipePlugin.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/BuiltinPipePlugin.java
@@ -34,6 +34,7 @@ import org.apache.iotdb.commons.pipe.plugin.builtin.extractor.iotdb.IoTDBExtract
 import org.apache.iotdb.commons.pipe.plugin.builtin.processor.donothing.DoNothingProcessor;
 import org.apache.iotdb.commons.pipe.plugin.builtin.processor.downsampling.SwingingDoorTrendingSamplingProcessor;
 import org.apache.iotdb.commons.pipe.plugin.builtin.processor.downsampling.TumblingTimeSamplingProcessor;
+import org.apache.iotdb.commons.pipe.plugin.builtin.processor.throwing.ThrowingExceptionProcessor;
 
 import java.util.Arrays;
 import java.util.Collections;
@@ -54,6 +55,7 @@ public enum BuiltinPipePlugin {
   TUMBLING_TIME_SAMPLING_PROCESSOR(
       "tumbling-time-sampling-processor", TumblingTimeSamplingProcessor.class),
   SDT_SAMPLING_PROCESSOR("sdt-sampling-processor", SwingingDoorTrendingSamplingProcessor.class),
+  THROWING_EXCEPTION_PROCESSOR("throwing-exception-processor", ThrowingExceptionProcessor.class),
 
   // connectors
   DO_NOTHING_CONNECTOR("do-nothing-connector", DoNothingConnector.class),
@@ -113,6 +115,7 @@ public enum BuiltinPipePlugin {
                   // Processors
                   TUMBLING_TIME_SAMPLING_PROCESSOR.getPipePluginName().toUpperCase(),
                   SDT_SAMPLING_PROCESSOR.getPipePluginName().toUpperCase(),
+                  THROWING_EXCEPTION_PROCESSOR.getPipePluginName().toUpperCase(),
                   // Connectors
                   DO_NOTHING_CONNECTOR.getPipePluginName().toUpperCase(),
                   IOTDB_THRIFT_CONNECTOR.getPipePluginName().toUpperCase(),
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/processor/throwing/ThrowingExceptionProcessor.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/processor/throwing/ThrowingExceptionProcessor.java
new file mode 100644
index 00000000000..69b75be47f8
--- /dev/null
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/processor/throwing/ThrowingExceptionProcessor.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.pipe.plugin.builtin.processor.throwing;
+
+import org.apache.iotdb.pipe.api.PipeProcessor;
+import org.apache.iotdb.pipe.api.collector.EventCollector;
+import org.apache.iotdb.pipe.api.customizer.configuration.PipeProcessorRuntimeConfiguration;
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
+import org.apache.iotdb.pipe.api.event.Event;
+import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
+import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
+
+import java.util.Arrays;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+public class ThrowingExceptionProcessor implements PipeProcessor {
+
+  private boolean throwInCustomize = false;
+  private boolean throwInProcessTabletInsertionEvent = false;
+  private boolean throwInProcessTsFileInsertionEvent = false;
+  private boolean throwInProcessEvent = false;
+  private boolean throwInClose = false;
+
+  @Override
+  public void validate(PipeParameterValidator validator) throws Exception {
+    final Set<String> throwingStages =
+        Arrays.stream(
+                validator.getParameters().getStringOrDefault("stages", "").toLowerCase().split(","))
+            .collect(Collectors.toSet());
+
+    final boolean throwInValidate = throwingStages.contains("validate");
+    if (throwInValidate) {
+      throw new Exception("Throwing exception in validate");
+    }
+
+    throwInCustomize = throwingStages.contains("customize");
+    throwInProcessTabletInsertionEvent = throwingStages.contains("process-tablet-insertion-event");
+    throwInProcessTsFileInsertionEvent = throwingStages.contains("process-tsfile-insertion-event");
+    throwInProcessEvent = throwingStages.contains("process-event");
+    throwInClose = throwingStages.contains("close");
+  }
+
+  @Override
+  public void customize(PipeParameters parameters, PipeProcessorRuntimeConfiguration configuration)
+      throws Exception {
+    if (throwInCustomize) {
+      throw new Exception("Throwing exception in customize");
+    }
+  }
+
+  @Override
+  public void process(TabletInsertionEvent tabletInsertionEvent, EventCollector eventCollector)
+      throws Exception {
+    if (throwInProcessTabletInsertionEvent) {
+      throw new Exception("Throwing exception in process(TabletInsertionEvent, EventCollector)");
+    }
+  }
+
+  @Override
+  public void process(TsFileInsertionEvent tsFileInsertionEvent, EventCollector eventCollector)
+      throws Exception {
+    if (throwInProcessTsFileInsertionEvent) {
+      throw new Exception("Throwing exception in process(TsFileInsertionEvent, EventCollector)");
+    }
+  }
+
+  @Override
+  public void process(Event event, EventCollector eventCollector) throws Exception {
+    if (throwInProcessEvent) {
+      throw new Exception("Throwing exception in process(Event, EventCollector)");
+    }
+  }
+
+  @Override
+  public void close() throws Exception {
+    if (throwInClose) {
+      throw new Exception("Throwing exception in close");
+    }
+  }
+}