You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2023/05/22 14:50:23 UTC

[iotdb] 01/06: remove deletion event and introduce generic event to pipe engine

This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch pipe-task-schedule
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit c76809d64763f5dc91d41aedd21bb88279c210b9
Author: Steve Yurong Su <ro...@apache.org>
AuthorDate: Mon May 22 10:44:29 2023 +0800

    remove deletion event and introduce generic event to pipe engine
---
 .../builtin/connector/DoNothingConnector.java      |  4 +-
 .../builtin/connector/IoTDBThriftConnector.java    |  4 +-
 .../builtin/processor/DoNothingProcessor.java      | 11 +++--
 .../org/apache/iotdb/pipe/api/PipeConnector.java   | 10 ++---
 .../org/apache/iotdb/pipe/api/PipeProcessor.java   | 10 ++---
 .../iotdb/pipe/api/collector/EventCollector.java   | 39 ++++--------------
 .../pipe/api/event/dml/deletion/DeletionEvent.java | 48 ----------------------
 .../impl/iotdb/v1/IoTDBThriftConnectorV1.java      |  6 +--
 .../event/view/collector/PipeEventCollector.java   | 19 +--------
 .../db/pipe/task/subtask/PipeConnectorSubtask.java |  8 +---
 .../db/pipe/task/subtask/PipeProcessorSubtask.java |  8 +---
 11 files changed, 35 insertions(+), 132 deletions(-)

diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/DoNothingConnector.java b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/DoNothingConnector.java
index 00f3b80424a..2522fdc66f6 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/DoNothingConnector.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/DoNothingConnector.java
@@ -23,7 +23,7 @@ import org.apache.iotdb.pipe.api.PipeConnector;
 import org.apache.iotdb.pipe.api.customizer.PipeParameterValidator;
 import org.apache.iotdb.pipe.api.customizer.PipeParameters;
 import org.apache.iotdb.pipe.api.customizer.connector.PipeConnectorRuntimeConfiguration;
-import org.apache.iotdb.pipe.api.event.dml.deletion.DeletionEvent;
+import org.apache.iotdb.pipe.api.event.Event;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
 
@@ -61,7 +61,7 @@ public class DoNothingConnector implements PipeConnector {
   }
 
   @Override
