You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2023/05/22 14:50:22 UTC

[iotdb] branch pipe-task-schedule created (now 2b6cdaaebad)

This is an automated email from the ASF dual-hosted git repository.

rong pushed a change to branch pipe-task-schedule
in repository https://gitbox.apache.org/repos/asf/iotdb.git


      at 2b6cdaaebad rename package

This branch includes the following new commits:

     new c76809d6476 remove deletion event and introduce generic event to pipe engine
     new 414f0ffaa27 remove getType() method of Event
     new c3e16a7806f executeForAWhile -> executeOnce
     new 86f0e554bb6 safety start pipe tasks
     new 415a7ce4f0c pipe task scheduler
     new 2b6cdaaebad rename package

The 6 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[iotdb] 02/06: remove getType() method of Event

Posted by ro...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch pipe-task-schedule
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 414f0ffaa2791efed3535517680810dd5f4e7a9d
Author: Steve Yurong Su <ro...@apache.org>
AuthorDate: Mon May 22 11:06:08 2023 +0800

    remove getType() method of Event
---
 .../org/apache/iotdb/pipe/api/event/Event.java     |  6 +--
 .../event/dml/insertion/TabletInsertionEvent.java  |  6 ---
 .../event/dml/insertion/TsFileInsertionEvent.java  |  6 ---
 .../PipeRealtimeDataRegionHybridCollector.java     | 49 +++++++++++-----------
 .../event/realtime/PipeRealtimeCollectEvent.java   |  6 ---
 .../db/pipe/task/subtask/PipeConnectorSubtask.java | 16 +++----
 .../db/pipe/task/subtask/PipeProcessorSubtask.java | 16 +++----
 .../core/collector/PipeRealtimeCollectTest.java    | 20 ++++-----
 8 files changed, 47 insertions(+), 78 deletions(-)

diff --git a/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/Event.java b/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/Event.java
index f5d8d2fbcd3..74ddf9e47d2 100644
--- a/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/Event.java
+++ b/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/Event.java
@@ -20,8 +20,4 @@
 package org.apache.iotdb.pipe.api.event;
 
 /** This interface is used to abstract events in collaboration tasks. */
-public interface Event {
-
-  /** @return the type of the event */
-  EventType getType();
-}
+public interface Event {}
diff --git a/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/dml/insertion/TabletInsertionEvent.java b/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/dml/insertion/TabletInsertionEvent.java
index e353bf397e0..9fd6d89428c 100644
--- a/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/dml/insertion/TabletInsertionEvent.java
+++ b/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/dml/insertion/TabletInsertionEvent.java
@@ -22,7 +22,6 @@ package org.apache.iotdb.pipe.api.event.dml.insertion;
 import org.apache.iotdb.pipe.api.access.Row;
 import org.apache.iotdb.pipe.api.collector.RowCollector;
 import org.apache.iotdb.pipe.api.event.Event;
-import org.apache.iotdb.pipe.api.event.EventType;
 import org.apache.iotdb.tsfile.write.record.Tablet;
 
 import java.util.Iterator;
@@ -54,9 +53,4 @@ public interface TabletInsertionEvent extends Event {
    *     RowCollector
    */
   TabletInsertionEvent processTablet(BiConsumer<Tablet, RowCollector> consumer);
-
-  @Override
-  default EventType getType() {
-    return EventType.TABLET_INSERTION;
-  }
 }
diff --git a/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/dml/insertion/TsFileInsertionEvent.java b/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/dml/insertion/TsFileInsertionEvent.java
index 2d5badc8072..325f0683cb7 100644
--- a/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/dml/insertion/TsFileInsertionEvent.java
+++ b/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/dml/insertion/TsFileInsertionEvent.java
@@ -20,7 +20,6 @@
 package org.apache.iotdb.pipe.api.event.dml.insertion;
 
 import org.apache.iotdb.pipe.api.event.Event;
-import org.apache.iotdb.pipe.api.event.EventType;
 
 /**
  * TsFileInsertionEvent is used to define the event of writing TsFile. Event data stores in disks,
@@ -42,9 +41,4 @@ public interface TsFileInsertionEvent extends Event {
    * @return TsFileInsertionEvent
    */
   TsFileInsertionEvent toTsFileInsertionEvent(Iterable<TabletInsertionEvent> iterable);
