You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2023/05/22 14:50:27 UTC

[iotdb] 05/06: pipe task scheduler

This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch pipe-task-schedule
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 415a7ce4f0c030f6cd0a4a476ba27e629b7747e1
Author: Steve Yurong Su <ro...@apache.org>
AuthorDate: Mon May 22 22:39:02 2023 +0800

    pipe task scheduler
---
 .../execution/executor/PipeSubtaskExecutor.java    | 24 ++++--
 .../execution/scheduler/PipeSubtaskScheduler.java  | 89 ++++++++++++++++++++++
 .../execution/scheduler/PipeTaskScheduler.java     | 74 ------------------
 .../iotdb/db/pipe/task/subtask/PipeSubtask.java    | 26 ++++++-
 .../executor/PipeConnectorSubtaskExecutorTest.java |  3 +-
 .../executor/PipeProcessorSubtaskExecutorTest.java |  3 +-
 .../executor/PipeSubtaskExecutorTest.java          |  3 +-
 7 files changed, 133 insertions(+), 89 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/execution/executor/PipeSubtaskExecutor.java b/server/src/main/java/org/apache/iotdb/db/pipe/execution/executor/PipeSubtaskExecutor.java
index 762561546b3..d1befd3e08a 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/execution/executor/PipeSubtaskExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/execution/executor/PipeSubtaskExecutor.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.pipe.execution.executor;
 import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.commons.concurrent.ThreadName;
 import org.apache.iotdb.commons.utils.TestOnly;
+import org.apache.iotdb.db.pipe.execution.scheduler.PipeSubtaskScheduler;
 import org.apache.iotdb.db.pipe.task.subtask.PipeSubtask;
 
 import com.google.common.util.concurrent.ListeningExecutorService;