-  public void transfer(DeletionEvent deletionEvent) {
+  public void transfer(Event event) {
     // do nothing
   }
 
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/IoTDBThriftConnector.java b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/IoTDBThriftConnector.java
index e252e5be726..82ddd05ba3c 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/IoTDBThriftConnector.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/IoTDBThriftConnector.java
@@ -23,7 +23,7 @@ import org.apache.iotdb.pipe.api.PipeConnector;
 import org.apache.iotdb.pipe.api.customizer.PipeParameterValidator;
 import org.apache.iotdb.pipe.api.customizer.PipeParameters;
 import org.apache.iotdb.pipe.api.customizer.connector.PipeConnectorRuntimeConfiguration;
-import org.apache.iotdb.pipe.api.event.dml.deletion.DeletionEvent;
+import org.apache.iotdb.pipe.api.event.Event;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
 
@@ -67,7 +67,7 @@ public class IoTDBThriftConnector implements PipeConnector {
   }
 
   @Override
-  public void transfer(DeletionEvent deletionEvent) {
+  public void transfer(Event event) {
     throw new UnsupportedOperationException("This class is a placeholder and should not be used.");
   }
 
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/processor/DoNothingProcessor.java b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/processor/DoNothingProcessor.java
index 6a18f9a64e8..bc56a8bb3cf 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/processor/DoNothingProcessor.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/processor/DoNothingProcessor.java
@@ -24,7 +24,7 @@ import org.apache.iotdb.pipe.api.collector.EventCollector;
 import org.apache.iotdb.pipe.api.customizer.PipeParameterValidator;
 import org.apache.iotdb.pipe.api.customizer.PipeParameters;
 import org.apache.iotdb.pipe.api.customizer.processor.PipeProcessorRuntimeConfiguration;
-import org.apache.iotdb.pipe.api.event.dml.deletion.DeletionEvent;
+import org.apache.iotdb.pipe.api.event.Event;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
 
@@ -46,19 +46,18 @@ public class DoNothingProcessor implements PipeProcessor {
   @Override
   public void process(TabletInsertionEvent tabletInsertionEvent, EventCollector eventCollector)
       throws IOException {
-    eventCollector.collectTabletInsertionEvent(tabletInsertionEvent);
+    eventCollector.collect(tabletInsertionEvent);
   }
 
   @Override
   public void process(TsFileInsertionEvent tsFileInsertionEvent, EventCollector eventCollector)
       throws IOException {
-    eventCollector.collectTsFileInsertionEvent(tsFileInsertionEvent);
+    eventCollector.collect(tsFileInsertionEvent);
   }
 
   @Override
-  public void process(DeletionEvent deletionEvent, EventCollector eventCollector)
-      throws IOException {
-    eventCollector.collectDeletionEvent(deletionEvent);
+  public void process(Event event, EventCollector eventCollector) throws IOException {
+    eventCollector.collect(event);
   }
 
   @Override
diff --git a/pipe-api/src/main/java/org/apache/iotdb/pipe/api/PipeConnector.java b/pipe-api/src/main/java/org/apache/iotdb/pipe/api/PipeConnector.java
index 5502de8bb37..6d74847e763 100644
--- a/pipe-api/src/main/java/org/apache/iotdb/pipe/api/PipeConnector.java
+++ b/pipe-api/src/main/java/org/apache/iotdb/pipe/api/PipeConnector.java
@@ -22,7 +22,7 @@ package org.apache.iotdb.pipe.api;
 import org.apache.iotdb.pipe.api.customizer.PipeParameterValidator;
 import org.apache.iotdb.pipe.api.customizer.PipeParameters;
 import org.apache.iotdb.pipe.api.customizer.connector.PipeConnectorRuntimeConfiguration;
-import org.apache.iotdb.pipe.api.event.dml.deletion.DeletionEvent;
+import org.apache.iotdb.pipe.api.event.Event;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
 import org.apache.iotdb.pipe.api.exception.PipeConnectionException;
@@ -52,7 +52,7 @@ import org.apache.iotdb.pipe.api.exception.PipeConnectionException;
  *             following 3 methods will be called: {@link
  *             PipeConnector#transfer(TabletInsertionEvent)}, {@link
  *             PipeConnector#transfer(TsFileInsertionEvent)} and {@link
- *             PipeConnector#transfer(DeletionEvent)}.
+ *             PipeConnector#transfer(Event)}.
  *       </ul>
  *   <li>When the collaboration task is cancelled (the `DROP PIPE` command is executed), the {@link
  *       PipeConnector#close() } method will be called.
@@ -130,11 +130,11 @@ public interface PipeConnector extends PipePlugin {
   void transfer(TsFileInsertionEvent tsFileInsertionEvent) throws Exception;
 
   /**
-   * This method is used to transfer the DeletionEvent.
+   * This method is used to transfer the Event.
    *
-   * @param deletionEvent DeletionEvent to be transferred
+   * @param event Event to be transferred
    * @throws PipeConnectionException if the connection is broken
    * @throws Exception the user can throw errors if necessary
    */
-  void transfer(DeletionEvent deletionEvent) throws Exception;
+  void transfer(Event event) throws Exception;
 }
diff --git a/pipe-api/src/main/java/org/apache/iotdb/pipe/api/PipeProcessor.java b/pipe-api/src/main/java/org/apache/iotdb/pipe/api/PipeProcessor.java
index 16a5e81ba6c..e94384d5f23 100644
--- a/pipe-api/src/main/java/org/apache/iotdb/pipe/api/PipeProcessor.java
+++ b/pipe-api/src/main/java/org/apache/iotdb/pipe/api/PipeProcessor.java
@@ -23,7 +23,7 @@ import org.apache.iotdb.pipe.api.collector.EventCollector;
 import org.apache.iotdb.pipe.api.customizer.PipeParameterValidator;
 import org.apache.iotdb.pipe.api.customizer.PipeParameters;
 import org.apache.iotdb.pipe.api.customizer.processor.PipeProcessorRuntimeConfiguration;
-import org.apache.iotdb.pipe.api.event.dml.deletion.DeletionEvent;
+import org.apache.iotdb.pipe.api.event.Event;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
 
@@ -48,7 +48,7 @@ import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
  *             following 3 methods will be called: {@link
  *             PipeProcessor#process(TabletInsertionEvent, EventCollector)}, {@link
  *             PipeProcessor#process(TsFileInsertionEvent, EventCollector)} and {@link
- *             PipeProcessor#process(DeletionEvent, EventCollector)}.
+ *             PipeProcessor#process(Event, EventCollector)}.
  *         <li>PipeConnector serializes the events into binaries and send them to sinks.
  *       </ul>
  *   <li>When the collaboration task is cancelled (the `DROP PIPE` command is executed), the {@link
@@ -107,11 +107,11 @@ public interface PipeProcessor extends PipePlugin {
       throws Exception;
 
   /**
-   * This method is called to process the DeletionEvent.
+   * This method is called to process the Event.
    *
-   * @param deletionEvent DeletionEvent to be processed
+   * @param event Event to be processed
    * @param eventCollector used to collect result events after processing
    * @throws Exception the user can throw errors if necessary
    */
-  void process(DeletionEvent deletionEvent, EventCollector eventCollector) throws Exception;
+  void process(Event event, EventCollector eventCollector) throws Exception;
 }
diff --git a/pipe-api/src/main/java/org/apache/iotdb/pipe/api/collector/EventCollector.java b/pipe-api/src/main/java/org/apache/iotdb/pipe/api/collector/EventCollector.java
index 2e53693d65b..a9ef1f0aa62 100644
--- a/pipe-api/src/main/java/org/apache/iotdb/pipe/api/collector/EventCollector.java
+++ b/pipe-api/src/main/java/org/apache/iotdb/pipe/api/collector/EventCollector.java
@@ -19,44 +19,21 @@
 
 package org.apache.iotdb.pipe.api.collector;
 
-import org.apache.iotdb.pipe.api.PipeProcessor;
-import org.apache.iotdb.pipe.api.event.dml.deletion.DeletionEvent;
-import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
-import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
+import org.apache.iotdb.pipe.api.event.Event;
 
 import java.io.IOException;
 
-/**
- * Used to collect events generated by {@link PipeProcessor#process(TabletInsertionEvent,
- * EventCollector)}, {@link PipeProcessor#process(TsFileInsertionEvent, EventCollector)} or {@link
- * PipeProcessor#process(DeletionEvent, EventCollector)}.
- */
+/** Used to collect events in pipe engine. */
 public interface EventCollector {
 
   /**
-   * Collects an insertion event in form of TabletInsertionEvent.
-   *
-   * @param event TabletInsertionEvent to be collected
-   * @throws IOException if any I/O errors occur
-   * @see TabletInsertionEvent
-   */
-  void collectTabletInsertionEvent(TabletInsertionEvent event) throws IOException;
-
-  /**
-   * Collects an insertion event in form of TsFileInsertionEvent.
-   *
-   * @param event TsFileInsertionEvent to be collected
-   * @throws IOException if any I/O errors occur
-   * @see TsFileInsertionEvent
-   */
-  void collectTsFileInsertionEvent(TsFileInsertionEvent event) throws IOException;
-
-  /**
-   * Collects a deletion event.
+   * Collects a Event in pipe engine.
    *
-   * @param event DeletionEvent to be collected
+   * @param event Event to be collected
    * @throws IOException if any I/O errors occur
-   * @see DeletionEvent
+   * @see Event
+   * @see org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent
+   * @see org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent
    */
-  void collectDeletionEvent(DeletionEvent event) throws IOException;
+  void collect(Event event) throws IOException;
 }
diff --git a/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/dml/deletion/DeletionEvent.java b/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/dml/deletion/DeletionEvent.java
deleted file mode 100644
index d1fb966379c..00000000000
--- a/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/dml/deletion/DeletionEvent.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.pipe.api.event.dml.deletion;
-
-import org.apache.iotdb.pipe.api.event.Event;
-import org.apache.iotdb.pipe.api.event.EventType;
-import org.apache.iotdb.tsfile.read.common.Path;
-import org.apache.iotdb.tsfile.read.common.TimeRange;
-
-/** DeletionEvent is used to define the event of deletion. */
-public interface DeletionEvent extends Event {
-
-  /**
-   * The method is used to get the path pattern of the deleted data.
-   *
-   * @return String
-   */
-  Path getPath();
-
-  /**
-   * The method is used to get the time range of the deleted data.
-   *
-   * @return TimeRange
-   */
-  TimeRange getTimeRange();
-
-  @Override
-  default EventType getType() {
-    return EventType.DELETION;
-  }
-}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/impl/iotdb/v1/IoTDBThriftConnectorV1.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/impl/iotdb/v1/IoTDBThriftConnectorV1.java
index 32408769579..6b414c03b36 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/impl/iotdb/v1/IoTDBThriftConnectorV1.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/impl/iotdb/v1/IoTDBThriftConnectorV1.java
@@ -38,7 +38,7 @@ import org.apache.iotdb.pipe.api.PipeConnector;
 import org.apache.iotdb.pipe.api.customizer.PipeParameterValidator;
 import org.apache.iotdb.pipe.api.customizer.PipeParameters;
 import org.apache.iotdb.pipe.api.customizer.connector.PipeConnectorRuntimeConfiguration;
-import org.apache.iotdb.pipe.api.event.dml.deletion.DeletionEvent;
+import org.apache.iotdb.pipe.api.event.Event;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
 import org.apache.iotdb.pipe.api.exception.PipeConnectionException;
@@ -218,8 +218,8 @@ public class IoTDBThriftConnectorV1 implements PipeConnector {
   }
 
   @Override
-  public void transfer(DeletionEvent deletionEvent) throws Exception {
-    throw new NotImplementedException("Not implement for deletion event.");
+  public void transfer(Event event) {
+    LOGGER.warn("IoTDBThriftConnectorV1 does not support transfer generic event: {}.", event);
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/collector/PipeEventCollector.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/collector/PipeEventCollector.java
index a1f443a9d43..c38b7c28a67 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/collector/PipeEventCollector.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/collector/PipeEventCollector.java
@@ -23,9 +23,6 @@ import org.apache.iotdb.db.pipe.core.event.EnrichedEvent;
 import org.apache.iotdb.db.pipe.task.queue.ListenableBlockingPendingQueue;
 import org.apache.iotdb.pipe.api.collector.EventCollector;
 import org.apache.iotdb.pipe.api.event.Event;
-import org.apache.iotdb.pipe.api.event.dml.deletion.DeletionEvent;
-import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
-import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
 
 import java.util.LinkedList;
 import java.util.Queue;
@@ -48,21 +45,7 @@ public class PipeEventCollector implements EventCollector {
   }
 
   @Override
-  public void collectTabletInsertionEvent(TabletInsertionEvent event) {
-    collect(event);
-  }
-
-  @Override
-  public void collectTsFileInsertionEvent(TsFileInsertionEvent event) {
-    collect(event);
-  }
-
-  @Override
-  public void collectDeletionEvent(DeletionEvent event) {
-    collect(event);
-  }
-
-  private synchronized void collect(Event event) {
+  public synchronized void collect(Event event) {
     if (event instanceof EnrichedEvent) {
       ((EnrichedEvent) event).increaseReferenceCount(PipeEventCollector.class.getName());
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtask.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtask.java
index 80f366eb096..8cc0d2f4265 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtask.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtask.java
@@ -23,7 +23,6 @@ import org.apache.iotdb.db.pipe.agent.PipeAgent;
 import org.apache.iotdb.db.pipe.task.queue.ListenableBlockingPendingQueue;
 import org.apache.iotdb.pipe.api.PipeConnector;
 import org.apache.iotdb.pipe.api.event.Event;
-import org.apache.iotdb.pipe.api.event.dml.deletion.DeletionEvent;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
 import org.apache.iotdb.pipe.api.exception.PipeConnectionException;
@@ -76,12 +75,9 @@ public class PipeConnectorSubtask extends PipeSubtask {
         case TSFILE_INSERTION:
           outputPipeConnector.transfer((TsFileInsertionEvent) event);
           break;
-        case DELETION:
-          outputPipeConnector.transfer((DeletionEvent) event);
-          break;
         default:
-          throw new UnsupportedOperationException(
-              "Unsupported event type: " + event.getClass().getName());
+          outputPipeConnector.transfer(event);
+          break;
       }
 
       releaseLastEvent();
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeProcessorSubtask.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeProcessorSubtask.java
index feb584fcaff..dad72977865 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeProcessorSubtask.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeProcessorSubtask.java
@@ -23,7 +23,6 @@ import org.apache.iotdb.db.pipe.task.queue.EventSupplier;
 import org.apache.iotdb.pipe.api.PipeProcessor;
 import org.apache.iotdb.pipe.api.collector.EventCollector;
 import org.apache.iotdb.pipe.api.event.Event;
-import org.apache.iotdb.pipe.api.event.dml.deletion.DeletionEvent;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
 import org.apache.iotdb.pipe.api.exception.PipeException;
@@ -67,12 +66,9 @@ public class PipeProcessorSubtask extends PipeSubtask {
         case TSFILE_INSERTION:
           pipeProcessor.process((TsFileInsertionEvent) event, outputEventCollector);
           break;
-        case DELETION:
-          pipeProcessor.process((DeletionEvent) event, outputEventCollector);
-          break;
         default:
-          throw new UnsupportedOperationException(
-              "Unsupported event type: " + event.getClass().getName());
+          pipeProcessor.process(event, outputEventCollector);
+          break;
       }
 
       releaseLastEvent();