-
-  @Override
-  default EventType getType() {
-    return EventType.TSFILE_INSERTION;
-  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionHybridCollector.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionHybridCollector.java
index 67d461e8c8a..898b93cb22d 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionHybridCollector.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionHybridCollector.java
@@ -25,6 +25,8 @@ import org.apache.iotdb.db.pipe.core.event.realtime.PipeRealtimeCollectEvent;
 import org.apache.iotdb.db.pipe.core.event.realtime.TsFileEpoch;
 import org.apache.iotdb.db.pipe.task.queue.ListenableUnblockingPendingQueue;
 import org.apache.iotdb.pipe.api.event.Event;
+import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
+import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
 import org.apache.iotdb.pipe.api.exception.PipeRuntimeNonCriticalException;
 
 import org.slf4j.Logger;
@@ -48,18 +50,17 @@ public class PipeRealtimeDataRegionHybridCollector extends PipeRealtimeDataRegio
 
   @Override
   public void collect(PipeRealtimeCollectEvent event) {
-    switch (event.getEvent().getType()) {
-      case TABLET_INSERTION:
-        collectTabletInsertion(event);
-        break;
-      case TSFILE_INSERTION:
-        collectTsFileInsertion(event);
-        break;
-      default:
-        throw new UnsupportedOperationException(
-            String.format(
-                "Unsupported event type %s for Hybrid Realtime Collector %s",
-                event.getEvent().getType(), this));
+    final Event eventToCollect = event.getEvent();
+
+    if (eventToCollect instanceof TabletInsertionEvent) {
+      collectTabletInsertion(event);
+    } else if (eventToCollect instanceof TsFileInsertionEvent) {
+      collectTsFileInsertion(event);
+    } else {
+      throw new UnsupportedOperationException(
+          String.format(
+              "Unsupported event type %s for Hybrid Realtime Collector %s",
+              eventToCollect.getClass(), this));
     }
   }
 
@@ -107,18 +108,18 @@ public class PipeRealtimeDataRegionHybridCollector extends PipeRealtimeDataRegio
 
     while (collectEvent != null) {
       Event suppliedEvent;
-      switch (collectEvent.getEvent().getType()) {
-        case TABLET_INSERTION:
-          suppliedEvent = supplyTabletInsertion(collectEvent);
-          break;
-        case TSFILE_INSERTION:
-          suppliedEvent = supplyTsFileInsertion(collectEvent);
-          break;
-        default:
-          throw new UnsupportedOperationException(
-              String.format(
-                  "Unsupported event type %s for Hybrid Realtime Collector %s",
-                  collectEvent.getEvent().getType(), this));
+
+      // used to judge type of event, not directly for supplying.
+      final Event eventToSupply = collectEvent.getEvent();
+      if (eventToSupply instanceof TabletInsertionEvent) {
+        suppliedEvent = supplyTabletInsertion(collectEvent);
+      } else if (eventToSupply instanceof TsFileInsertionEvent) {
+        suppliedEvent = supplyTsFileInsertion(collectEvent);
+      } else {
+        throw new UnsupportedOperationException(
+            String.format(
+                "Unsupported event type %s for Hybrid Realtime Collector %s",
+                eventToSupply.getClass(), this));
       }
 
       collectEvent.decreaseReferenceCount(PipeRealtimeDataRegionHybridCollector.class.getName());
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/realtime/PipeRealtimeCollectEvent.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/realtime/PipeRealtimeCollectEvent.java
index 63fcc891361..0c63131e6e9 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/realtime/PipeRealtimeCollectEvent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/realtime/PipeRealtimeCollectEvent.java
@@ -21,7 +21,6 @@ package org.apache.iotdb.db.pipe.core.event.realtime;
 
 import org.apache.iotdb.db.pipe.core.event.EnrichedEvent;
 import org.apache.iotdb.pipe.api.event.Event;
-import org.apache.iotdb.pipe.api.event.EventType;
 
 import java.util.Map;
 
@@ -55,11 +54,6 @@ public class PipeRealtimeCollectEvent implements Event, EnrichedEvent {
     device2Measurements = null;
   }
 
-  @Override
-  public EventType getType() {
-    return event.getType();
-  }
-
   @Override
   public boolean increaseReferenceCount(String holderMessage) {
     return !(event instanceof EnrichedEvent)
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtask.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtask.java
index 8cc0d2f4265..1ec0994ecdc 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtask.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtask.java
@@ -68,16 +68,12 @@ public class PipeConnectorSubtask extends PipeSubtask {
     }
 
     try {
-      switch (event.getType()) {
-        case TABLET_INSERTION:
-          outputPipeConnector.transfer((TabletInsertionEvent) event);
-          break;
-        case TSFILE_INSERTION:
-          outputPipeConnector.transfer((TsFileInsertionEvent) event);
-          break;
-        default:
-          outputPipeConnector.transfer(event);
-          break;
+      if (event instanceof TabletInsertionEvent) {
+        outputPipeConnector.transfer((TabletInsertionEvent) event);
+      } else if (event instanceof TsFileInsertionEvent) {
+        outputPipeConnector.transfer((TsFileInsertionEvent) event);
+      } else {
+        outputPipeConnector.transfer(event);
       }
 
       releaseLastEvent();
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeProcessorSubtask.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeProcessorSubtask.java
index dad72977865..0e65894c14d 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeProcessorSubtask.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeProcessorSubtask.java
@@ -59,16 +59,12 @@ public class PipeProcessorSubtask extends PipeSubtask {
     }
 
     try {
-      switch (event.getType()) {
-        case TABLET_INSERTION:
-          pipeProcessor.process((TabletInsertionEvent) event, outputEventCollector);
-          break;
-        case TSFILE_INSERTION:
-          pipeProcessor.process((TsFileInsertionEvent) event, outputEventCollector);
-          break;
-        default:
-          pipeProcessor.process(event, outputEventCollector);
-          break;
+      if (event instanceof TabletInsertionEvent) {
+        pipeProcessor.process((TabletInsertionEvent) event, outputEventCollector);
+      } else if (event instanceof TsFileInsertionEvent) {
+        pipeProcessor.process((TsFileInsertionEvent) event, outputEventCollector);
+      } else {
+        pipeProcessor.process(event, outputEventCollector);
       }
 
       releaseLastEvent();
diff --git a/server/src/test/java/org/apache/iotdb/db/pipe/core/collector/PipeRealtimeCollectTest.java b/server/src/test/java/org/apache/iotdb/db/pipe/core/collector/PipeRealtimeCollectTest.java
index a2c3508b294..7cd705af588 100644
--- a/server/src/test/java/org/apache/iotdb/db/pipe/core/collector/PipeRealtimeCollectTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/pipe/core/collector/PipeRealtimeCollectTest.java
@@ -32,7 +32,7 @@ import org.apache.iotdb.db.pipe.core.collector.realtime.listener.PipeInsertionDa
 import org.apache.iotdb.db.pipe.task.queue.ListenableUnblockingPendingQueue;
 import org.apache.iotdb.pipe.api.customizer.PipeParameters;
 import org.apache.iotdb.pipe.api.event.Event;
-import org.apache.iotdb.pipe.api.event.EventType;
+import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
 import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
 
 import org.junit.After;
@@ -163,9 +163,9 @@ public class PipeRealtimeCollectTest {
           Arrays.asList(
               listen(
                   collectors[0],
-                  type -> type.equals(EventType.TABLET_INSERTION) ? 1 : 2,
+                  event -> event instanceof TabletInsertionEvent ? 1 : 2,
                   writeNum << 1),
-              listen(collectors[1], typ2 -> 1, writeNum));
+              listen(collectors[1], event -> 1, writeNum));
 
       try {
         listenFutures.get(0).get(10, TimeUnit.MINUTES);
@@ -199,14 +199,14 @@ public class PipeRealtimeCollectTest {
           Arrays.asList(
               listen(
                   collectors[0],
-                  type -> type.equals(EventType.TABLET_INSERTION) ? 1 : 2,
+                  event -> event instanceof TabletInsertionEvent ? 1 : 2,
                   writeNum << 1),
-              listen(collectors[1], typ2 -> 1, writeNum),
+              listen(collectors[1], event -> 1, writeNum),
               listen(
                   collectors[2],
-                  type -> type.equals(EventType.TABLET_INSERTION) ? 1 : 2,
+                  event -> event instanceof TabletInsertionEvent ? 1 : 2,
                   writeNum << 1),
-              listen(collectors[3], typ2 -> 1, writeNum));
+              listen(collectors[3], event -> 1, writeNum));
       try {
         listenFutures.get(0).get(10, TimeUnit.MINUTES);
         listenFutures.get(1).get(10, TimeUnit.MINUTES);
@@ -278,9 +278,7 @@ public class PipeRealtimeCollectTest {
   }
 
   private Future<?> listen(
-      PipeRealtimeDataRegionCollector collector,
-      Function<EventType, Integer> weight,
-      int expectNum) {
+      PipeRealtimeDataRegionCollector collector, Function<Event, Integer> weight, int expectNum) {
     return listenerService.submit(
         () -> {
           int eventNum = 0;
@@ -293,7 +291,7 @@ public class PipeRealtimeCollectTest {
                 throw new RuntimeException(e);
               }
               if (event != null) {
-                eventNum += weight.apply(event.getType());
+                eventNum += weight.apply(event);
               }
             }
           } finally {


[iotdb] 05/06: pipe task scheduler

Posted by ro...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch pipe-task-schedule
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 415a7ce4f0c030f6cd0a4a476ba27e629b7747e1
Author: Steve Yurong Su <ro...@apache.org>
AuthorDate: Mon May 22 22:39:02 2023 +0800

    pipe task scheduler
---
 .../execution/executor/PipeSubtaskExecutor.java    | 24 ++++--
 .../execution/scheduler/PipeSubtaskScheduler.java  | 89 ++++++++++++++++++++++
 .../execution/scheduler/PipeTaskScheduler.java     | 74 ------------------
 .../iotdb/db/pipe/task/subtask/PipeSubtask.java    | 26 ++++++-
 .../executor/PipeConnectorSubtaskExecutorTest.java |  3 +-
 .../executor/PipeProcessorSubtaskExecutorTest.java |  3 +-
 .../executor/PipeSubtaskExecutorTest.java          |  3 +-
 7 files changed, 133 insertions(+), 89 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/execution/executor/PipeSubtaskExecutor.java b/server/src/main/java/org/apache/iotdb/db/pipe/execution/executor/PipeSubtaskExecutor.java
index 762561546b3..d1befd3e08a 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/execution/executor/PipeSubtaskExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/execution/executor/PipeSubtaskExecutor.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.pipe.execution.executor;
 import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.commons.concurrent.ThreadName;
 import org.apache.iotdb.commons.utils.TestOnly;
+import org.apache.iotdb.db.pipe.execution.scheduler.PipeSubtaskScheduler;
 import org.apache.iotdb.db.pipe.task.subtask.PipeSubtask;
 
 import com.google.common.util.concurrent.ListeningExecutorService;
@@ -44,7 +45,8 @@ public abstract class PipeSubtaskExecutor {
 
   private final Map<String, PipeSubtask> registeredIdSubtaskMapper;
 
-  private int corePoolSize;
+  private final int corePoolSize;
+  private int runningSubtaskNumber;
 
   protected PipeSubtaskExecutor(int corePoolSize, ThreadName threadName) {
     subtaskWorkerThreadPoolExecutor =
@@ -54,6 +56,7 @@ public abstract class PipeSubtaskExecutor {
     registeredIdSubtaskMapper = new ConcurrentHashMap<>();
 
     this.corePoolSize = corePoolSize;
+    runningSubtaskNumber = 0;
   }
 
   /////////////////////// subtask management ///////////////////////
@@ -65,7 +68,10 @@ public abstract class PipeSubtaskExecutor {
     }
 
     registeredIdSubtaskMapper.put(subtask.getTaskID(), subtask);
-    subtask.bindExecutors(subtaskWorkerThreadPoolExecutor, subtaskCallbackListeningExecutor);
+    subtask.bindExecutors(
+        subtaskWorkerThreadPoolExecutor,
+        subtaskCallbackListeningExecutor,
+        new PipeSubtaskScheduler(this));
   }
 
   public final synchronized void start(String subTaskID) {
@@ -82,6 +88,7 @@ public abstract class PipeSubtaskExecutor {
     } else {
       subtask.allowSubmittingSelf();
       subtask.submitSelf();
+      ++runningSubtaskNumber;
       LOGGER.info("The subtask {} is started to submit self.", subTaskID);
     }
   }
@@ -92,7 +99,9 @@ public abstract class PipeSubtaskExecutor {
       return;
     }
 
-    registeredIdSubtaskMapper.get(subTaskID).disallowSubmittingSelf();
+    if (registeredIdSubtaskMapper.get(subTaskID).disallowSubmittingSelf()) {
+      --runningSubtaskNumber;
+    }
   }
 
   public final synchronized void deregister(String subTaskID) {
@@ -138,12 +147,11 @@ public abstract class PipeSubtaskExecutor {
     return subtaskWorkerThreadPoolExecutor.isShutdown();
   }
 
-  public final void adjustExecutorThreadNumber(int threadNum) {
-    corePoolSize = threadNum;
-    throw new UnsupportedOperationException("Not implemented yet.");
+  public final int getCorePoolSize() {
+    return corePoolSize;
   }
 
-  public final int getExecutorThreadNumber() {
-    return corePoolSize;
+  public final int getRunningSubtaskNumber() {
+    return runningSubtaskNumber;
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/execution/scheduler/PipeSubtaskScheduler.java b/server/src/main/java/org/apache/iotdb/db/pipe/execution/scheduler/PipeSubtaskScheduler.java
new file mode 100644
index 00000000000..dada354e743
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/execution/scheduler/PipeSubtaskScheduler.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.execution.scheduler;
+
+import org.apache.iotdb.db.pipe.execution.executor.PipeSubtaskExecutor;
+
+public class PipeSubtaskScheduler {
+
+  private final PipeSubtaskExecutor executor;
+
+  private boolean isFirstSchedule = true;
+
+  // TODO: make these two configurable
+
+  private static final int BASIC_CHECKPOINT_INTERVAL_BY_CONSUMED_EVENT_COUNT = 10_000;
+  private int consumedEventCountCheckpointInterval;
+  private int consumedEventCount;
+
+  // in ms
+  private static final long BASIC_CHECKPOINT_INTERVAL_BY_TIME_DURATION = 10 * 1000L;
+  private long timeDurationCheckpointInterval;
+  private long lastCheckTime;
+
+  public PipeSubtaskScheduler(PipeSubtaskExecutor executor) {
+    this.executor = executor;
+  }
+
+  public boolean schedule() {
+    if (isFirstSchedule) {
+      isFirstSchedule = false;
+
+      adjustCheckpointIntervalBasedOnExecutorStatus();
+
+      ++consumedEventCount;
+      return true;
+    }
+
+    if (consumedEventCount < consumedEventCountCheckpointInterval
+        && System.currentTimeMillis() - lastCheckTime < timeDurationCheckpointInterval) {
+      ++consumedEventCount;
+      return true;
+    }
+
+    return false;
+  }
+
+  private void adjustCheckpointIntervalBasedOnExecutorStatus() {
+    // 1. reset consumedEventCount and lastCheckTime
+    consumedEventCount = 0;
+    lastCheckTime = System.currentTimeMillis();
+
+    // 2. adjust checkpoint interval
+    final int corePoolSize = Math.max(1, executor.getCorePoolSize());
+    final int runningSubtaskNumber = Math.max(1, executor.getRunningSubtaskNumber());
+    consumedEventCountCheckpointInterval =
+        Math.max(
+            1,
+            (int)
+                (((float) BASIC_CHECKPOINT_INTERVAL_BY_CONSUMED_EVENT_COUNT / runningSubtaskNumber)
+                    * corePoolSize));
+    timeDurationCheckpointInterval =
+        Math.max(
+            1,
+            (long)
+                (((float) BASIC_CHECKPOINT_INTERVAL_BY_TIME_DURATION / runningSubtaskNumber)
+                    * corePoolSize));
+  }
+
+  public void reset() {
+    isFirstSchedule = true;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/execution/scheduler/PipeTaskScheduler.java b/server/src/main/java/org/apache/iotdb/db/pipe/execution/scheduler/PipeTaskScheduler.java
deleted file mode 100644
index 4f035ca6715..00000000000
--- a/server/src/main/java/org/apache/iotdb/db/pipe/execution/scheduler/PipeTaskScheduler.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.db.pipe.execution.scheduler;
-
-import org.apache.iotdb.db.pipe.execution.executor.PipeSubtaskExecutorManager;
-
-/**
- * PipeTaskScheduler is a singleton class that manages the numbers of threads used by
- * PipeTaskExecutors dynamically.
- */
-public class PipeTaskScheduler {
-
-  private final PipeSubtaskExecutorManager pipeSubtaskExecutorManager =
-      PipeSubtaskExecutorManager.getInstance();
-
-  public void adjustAssignerSubtaskExecutorThreadNum(int threadNum) {
-    // TODO: make it configurable by setting different parameters
-    pipeSubtaskExecutorManager.getAssignerSubtaskExecutor().adjustExecutorThreadNumber(threadNum);
-  }
-
-  public int getAssignerSubtaskExecutorThreadNum() {
-    return pipeSubtaskExecutorManager.getAssignerSubtaskExecutor().getExecutorThreadNumber();
-  }
-
-  public void adjustConnectorSubtaskExecutorThreadNum(int threadNum) {
-    // TODO: make it configurable by setting different parameters
-    pipeSubtaskExecutorManager.getConnectorSubtaskExecutor().adjustExecutorThreadNumber(threadNum);
-  }
-
-  public int getConnectorSubtaskExecutorThreadNum() {
-    return pipeSubtaskExecutorManager.getConnectorSubtaskExecutor().getExecutorThreadNumber();
-  }
-
-  public void adjustProcessorSubtaskExecutorThreadNum(int threadNum) {
-    // TODO: make it configurable by setting different parameters
-    pipeSubtaskExecutorManager.getProcessorSubtaskExecutor().adjustExecutorThreadNumber(threadNum);
-  }
-
-  public int getProcessorSubtaskExecutorThreadNum() {
-    return pipeSubtaskExecutorManager.getProcessorSubtaskExecutor().getExecutorThreadNumber();
-  }
-
-  /////////////////////////  Singleton Instance Holder  /////////////////////////
-
-  private PipeTaskScheduler() {}
-
-  private static class PipeTaskSchedulerHolder {
-    private static PipeTaskScheduler instance = null;
-  }
-
-  public static PipeTaskScheduler setupAndGetInstance() {
-    if (PipeTaskSchedulerHolder.instance == null) {
-      PipeTaskSchedulerHolder.instance = new PipeTaskScheduler();
-    }
-    return PipeTaskSchedulerHolder.instance;
-  }
-}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeSubtask.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeSubtask.java
index bed57c90e88..5ddff7b2f29 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeSubtask.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeSubtask.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.pipe.task.subtask;
 
 import org.apache.iotdb.db.pipe.agent.PipeAgent;
 import org.apache.iotdb.db.pipe.core.event.EnrichedEvent;
+import org.apache.iotdb.db.pipe.execution.scheduler.PipeSubtaskScheduler;
 import org.apache.iotdb.pipe.api.event.Event;
 
 import com.google.common.util.concurrent.FutureCallback;
@@ -47,6 +48,8 @@ public abstract class PipeSubtask implements FutureCallback<Void>, Callable<Void
   private final DecoratingLock callbackDecoratingLock = new DecoratingLock();
   private final AtomicBoolean shouldStopSubmittingSelf = new AtomicBoolean(true);
 
+  private PipeSubtaskScheduler subtaskScheduler;
+
   protected static final int MAX_RETRY_TIMES = 5;
   private final AtomicInteger retryCount = new AtomicInteger(0);
   protected Throwable lastFailedCause;
@@ -60,14 +63,25 @@ public abstract class PipeSubtask implements FutureCallback<Void>, Callable<Void
 
   public void bindExecutors(
       ListeningExecutorService subtaskWorkerThreadPoolExecutor,
-      ExecutorService subtaskCallbackListeningExecutor) {
+      ExecutorService subtaskCallbackListeningExecutor,
+      PipeSubtaskScheduler subtaskScheduler) {
     this.subtaskWorkerThreadPoolExecutor = subtaskWorkerThreadPoolExecutor;
     this.subtaskCallbackListeningExecutor = subtaskCallbackListeningExecutor;
+    this.subtaskScheduler = subtaskScheduler;
   }
 
   @Override
   public Void call() throws Exception {
-    executeOnce();
+    // if the scheduler allows to schedule, then try to consume an event
+    while (subtaskScheduler.schedule()) {
+      // if the event is consumed successfully, then continue to consume the next event
+      // otherwise, stop consuming
+      if (!executeOnce()) {
+        break;
+      }
+    }
+    // reset the scheduler to make sure that the scheduler can schedule again
+    subtaskScheduler.reset();
 
     // wait for the callable to be decorated by Futures.addCallback in the executorService
     // to make sure that the callback can be submitted again on success or failure.
@@ -131,8 +145,12 @@ public abstract class PipeSubtask implements FutureCallback<Void>, Callable<Void
     shouldStopSubmittingSelf.set(false);
   }
 
-  public void disallowSubmittingSelf() {
-    shouldStopSubmittingSelf.set(true);
+  /**
+   * @return true if the shouldStopSubmittingSelf state is changed from false to true, false
+   *     otherwise
+   */
+  public boolean disallowSubmittingSelf() {
+    return !shouldStopSubmittingSelf.getAndSet(true);
   }
 
   public boolean isSubmittingSelf() {
diff --git a/server/src/test/java/org/apache/iotdb/db/pipe/execution/executor/PipeConnectorSubtaskExecutorTest.java b/server/src/test/java/org/apache/iotdb/db/pipe/executor/PipeConnectorSubtaskExecutorTest.java
similarity index 92%
rename from server/src/test/java/org/apache/iotdb/db/pipe/execution/executor/PipeConnectorSubtaskExecutorTest.java
rename to server/src/test/java/org/apache/iotdb/db/pipe/executor/PipeConnectorSubtaskExecutorTest.java
index 52acbe9c2ff..4c572438955 100644
--- a/server/src/test/java/org/apache/iotdb/db/pipe/execution/executor/PipeConnectorSubtaskExecutorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/pipe/executor/PipeConnectorSubtaskExecutorTest.java
@@ -17,8 +17,9 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.execution.executor;
+package org.apache.iotdb.db.pipe.executor;
 
+import org.apache.iotdb.db.pipe.execution.executor.PipeConnectorSubtaskExecutor;
 import org.apache.iotdb.db.pipe.task.queue.ListenableBlockingPendingQueue;
 import org.apache.iotdb.db.pipe.task.subtask.PipeConnectorSubtask;
 import org.apache.iotdb.pipe.api.PipeConnector;
diff --git a/server/src/test/java/org/apache/iotdb/db/pipe/execution/executor/PipeProcessorSubtaskExecutorTest.java b/server/src/test/java/org/apache/iotdb/db/pipe/executor/PipeProcessorSubtaskExecutorTest.java
similarity index 92%
rename from server/src/test/java/org/apache/iotdb/db/pipe/execution/executor/PipeProcessorSubtaskExecutorTest.java
rename to server/src/test/java/org/apache/iotdb/db/pipe/executor/PipeProcessorSubtaskExecutorTest.java
index d0a5208d537..a8a8659f7fa 100644
--- a/server/src/test/java/org/apache/iotdb/db/pipe/execution/executor/PipeProcessorSubtaskExecutorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/pipe/executor/PipeProcessorSubtaskExecutorTest.java
@@ -17,8 +17,9 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.execution.executor;
+package org.apache.iotdb.db.pipe.executor;
 
+import org.apache.iotdb.db.pipe.execution.executor.PipeProcessorSubtaskExecutor;
 import org.apache.iotdb.db.pipe.task.queue.EventSupplier;
 import org.apache.iotdb.db.pipe.task.subtask.PipeProcessorSubtask;
 import org.apache.iotdb.pipe.api.PipeProcessor;
diff --git a/server/src/test/java/org/apache/iotdb/db/pipe/execution/executor/PipeSubtaskExecutorTest.java b/server/src/test/java/org/apache/iotdb/db/pipe/executor/PipeSubtaskExecutorTest.java
similarity index 97%
rename from server/src/test/java/org/apache/iotdb/db/pipe/execution/executor/PipeSubtaskExecutorTest.java
rename to server/src/test/java/org/apache/iotdb/db/pipe/executor/PipeSubtaskExecutorTest.java
index d24efc3fb7f..f70d6874211 100644
--- a/server/src/test/java/org/apache/iotdb/db/pipe/execution/executor/PipeSubtaskExecutorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/pipe/executor/PipeSubtaskExecutorTest.java
@@ -17,8 +17,9 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.execution.executor;
+package org.apache.iotdb.db.pipe.executor;
 
+import org.apache.iotdb.db.pipe.execution.executor.PipeSubtaskExecutor;
 import org.apache.iotdb.db.pipe.task.subtask.PipeSubtask;
 
 import org.junit.After;


[iotdb] 01/06: remove deletion event and introduce generic event to pipe engine

Posted by ro...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch pipe-task-schedule
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit c76809d64763f5dc91d41aedd21bb88279c210b9
Author: Steve Yurong Su <ro...@apache.org>
AuthorDate: Mon May 22 10:44:29 2023 +0800

    remove deletion event and introduce generic event to pipe engine
---
 .../builtin/connector/DoNothingConnector.java      |  4 +-
 .../builtin/connector/IoTDBThriftConnector.java    |  4 +-
 .../builtin/processor/DoNothingProcessor.java      | 11 +++--
 .../org/apache/iotdb/pipe/api/PipeConnector.java   | 10 ++---
 .../org/apache/iotdb/pipe/api/PipeProcessor.java   | 10 ++---
 .../iotdb/pipe/api/collector/EventCollector.java   | 39 ++++--------------
 .../pipe/api/event/dml/deletion/DeletionEvent.java | 48 ----------------------
 .../impl/iotdb/v1/IoTDBThriftConnectorV1.java      |  6 +--
 .../event/view/collector/PipeEventCollector.java   | 19 +--------
 .../db/pipe/task/subtask/PipeConnectorSubtask.java |  8 +---
 .../db/pipe/task/subtask/PipeProcessorSubtask.java |  8 +---
 11 files changed, 35 insertions(+), 132 deletions(-)

diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/DoNothingConnector.java b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/DoNothingConnector.java
index 00f3b80424a..2522fdc66f6 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/DoNothingConnector.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/DoNothingConnector.java
@@ -23,7 +23,7 @@ import org.apache.iotdb.pipe.api.PipeConnector;
 import org.apache.iotdb.pipe.api.customizer.PipeParameterValidator;
 import org.apache.iotdb.pipe.api.customizer.PipeParameters;
 import org.apache.iotdb.pipe.api.customizer.connector.PipeConnectorRuntimeConfiguration;
-import org.apache.iotdb.pipe.api.event.dml.deletion.DeletionEvent;
+import org.apache.iotdb.pipe.api.event.Event;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
 
@@ -61,7 +61,7 @@ public class DoNothingConnector implements PipeConnector {
   }
 
   @Override
-  public void transfer(DeletionEvent deletionEvent) {
+  public void transfer(Event event) {
     // do nothing
   }
 
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/IoTDBThriftConnector.java b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/IoTDBThriftConnector.java
index e252e5be726..82ddd05ba3c 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/IoTDBThriftConnector.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/IoTDBThriftConnector.java
@@ -23,7 +23,7 @@ import org.apache.iotdb.pipe.api.PipeConnector;
 import org.apache.iotdb.pipe.api.customizer.PipeParameterValidator;
 import org.apache.iotdb.pipe.api.customizer.PipeParameters;
 import org.apache.iotdb.pipe.api.customizer.connector.PipeConnectorRuntimeConfiguration;
-import org.apache.iotdb.pipe.api.event.dml.deletion.DeletionEvent;
+import org.apache.iotdb.pipe.api.event.Event;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
 
@@ -67,7 +67,7 @@ public class IoTDBThriftConnector implements PipeConnector {
   }
 
   @Override
-  public void transfer(DeletionEvent deletionEvent) {
+  public void transfer(Event event) {
     throw new UnsupportedOperationException("This class is a placeholder and should not be used.");
   }
 
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/processor/DoNothingProcessor.java b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/processor/DoNothingProcessor.java
index 6a18f9a64e8..bc56a8bb3cf 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/processor/DoNothingProcessor.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/processor/DoNothingProcessor.java
@@ -24,7 +24,7 @@ import org.apache.iotdb.pipe.api.collector.EventCollector;
 import org.apache.iotdb.pipe.api.customizer.PipeParameterValidator;
 import org.apache.iotdb.pipe.api.customizer.PipeParameters;
 import org.apache.iotdb.pipe.api.customizer.processor.PipeProcessorRuntimeConfiguration;
-import org.apache.iotdb.pipe.api.event.dml.deletion.DeletionEvent;
+import org.apache.iotdb.pipe.api.event.Event;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
 
@@ -46,19 +46,18 @@ public class DoNothingProcessor implements PipeProcessor {
   @Override
   public void process(TabletInsertionEvent tabletInsertionEvent, EventCollector eventCollector)
       throws IOException {
-    eventCollector.collectTabletInsertionEvent(tabletInsertionEvent);
+    eventCollector.collect(tabletInsertionEvent);
   }
 
   @Override
   public void process(TsFileInsertionEvent tsFileInsertionEvent, EventCollector eventCollector)
       throws IOException {
-    eventCollector.collectTsFileInsertionEvent(tsFileInsertionEvent);
+    eventCollector.collect(tsFileInsertionEvent);
   }
 
   @Override
-  public void process(DeletionEvent deletionEvent, EventCollector eventCollector)
-      throws IOException {
-    eventCollector.collectDeletionEvent(deletionEvent);
+  public void process(Event event, EventCollector eventCollector) throws IOException {
+    eventCollector.collect(event);
   }
 
   @Override
diff --git a/pipe-api/src/main/java/org/apache/iotdb/pipe/api/PipeConnector.java b/pipe-api/src/main/java/org/apache/iotdb/pipe/api/PipeConnector.java
index 5502de8bb37..6d74847e763 100644
--- a/pipe-api/src/main/java/org/apache/iotdb/pipe/api/PipeConnector.java
+++ b/pipe-api/src/main/java/org/apache/iotdb/pipe/api/PipeConnector.java
@@ -22,7 +22,7 @@ package org.apache.iotdb.pipe.api;
 import org.apache.iotdb.pipe.api.customizer.PipeParameterValidator;
 import org.apache.iotdb.pipe.api.customizer.PipeParameters;
 import org.apache.iotdb.pipe.api.customizer.connector.PipeConnectorRuntimeConfiguration;
-import org.apache.iotdb.pipe.api.event.dml.deletion.DeletionEvent;
+import org.apache.iotdb.pipe.api.event.Event;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
 import org.apache.iotdb.pipe.api.exception.PipeConnectionException;
@@ -52,7 +52,7 @@ import org.apache.iotdb.pipe.api.exception.PipeConnectionException;
  *             following 3 methods will be called: {@link
  *             PipeConnector#transfer(TabletInsertionEvent)}, {@link
  *             PipeConnector#transfer(TsFileInsertionEvent)} and {@link
- *             PipeConnector#transfer(DeletionEvent)}.
+ *             PipeConnector#transfer(Event)}.
  *       </ul>
  *   <li>When the collaboration task is cancelled (the `DROP PIPE` command is executed), the {@link
  *       PipeConnector#close() } method will be called.
@@ -130,11 +130,11 @@ public interface PipeConnector extends PipePlugin {
   void transfer(TsFileInsertionEvent tsFileInsertionEvent) throws Exception;
 
   /**
-   * This method is used to transfer the DeletionEvent.
+   * This method is used to transfer the Event.
    *
-   * @param deletionEvent DeletionEvent to be transferred
+   * @param event Event to be transferred
    * @throws PipeConnectionException if the connection is broken
    * @throws Exception the user can throw errors if necessary
    */
-  void transfer(DeletionEvent deletionEvent) throws Exception;
+  void transfer(Event event) throws Exception;
 }
diff --git a/pipe-api/src/main/java/org/apache/iotdb/pipe/api/PipeProcessor.java b/pipe-api/src/main/java/org/apache/iotdb/pipe/api/PipeProcessor.java
index 16a5e81ba6c..e94384d5f23 100644
--- a/pipe-api/src/main/java/org/apache/iotdb/pipe/api/PipeProcessor.java
+++ b/pipe-api/src/main/java/org/apache/iotdb/pipe/api/PipeProcessor.java
@@ -23,7 +23,7 @@ import org.apache.iotdb.pipe.api.collector.EventCollector;
 import org.apache.iotdb.pipe.api.customizer.PipeParameterValidator;
 import org.apache.iotdb.pipe.api.customizer.PipeParameters;
 import org.apache.iotdb.pipe.api.customizer.processor.PipeProcessorRuntimeConfiguration;
-import org.apache.iotdb.pipe.api.event.dml.deletion.DeletionEvent;
+import org.apache.iotdb.pipe.api.event.Event;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
 
@@ -48,7 +48,7 @@ import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
  *             following 3 methods will be called: {@link
  *             PipeProcessor#process(TabletInsertionEvent, EventCollector)}, {@link
  *             PipeProcessor#process(TsFileInsertionEvent, EventCollector)} and {@link
- *             PipeProcessor#process(DeletionEvent, EventCollector)}.
+ *             PipeProcessor#process(Event, EventCollector)}.
  *         <li>PipeConnector serializes the events into binaries and send them to sinks.
  *       </ul>
  *   <li>When the collaboration task is cancelled (the `DROP PIPE` command is executed), the {@link
@@ -107,11 +107,11 @@ public interface PipeProcessor extends PipePlugin {
       throws Exception;
 
   /**
-   * This method is called to process the DeletionEvent.
+   * This method is called to process the Event.
    *
-   * @param deletionEvent DeletionEvent to be processed
+   * @param event Event to be processed
    * @param eventCollector used to collect result events after processing
    * @throws Exception the user can throw errors if necessary
    */
-  void process(DeletionEvent deletionEvent, EventCollector eventCollector) throws Exception;
+  void process(Event event, EventCollector eventCollector) throws Exception;
 }
diff --git a/pipe-api/src/main/java/org/apache/iotdb/pipe/api/collector/EventCollector.java b/pipe-api/src/main/java/org/apache/iotdb/pipe/api/collector/EventCollector.java
index 2e53693d65b..a9ef1f0aa62 100644
--- a/pipe-api/src/main/java/org/apache/iotdb/pipe/api/collector/EventCollector.java
+++ b/pipe-api/src/main/java/org/apache/iotdb/pipe/api/collector/EventCollector.java
@@ -19,44 +19,21 @@
 
 package org.apache.iotdb.pipe.api.collector;
 
-import org.apache.iotdb.pipe.api.PipeProcessor;
-import org.apache.iotdb.pipe.api.event.dml.deletion.DeletionEvent;
-import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
-import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
+import org.apache.iotdb.pipe.api.event.Event;
 
 import java.io.IOException;
 
-/**
- * Used to collect events generated by {@link PipeProcessor#process(TabletInsertionEvent,
- * EventCollector)}, {@link PipeProcessor#process(TsFileInsertionEvent, EventCollector)} or {@link
- * PipeProcessor#process(DeletionEvent, EventCollector)}.
- */
+/** Used to collect events in pipe engine. */
 public interface EventCollector {
 
   /**
-   * Collects an insertion event in form of TabletInsertionEvent.
-   *
-   * @param event TabletInsertionEvent to be collected
-   * @throws IOException if any I/O errors occur
-   * @see TabletInsertionEvent
-   */
-  void collectTabletInsertionEvent(TabletInsertionEvent event) throws IOException;
-
-  /**
-   * Collects an insertion event in form of TsFileInsertionEvent.
-   *
-   * @param event TsFileInsertionEvent to be collected
-   * @throws IOException if any I/O errors occur
-   * @see TsFileInsertionEvent
-   */
-  void collectTsFileInsertionEvent(TsFileInsertionEvent event) throws IOException;
-
-  /**
-   * Collects a deletion event.
+   * Collects an Event in the pipe engine.
    *
-   * @param event DeletionEvent to be collected
+   * @param event Event to be collected
    * @throws IOException if any I/O errors occur
-   * @see DeletionEvent
+   * @see Event
+   * @see org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent
+   * @see org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent
    */
-  void collectDeletionEvent(DeletionEvent event) throws IOException;
+  void collect(Event event) throws IOException;
 }
diff --git a/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/dml/deletion/DeletionEvent.java b/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/dml/deletion/DeletionEvent.java
deleted file mode 100644
index d1fb966379c..00000000000
--- a/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/dml/deletion/DeletionEvent.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.pipe.api.event.dml.deletion;
-
-import org.apache.iotdb.pipe.api.event.Event;
-import org.apache.iotdb.pipe.api.event.EventType;
-import org.apache.iotdb.tsfile.read.common.Path;
-import org.apache.iotdb.tsfile.read.common.TimeRange;
-
-/** DeletionEvent is used to define the event of deletion. */
-public interface DeletionEvent extends Event {
-
-  /**
-   * The method is used to get the path pattern of the deleted data.
-   *
-   * @return String
-   */
-  Path getPath();
-
-  /**
-   * The method is used to get the time range of the deleted data.
-   *
-   * @return TimeRange
-   */
-  TimeRange getTimeRange();
-
-  @Override
-  default EventType getType() {
-    return EventType.DELETION;
-  }
-}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/impl/iotdb/v1/IoTDBThriftConnectorV1.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/impl/iotdb/v1/IoTDBThriftConnectorV1.java
index 32408769579..6b414c03b36 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/impl/iotdb/v1/IoTDBThriftConnectorV1.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/impl/iotdb/v1/IoTDBThriftConnectorV1.java
@@ -38,7 +38,7 @@ import org.apache.iotdb.pipe.api.PipeConnector;
 import org.apache.iotdb.pipe.api.customizer.PipeParameterValidator;
 import org.apache.iotdb.pipe.api.customizer.PipeParameters;
 import org.apache.iotdb.pipe.api.customizer.connector.PipeConnectorRuntimeConfiguration;
-import org.apache.iotdb.pipe.api.event.dml.deletion.DeletionEvent;
+import org.apache.iotdb.pipe.api.event.Event;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
 import org.apache.iotdb.pipe.api.exception.PipeConnectionException;
@@ -218,8 +218,8 @@ public class IoTDBThriftConnectorV1 implements PipeConnector {
   }
 
   @Override
-  public void transfer(DeletionEvent deletionEvent) throws Exception {
-    throw new NotImplementedException("Not implement for deletion event.");
+  public void transfer(Event event) {
+    LOGGER.warn("IoTDBThriftConnectorV1 does not support transfer generic event: {}.", event);
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/collector/PipeEventCollector.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/collector/PipeEventCollector.java
index a1f443a9d43..c38b7c28a67 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/collector/PipeEventCollector.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/collector/PipeEventCollector.java
@@ -23,9 +23,6 @@ import org.apache.iotdb.db.pipe.core.event.EnrichedEvent;
 import org.apache.iotdb.db.pipe.task.queue.ListenableBlockingPendingQueue;
 import org.apache.iotdb.pipe.api.collector.EventCollector;
 import org.apache.iotdb.pipe.api.event.Event;
-import org.apache.iotdb.pipe.api.event.dml.deletion.DeletionEvent;
-import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
-import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
 
 import java.util.LinkedList;
 import java.util.Queue;
@@ -48,21 +45,7 @@ public class PipeEventCollector implements EventCollector {
   }
 
   @Override
-  public void collectTabletInsertionEvent(TabletInsertionEvent event) {
-    collect(event);
-  }
-
-  @Override
-  public void collectTsFileInsertionEvent(TsFileInsertionEvent event) {
-    collect(event);
-  }
-
-  @Override
-  public void collectDeletionEvent(DeletionEvent event) {
-    collect(event);
-  }
-
-  private synchronized void collect(Event event) {
+  public synchronized void collect(Event event) {
     if (event instanceof EnrichedEvent) {
       ((EnrichedEvent) event).increaseReferenceCount(PipeEventCollector.class.getName());
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtask.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtask.java
index 80f366eb096..8cc0d2f4265 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtask.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtask.java
@@ -23,7 +23,6 @@ import org.apache.iotdb.db.pipe.agent.PipeAgent;
 import org.apache.iotdb.db.pipe.task.queue.ListenableBlockingPendingQueue;
 import org.apache.iotdb.pipe.api.PipeConnector;
 import org.apache.iotdb.pipe.api.event.Event;
-import org.apache.iotdb.pipe.api.event.dml.deletion.DeletionEvent;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
 import org.apache.iotdb.pipe.api.exception.PipeConnectionException;
@@ -76,12 +75,9 @@ public class PipeConnectorSubtask extends PipeSubtask {
         case TSFILE_INSERTION:
           outputPipeConnector.transfer((TsFileInsertionEvent) event);
           break;
-        case DELETION:
-          outputPipeConnector.transfer((DeletionEvent) event);
-          break;
         default:
-          throw new UnsupportedOperationException(
-              "Unsupported event type: " + event.getClass().getName());
+          outputPipeConnector.transfer(event);
+          break;
       }
 
       releaseLastEvent();
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeProcessorSubtask.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeProcessorSubtask.java
index feb584fcaff..dad72977865 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeProcessorSubtask.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeProcessorSubtask.java
@@ -23,7 +23,6 @@ import org.apache.iotdb.db.pipe.task.queue.EventSupplier;
 import org.apache.iotdb.pipe.api.PipeProcessor;
 import org.apache.iotdb.pipe.api.collector.EventCollector;
 import org.apache.iotdb.pipe.api.event.Event;
-import org.apache.iotdb.pipe.api.event.dml.deletion.DeletionEvent;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
 import org.apache.iotdb.pipe.api.exception.PipeException;
@@ -67,12 +66,9 @@ public class PipeProcessorSubtask extends PipeSubtask {
         case TSFILE_INSERTION:
           pipeProcessor.process((TsFileInsertionEvent) event, outputEventCollector);
           break;
-        case DELETION:
-          pipeProcessor.process((DeletionEvent) event, outputEventCollector);
-          break;
         default:
-          throw new UnsupportedOperationException(
-              "Unsupported event type: " + event.getClass().getName());
+          pipeProcessor.process(event, outputEventCollector);
+          break;
       }
 
       releaseLastEvent();


[iotdb] 03/06: executeForAWhile -> executeOnce

Posted by ro...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch pipe-task-schedule
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit c3e16a7806f1aaf0de9521dc6bce1348faab0a20
Author: Steve Yurong Su <ro...@apache.org>
AuthorDate: Mon May 22 19:24:42 2023 +0800

    executeForAWhile -> executeOnce
---
 .../iotdb/db/pipe/task/subtask/PipeConnectorSubtask.java       |  6 ++++--
 .../iotdb/db/pipe/task/subtask/PipeProcessorSubtask.java       |  6 ++++--
 .../org/apache/iotdb/db/pipe/task/subtask/PipeSubtask.java     | 10 ++++++++--
 3 files changed, 16 insertions(+), 6 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtask.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtask.java
index 1ec0994ecdc..1bd8a6ed515 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtask.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtask.java
@@ -51,7 +51,7 @@ public class PipeConnectorSubtask extends PipeSubtask {
 
   // TODO: for a while
   @Override
-  protected synchronized void executeForAWhile() {
+  protected synchronized boolean executeOnce() {
     try {
       // TODO: reduce the frequency of heartbeat
       outputPipeConnector.heartbeat();
@@ -64,7 +64,7 @@ public class PipeConnectorSubtask extends PipeSubtask {
     // record this event for retrying on connection failure or other exceptions
     lastEvent = event;
     if (event == null) {
-      return;
+      return false;
     }
 
     try {
@@ -85,6 +85,8 @@ public class PipeConnectorSubtask extends PipeSubtask {
           "Error occurred during executing PipeConnector#transfer, perhaps need to check whether the implementation of PipeConnector is correct according to the pipe-api description.",
           e);
     }
+
+    return true;
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeProcessorSubtask.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeProcessorSubtask.java
index 0e65894c14d..6a76beb02b3 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeProcessorSubtask.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeProcessorSubtask.java
@@ -50,12 +50,12 @@ public class PipeProcessorSubtask extends PipeSubtask {
   }
 
   @Override
-  protected synchronized void executeForAWhile() throws Exception {
+  protected synchronized boolean executeOnce() throws Exception {
     final Event event = lastEvent != null ? lastEvent : inputEventSupplier.supply();
     // record the last event for retry when exception occurs
     lastEvent = event;
     if (event == null) {
-      return;
+      return false;
     }
 
     try {
@@ -74,6 +74,8 @@ public class PipeProcessorSubtask extends PipeSubtask {
           "Error occurred during executing PipeProcessor#process, perhaps need to check whether the implementation of PipeProcessor is correct according to the pipe-api description.",
           e);
     }
+
+    return true;
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeSubtask.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeSubtask.java
index c770b983b9a..bed57c90e88 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeSubtask.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeSubtask.java
@@ -67,7 +67,7 @@ public abstract class PipeSubtask implements FutureCallback<Void>, Callable<Void
 
   @Override
   public Void call() throws Exception {
-    executeForAWhile();
+    executeOnce();
 
     // wait for the callable to be decorated by Futures.addCallback in the executorService
     // to make sure that the callback can be submitted again on success or failure.
@@ -76,7 +76,13 @@ public abstract class PipeSubtask implements FutureCallback<Void>, Callable<Void
     return null;
   }
 
-  protected abstract void executeForAWhile() throws Exception;
+  /**
+   * Tries to consume an event using the pipe plugin.
+   *
+   * @return true if the event is consumed successfully, false if no more events can be consumed
+   * @throws Exception if any error occurs when consuming the event
+   */
+  protected abstract boolean executeOnce() throws Exception;
 
   @Override
   public void onSuccess(Void result) {


[iotdb] 04/06: safety start pipe tasks

Posted by ro...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch pipe-task-schedule
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 86f0e554bb6b3c17f4ddccaafdafa21379dc7b26
Author: Steve Yurong Su <ro...@apache.org>
AuthorDate: Mon May 22 19:33:53 2023 +0800

    safety start pipe tasks
---
 .../statemachine/ConfigRegionStateMachine.java      |  6 ++++--
 .../manager/pipe/runtime/PipeMetaSyncer.java        | 21 ++++++++++++++++++---
 2 files changed, 22 insertions(+), 5 deletions(-)

diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java
index 53e7f9ae688..4964ba79774 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java
@@ -64,7 +64,7 @@ public class ConfigRegionStateMachine
   private static final Logger LOGGER = LoggerFactory.getLogger(ConfigRegionStateMachine.class);
 
   private static final ExecutorService threadPool =
-      IoTDBThreadPoolFactory.newCachedThreadPool("CQ-recovery");
+      IoTDBThreadPoolFactory.newCachedThreadPool("ConfigNode-Manager-Recovery");
   private static final ConfigNodeConfig CONF = ConfigNodeDescriptor.getInstance().getConf();
   private final ConfigPlanExecutor executor;
   private ConfigManager configManager;
@@ -218,7 +218,9 @@ public class ConfigRegionStateMachine
       // 2. For correctness: in cq recovery processing, it will use ConsensusManager which may be
       // initialized after notifyLeaderChanged finished
       threadPool.submit(() -> configManager.getCQManager().startCQScheduler());
-      configManager.getPipeManager().getPipeRuntimeCoordinator().startPipeMetaSync();
+
+      threadPool.submit(
+          () -> configManager.getPipeManager().getPipeRuntimeCoordinator().startPipeMetaSync());
     } else {
       LOGGER.info(
           "Current node [nodeId:{}, ip:port: {}] is not longer the leader, the new leader is [nodeId:{}]",
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeMetaSyncer.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeMetaSyncer.java
index ba9bba61c6a..4d6ebb04ff0 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeMetaSyncer.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeMetaSyncer.java
@@ -40,7 +40,8 @@ public class PipeMetaSyncer {
   private static final ScheduledExecutorService SYNC_EXECUTOR =
       IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(
           ThreadName.PIPE_META_SYNC_SERVICE.getName());
-  // TODO: make this configurable
+  // TODO: make these configurable
+  private static final long INITIAL_SYNC_DELAY_MINUTES = 3;
   private static final long SYNC_INTERVAL_MINUTES = 3;
 
   private final ConfigManager configManager;
@@ -52,15 +53,29 @@ public class PipeMetaSyncer {
   }
 
   public synchronized void start() {
+    while (configManager.getConsensusManager() == null) {
+      try {
+        LOGGER.info("consensus layer is not ready, sleep 1s...");
+        TimeUnit.SECONDS.sleep(1);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        LOGGER.warn("unexpected interruption during waiting for consensus layer ready.");
+      }
+    }
+
     if (metaSyncFuture == null) {
       metaSyncFuture =
           ScheduledExecutorUtil.safelyScheduleWithFixedDelay(
-              SYNC_EXECUTOR, this::sync, 0, SYNC_INTERVAL_MINUTES, TimeUnit.MINUTES);
+              SYNC_EXECUTOR,
+              this::sync,
+              INITIAL_SYNC_DELAY_MINUTES,
+              SYNC_INTERVAL_MINUTES,
+              TimeUnit.MINUTES);
       LOGGER.info("PipeMetaSyncer is started successfully.");
     }
   }
 
-  private void sync() {
+  private synchronized void sync() {
     final TSStatus status = configManager.getProcedureManager().pipeMetaSync();
     if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
       LOGGER.warn(


[iotdb] 06/06: rename package

Posted by ro...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch pipe-task-schedule
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 2b6cdaaebada5b89196339ec6401885f6b70f932
Author: Steve Yurong Su <ro...@apache.org>
AuthorDate: Mon May 22 22:46:18 2023 +0800

    rename package
---
 .../org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtask.java    | 1 -
 .../{ => execution}/executor/PipeConnectorSubtaskExecutorTest.java     | 3 +--
 .../{ => execution}/executor/PipeProcessorSubtaskExecutorTest.java     | 3 +--
 .../db/pipe/{ => execution}/executor/PipeSubtaskExecutorTest.java      | 3 +--
 4 files changed, 3 insertions(+), 7 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtask.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtask.java
index 1bd8a6ed515..742b2230fa6 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtask.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtask.java
@@ -49,7 +49,6 @@ public class PipeConnectorSubtask extends PipeSubtask {
     this.outputPipeConnector = outputPipeConnector;
   }
 
-  // TODO: for a while
   @Override
   protected synchronized boolean executeOnce() {
     try {
diff --git a/server/src/test/java/org/apache/iotdb/db/pipe/executor/PipeConnectorSubtaskExecutorTest.java b/server/src/test/java/org/apache/iotdb/db/pipe/execution/executor/PipeConnectorSubtaskExecutorTest.java
similarity index 92%
rename from server/src/test/java/org/apache/iotdb/db/pipe/executor/PipeConnectorSubtaskExecutorTest.java
rename to server/src/test/java/org/apache/iotdb/db/pipe/execution/executor/PipeConnectorSubtaskExecutorTest.java
index 4c572438955..52acbe9c2ff 100644
--- a/server/src/test/java/org/apache/iotdb/db/pipe/executor/PipeConnectorSubtaskExecutorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/pipe/execution/executor/PipeConnectorSubtaskExecutorTest.java
@@ -17,9 +17,8 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.executor;
+package org.apache.iotdb.db.pipe.execution.executor;
 
-import org.apache.iotdb.db.pipe.execution.executor.PipeConnectorSubtaskExecutor;
 import org.apache.iotdb.db.pipe.task.queue.ListenableBlockingPendingQueue;
 import org.apache.iotdb.db.pipe.task.subtask.PipeConnectorSubtask;
 import org.apache.iotdb.pipe.api.PipeConnector;
diff --git a/server/src/test/java/org/apache/iotdb/db/pipe/executor/PipeProcessorSubtaskExecutorTest.java b/server/src/test/java/org/apache/iotdb/db/pipe/execution/executor/PipeProcessorSubtaskExecutorTest.java
similarity index 92%
rename from server/src/test/java/org/apache/iotdb/db/pipe/executor/PipeProcessorSubtaskExecutorTest.java
rename to server/src/test/java/org/apache/iotdb/db/pipe/execution/executor/PipeProcessorSubtaskExecutorTest.java
index a8a8659f7fa..d0a5208d537 100644
--- a/server/src/test/java/org/apache/iotdb/db/pipe/executor/PipeProcessorSubtaskExecutorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/pipe/execution/executor/PipeProcessorSubtaskExecutorTest.java
@@ -17,9 +17,8 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.executor;
+package org.apache.iotdb.db.pipe.execution.executor;
 
-import org.apache.iotdb.db.pipe.execution.executor.PipeProcessorSubtaskExecutor;
 import org.apache.iotdb.db.pipe.task.queue.EventSupplier;
 import org.apache.iotdb.db.pipe.task.subtask.PipeProcessorSubtask;
 import org.apache.iotdb.pipe.api.PipeProcessor;
diff --git a/server/src/test/java/org/apache/iotdb/db/pipe/executor/PipeSubtaskExecutorTest.java b/server/src/test/java/org/apache/iotdb/db/pipe/execution/executor/PipeSubtaskExecutorTest.java
similarity index 97%
rename from server/src/test/java/org/apache/iotdb/db/pipe/executor/PipeSubtaskExecutorTest.java
rename to server/src/test/java/org/apache/iotdb/db/pipe/execution/executor/PipeSubtaskExecutorTest.java
index f70d6874211..d24efc3fb7f 100644
--- a/server/src/test/java/org/apache/iotdb/db/pipe/executor/PipeSubtaskExecutorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/pipe/execution/executor/PipeSubtaskExecutorTest.java
@@ -17,9 +17,8 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.executor;
+package org.apache.iotdb.db.pipe.execution.executor;
 
-import org.apache.iotdb.db.pipe.execution.executor.PipeSubtaskExecutor;
 import org.apache.iotdb.db.pipe.task.subtask.PipeSubtask;
 
 import org.junit.After;