@@ -44,7 +45,8 @@ public abstract class PipeSubtaskExecutor {
 
   private final Map<String, PipeSubtask> registeredIdSubtaskMapper;
 
-  private int corePoolSize;
+  private final int corePoolSize;
+  private int runningSubtaskNumber;
 
   protected PipeSubtaskExecutor(int corePoolSize, ThreadName threadName) {
     subtaskWorkerThreadPoolExecutor =
@@ -54,6 +56,7 @@ public abstract class PipeSubtaskExecutor {
     registeredIdSubtaskMapper = new ConcurrentHashMap<>();
 
     this.corePoolSize = corePoolSize;
+    runningSubtaskNumber = 0;
   }
 
   /////////////////////// subtask management ///////////////////////
@@ -65,7 +68,10 @@ public abstract class PipeSubtaskExecutor {
     }
 
     registeredIdSubtaskMapper.put(subtask.getTaskID(), subtask);
-    subtask.bindExecutors(subtaskWorkerThreadPoolExecutor, subtaskCallbackListeningExecutor);
+    subtask.bindExecutors(
+        subtaskWorkerThreadPoolExecutor,
+        subtaskCallbackListeningExecutor,
+        new PipeSubtaskScheduler(this));
   }
 
   public final synchronized void start(String subTaskID) {
@@ -82,6 +88,7 @@ public abstract class PipeSubtaskExecutor {
     } else {
       subtask.allowSubmittingSelf();
       subtask.submitSelf();
+      ++runningSubtaskNumber;
       LOGGER.info("The subtask {} is started to submit self.", subTaskID);
     }
   }
@@ -92,7 +99,9 @@ public abstract class PipeSubtaskExecutor {
       return;
     }
 
-    registeredIdSubtaskMapper.get(subTaskID).disallowSubmittingSelf();
+    if (registeredIdSubtaskMapper.get(subTaskID).disallowSubmittingSelf()) {
+      --runningSubtaskNumber;
+    }
   }
 
   public final synchronized void deregister(String subTaskID) {
@@ -138,12 +147,11 @@ public abstract class PipeSubtaskExecutor {
     return subtaskWorkerThreadPoolExecutor.isShutdown();
   }
 
-  public final void adjustExecutorThreadNumber(int threadNum) {
-    corePoolSize = threadNum;
-    throw new UnsupportedOperationException("Not implemented yet.");
+  public final int getCorePoolSize() {
+    return corePoolSize;
   }
 
-  public final int getExecutorThreadNumber() {
-    return corePoolSize;
+  public final int getRunningSubtaskNumber() {
+    return runningSubtaskNumber;
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/execution/scheduler/PipeSubtaskScheduler.java b/server/src/main/java/org/apache/iotdb/db/pipe/execution/scheduler/PipeSubtaskScheduler.java
new file mode 100644
index 00000000000..dada354e743
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/execution/scheduler/PipeSubtaskScheduler.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.execution.scheduler;
+
+import org.apache.iotdb.db.pipe.execution.executor.PipeSubtaskExecutor;
+
+public class PipeSubtaskScheduler {
+  // Bounds one scheduling round of a subtask: schedule() grants calls until a limit is reached.
+  private final PipeSubtaskExecutor executor;
+  // true before the first schedule() call of a round; set back to true by reset()
+  private boolean isFirstSchedule = true;
+
+  // TODO: make these two basic checkpoint intervals (event count, time duration) configurable
+  // count-based limit: basic interval, interval adjusted per round, grants counted this round
+  private static final int BASIC_CHECKPOINT_INTERVAL_BY_CONSUMED_EVENT_COUNT = 10_000;
+  private int consumedEventCountCheckpointInterval;
+  private int consumedEventCount;
+
+  // time-based limit in ms: basic interval, interval adjusted per round, start time of the round
+  private static final long BASIC_CHECKPOINT_INTERVAL_BY_TIME_DURATION = 10 * 1000L;
+  private long timeDurationCheckpointInterval;
+  private long lastCheckTime;
+
+  public PipeSubtaskScheduler(PipeSubtaskExecutor executor) {
+    this.executor = executor;
+  }
+  // Returns true while the caller may proceed, false once the count or time limit is reached.
+  public boolean schedule() {
+    if (isFirstSchedule) {
+      isFirstSchedule = false;
+      // new round: reset the counters and recompute both limits
+      adjustCheckpointIntervalBasedOnExecutorStatus();
+
+      ++consumedEventCount;
+      return true;
+    }
+    // later calls are granted only while neither the count nor the time limit is reached
+    if (consumedEventCount < consumedEventCountCheckpointInterval
+        && System.currentTimeMillis() - lastCheckTime < timeDurationCheckpointInterval) {
+      ++consumedEventCount;
+      return true;
+    }
+
+    return false;
+  }
+  // Starts a new round: resets the counters and recomputes both limits from the executor status.
+  private void adjustCheckpointIntervalBasedOnExecutorStatus() {
+    // 1. reset consumedEventCount and lastCheckTime
+    consumedEventCount = 0;
+    lastCheckTime = System.currentTimeMillis();
+
+    // 2. adjust checkpoint intervals: basic interval / runningSubtaskNumber * corePoolSize (>= 1)
+    final int corePoolSize = Math.max(1, executor.getCorePoolSize());
+    final int runningSubtaskNumber = Math.max(1, executor.getRunningSubtaskNumber());
+    consumedEventCountCheckpointInterval =
+        Math.max(
+            1,
+            (int)
+                (((float) BASIC_CHECKPOINT_INTERVAL_BY_CONSUMED_EVENT_COUNT / runningSubtaskNumber)
+                    * corePoolSize));
+    timeDurationCheckpointInterval =
+        Math.max(
+            1,
+            (long)
+                (((float) BASIC_CHECKPOINT_INTERVAL_BY_TIME_DURATION / runningSubtaskNumber)
+                    * corePoolSize));
+  }
+  // Makes the next schedule() call start a new round.
+  public void reset() {
+    isFirstSchedule = true;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/execution/scheduler/PipeTaskScheduler.java b/server/src/main/java/org/apache/iotdb/db/pipe/execution/scheduler/PipeTaskScheduler.java
deleted file mode 100644
index 4f035ca6715..00000000000
--- a/server/src/main/java/org/apache/iotdb/db/pipe/execution/scheduler/PipeTaskScheduler.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.db.pipe.execution.scheduler;
-
-import org.apache.iotdb.db.pipe.execution.executor.PipeSubtaskExecutorManager;
-
-/**
- * PipeTaskScheduler is a singleton class that manages the numbers of threads used by
- * PipeTaskExecutors dynamically.
- */
-public class PipeTaskScheduler {
-
-  private final PipeSubtaskExecutorManager pipeSubtaskExecutorManager =
-      PipeSubtaskExecutorManager.getInstance();
-
-  public void adjustAssignerSubtaskExecutorThreadNum(int threadNum) {
-    // TODO: make it configurable by setting different parameters
-    pipeSubtaskExecutorManager.getAssignerSubtaskExecutor().adjustExecutorThreadNumber(threadNum);
-  }
-
-  public int getAssignerSubtaskExecutorThreadNum() {
-    return pipeSubtaskExecutorManager.getAssignerSubtaskExecutor().getExecutorThreadNumber();
-  }
-
-  public void adjustConnectorSubtaskExecutorThreadNum(int threadNum) {
-    // TODO: make it configurable by setting different parameters
-    pipeSubtaskExecutorManager.getConnectorSubtaskExecutor().adjustExecutorThreadNumber(threadNum);
-  }
-
-  public int getConnectorSubtaskExecutorThreadNum() {
-    return pipeSubtaskExecutorManager.getConnectorSubtaskExecutor().getExecutorThreadNumber();
-  }
-
-  public void adjustProcessorSubtaskExecutorThreadNum(int threadNum) {
-    // TODO: make it configurable by setting different parameters
-    pipeSubtaskExecutorManager.getProcessorSubtaskExecutor().adjustExecutorThreadNumber(threadNum);
-  }
-
-  public int getProcessorSubtaskExecutorThreadNum() {
-    return pipeSubtaskExecutorManager.getProcessorSubtaskExecutor().getExecutorThreadNumber();
-  }
-
-  /////////////////////////  Singleton Instance Holder  /////////////////////////
-
-  private PipeTaskScheduler() {}
-
-  private static class PipeTaskSchedulerHolder {
-    private static PipeTaskScheduler instance = null;
-  }
-
-  public static PipeTaskScheduler setupAndGetInstance() {
-    if (PipeTaskSchedulerHolder.instance == null) {
-      PipeTaskSchedulerHolder.instance = new PipeTaskScheduler();
-    }
-    return PipeTaskSchedulerHolder.instance;
-  }
-}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeSubtask.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeSubtask.java
index bed57c90e88..5ddff7b2f29 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeSubtask.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeSubtask.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.pipe.task.subtask;
 
 import org.apache.iotdb.db.pipe.agent.PipeAgent;
 import org.apache.iotdb.db.pipe.core.event.EnrichedEvent;
+import org.apache.iotdb.db.pipe.execution.scheduler.PipeSubtaskScheduler;
 import org.apache.iotdb.pipe.api.event.Event;
 
 import com.google.common.util.concurrent.FutureCallback;
@@ -47,6 +48,8 @@ public abstract class PipeSubtask implements FutureCallback<Void>, Callable<Void
   private final DecoratingLock callbackDecoratingLock = new DecoratingLock();
   private final AtomicBoolean shouldStopSubmittingSelf = new AtomicBoolean(true);
 
+  private PipeSubtaskScheduler subtaskScheduler;
+
   protected static final int MAX_RETRY_TIMES = 5;
   private final AtomicInteger retryCount = new AtomicInteger(0);
   protected Throwable lastFailedCause;
@@ -60,14 +63,25 @@ public abstract class PipeSubtask implements FutureCallback<Void>, Callable<Void
 
   public void bindExecutors(
       ListeningExecutorService subtaskWorkerThreadPoolExecutor,
-      ExecutorService subtaskCallbackListeningExecutor) {
+      ExecutorService subtaskCallbackListeningExecutor,
+      PipeSubtaskScheduler subtaskScheduler) {
     this.subtaskWorkerThreadPoolExecutor = subtaskWorkerThreadPoolExecutor;
     this.subtaskCallbackListeningExecutor = subtaskCallbackListeningExecutor;
+    this.subtaskScheduler = subtaskScheduler;
   }
 
   @Override
   public Void call() throws Exception {
-    executeOnce();
+    // if the scheduler allows to schedule, then try to consume an event
+    while (subtaskScheduler.schedule()) {
+      // if the event is consumed successfully, then continue to consume the next event
+      // otherwise, stop consuming
+      if (!executeOnce()) {
+        break;
+      }
+    }
+    // reset the scheduler to make sure that the scheduler can schedule again
+    subtaskScheduler.reset();
 
     // wait for the callable to be decorated by Futures.addCallback in the executorService
     // to make sure that the callback can be submitted again on success or failure.
@@ -131,8 +145,12 @@ public abstract class PipeSubtask implements FutureCallback<Void>, Callable<Void
     shouldStopSubmittingSelf.set(false);
   }
 
-  public void disallowSubmittingSelf() {
-    shouldStopSubmittingSelf.set(true);
+  /**
+   * @return true if the shouldStopSubmittingSelf state is changed from false to true, false
+   *     otherwise
+   */
+  public boolean disallowSubmittingSelf() {
+    return !shouldStopSubmittingSelf.getAndSet(true);
   }
 
   public boolean isSubmittingSelf() {
diff --git a/server/src/test/java/org/apache/iotdb/db/pipe/execution/executor/PipeConnectorSubtaskExecutorTest.java b/server/src/test/java/org/apache/iotdb/db/pipe/executor/PipeConnectorSubtaskExecutorTest.java
similarity index 92%
rename from server/src/test/java/org/apache/iotdb/db/pipe/execution/executor/PipeConnectorSubtaskExecutorTest.java
rename to server/src/test/java/org/apache/iotdb/db/pipe/executor/PipeConnectorSubtaskExecutorTest.java
index 52acbe9c2ff..4c572438955 100644
--- a/server/src/test/java/org/apache/iotdb/db/pipe/execution/executor/PipeConnectorSubtaskExecutorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/pipe/executor/PipeConnectorSubtaskExecutorTest.java
@@ -17,8 +17,9 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.execution.executor;
+package org.apache.iotdb.db.pipe.executor;
 
+import org.apache.iotdb.db.pipe.execution.executor.PipeConnectorSubtaskExecutor;
 import org.apache.iotdb.db.pipe.task.queue.ListenableBlockingPendingQueue;
 import org.apache.iotdb.db.pipe.task.subtask.PipeConnectorSubtask;
 import org.apache.iotdb.pipe.api.PipeConnector;
diff --git a/server/src/test/java/org/apache/iotdb/db/pipe/execution/executor/PipeProcessorSubtaskExecutorTest.java b/server/src/test/java/org/apache/iotdb/db/pipe/executor/PipeProcessorSubtaskExecutorTest.java
similarity index 92%
rename from server/src/test/java/org/apache/iotdb/db/pipe/execution/executor/PipeProcessorSubtaskExecutorTest.java
rename to server/src/test/java/org/apache/iotdb/db/pipe/executor/PipeProcessorSubtaskExecutorTest.java
index d0a5208d537..a8a8659f7fa 100644
--- a/server/src/test/java/org/apache/iotdb/db/pipe/execution/executor/PipeProcessorSubtaskExecutorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/pipe/executor/PipeProcessorSubtaskExecutorTest.java
@@ -17,8 +17,9 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.execution.executor;
+package org.apache.iotdb.db.pipe.executor;
 
+import org.apache.iotdb.db.pipe.execution.executor.PipeProcessorSubtaskExecutor;
 import org.apache.iotdb.db.pipe.task.queue.EventSupplier;
 import org.apache.iotdb.db.pipe.task.subtask.PipeProcessorSubtask;
 import org.apache.iotdb.pipe.api.PipeProcessor;
diff --git a/server/src/test/java/org/apache/iotdb/db/pipe/execution/executor/PipeSubtaskExecutorTest.java b/server/src/test/java/org/apache/iotdb/db/pipe/executor/PipeSubtaskExecutorTest.java
similarity index 97%
rename from server/src/test/java/org/apache/iotdb/db/pipe/execution/executor/PipeSubtaskExecutorTest.java
rename to server/src/test/java/org/apache/iotdb/db/pipe/executor/PipeSubtaskExecutorTest.java
index d24efc3fb7f..f70d6874211 100644
--- a/server/src/test/java/org/apache/iotdb/db/pipe/execution/executor/PipeSubtaskExecutorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/pipe/executor/PipeSubtaskExecutorTest.java
@@ -17,8 +17,9 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.execution.executor;
+package org.apache.iotdb.db.pipe.executor;
 
+import org.apache.iotdb.db.pipe.execution.executor.PipeSubtaskExecutor;
 import org.apache.iotdb.db.pipe.task.subtask.PipeSubtask;
 
 import org.junit.After;