You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by zy...@apache.org on 2023/06/30 08:43:28 UTC

[doris] branch master updated: [Feature](Job)Provide unified internal Job scheduling (#21113)

This is an automated email from the ASF dual-hosted git repository.

zykkk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 2c3183f5eb [Feature](Job)Provide unified internal Job scheduling (#21113)
2c3183f5eb is described below

commit 2c3183f5eb6a82b01fd23dbf55ded65ff53a3ca9
Author: Calvin Kirs <ac...@163.com>
AuthorDate: Fri Jun 30 16:43:20 2023 +0800

    [Feature](Job)Provide unified internal Job scheduling (#21113)
    
    We use the time wheel algorithm to complete the scheduling and triggering of periodic tasks. The implementation of the time wheel algorithm refers to netty's HashedWheelTimer.
    We will periodically (10 minutes by default) put the events that need to be triggered in the future cycle into the time wheel for periodic scheduling. In order to ensure the efficient triggering of tasks and avoid task blocking and subsequent task scheduling delays, we use Disruptor to implement the production and consumption model.
    When the task expires and needs to be triggered, the task will be put into the RingBuffer of the Disruptor, and then the consumer thread will consume the task.
    Consumers need to register for events, and event registration needs to provide event executors. Event executors are a functional interface with only one method for executing events.
    If it is a single event, the event definition will be deleted after the scheduling is completed; if it is a periodic event, it will be put back into the time wheel according to the periodic scheduling after the scheduling is completed.
---
 .../apache/doris/scheduler/AsyncJobRegister.java   |  82 +++++++
 .../apache/doris/scheduler/JobRegisterFactory.java |  41 ++++
 .../doris/scheduler/constants/JobStatus.java       |  38 +++
 .../doris/scheduler/constants/SystemJob.java       |  42 ++++
 .../scheduler/disruptor/TimerTaskDisruptor.java    | 133 +++++++++++
 .../doris/scheduler/disruptor/TimerTaskEvent.java  |  38 +++
 .../disruptor/TimerTaskExpirationHandler.java      | 125 ++++++++++
 .../doris/scheduler/executor/JobExecutor.java      |  37 +++
 .../doris/scheduler/job/AsyncJobManager.java       | 262 +++++++++++++++++++++
 .../apache/doris/scheduler/job/DorisTimerTask.java |  58 +++++
 .../java/org/apache/doris/scheduler/job/Job.java   | 148 ++++++++++++
 .../doris/scheduler/registry/JobRegister.java      | 111 +++++++++
 .../scheduler/disruptor/AsyncJobManagerTest.java   | 117 +++++++++
 .../disruptor/TimerTaskDisruptorTest.java          |  77 ++++++
 fe/pom.xml                                         |   8 +-
 15 files changed, 1316 insertions(+), 1 deletion(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/scheduler/AsyncJobRegister.java b/fe/fe-core/src/main/java/org/apache/doris/scheduler/AsyncJobRegister.java
new file mode 100644
index 0000000000..59c64906e8
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/scheduler/AsyncJobRegister.java
@@ -0,0 +1,82 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.scheduler;
+
+import org.apache.doris.scheduler.executor.JobExecutor;
+import org.apache.doris.scheduler.job.AsyncJobManager;
+import org.apache.doris.scheduler.job.Job;
+import org.apache.doris.scheduler.registry.JobRegister;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+
+/**
+ * {@link JobRegister} implementation that forwards every operation to an {@link AsyncJobManager}.
+ *
+ * <p>Timed scheduling events are registered with the Netty time wheel algorithm. After an event is
+ * triggered, it is produced by the Disruptor producer and consumed by a consumer. This is an asynchronous
+ * consumption model, so strict timing accuracy is not guaranteed.
+ */
+@Slf4j
+public class AsyncJobRegister implements JobRegister {
+
+    // Every public method of this class delegates to this manager.
+    private final AsyncJobManager asyncJobManager;
+
+    /**
+     * Creates a register backed by a newly constructed {@link AsyncJobManager}.
+     */
+    public AsyncJobRegister() {
+        this.asyncJobManager = new AsyncJobManager();
+    }
+
+    /**
+     * Registers a job without an explicit start or end timestamp (both are passed on as {@code null}).
+     *
+     * @param name       job name
+     * @param intervalMs interval between executions, in milliseconds
+     * @param executor   callback to run each time the job is triggered
+     * @return the value returned by the five-argument {@code registerJob} overload
+     */
+    @Override
+    public Long registerJob(String name, Long intervalMs, JobExecutor executor) {
+        return this.registerJob(name, intervalMs, null, null, executor);
+    }
+
+    /**
+     * Registers a job with a start timestamp but no end timestamp (the end is passed on as {@code null}).
+     *
+     * @param name           job name
+     * @param intervalMs     interval between executions, in milliseconds
+     * @param startTimeStamp start timestamp handed to the {@link Job} constructor
+     * @param executor       callback to run each time the job is triggered
+     * @return the value returned by the five-argument {@code registerJob} overload
+     */
+    @Override
+    public Long registerJob(String name, Long intervalMs, Long startTimeStamp, JobExecutor executor) {
+        return this.registerJob(name, intervalMs, startTimeStamp, null, executor);
+    }
+
+    /**
+     * Builds a {@link Job} from the arguments and registers it with the manager.
+     *
+     * @param name           job name
+     * @param intervalMs     interval between executions, in milliseconds
+     * @param startTimeStamp start timestamp handed to the {@link Job} constructor; may be {@code null}
+     * @param endTimeStamp   end timestamp handed to the {@link Job} constructor; may be {@code null}
+     * @param executor       callback to run each time the job is triggered
+     * @return the result of {@link AsyncJobManager#registerJob(Job)}
+     */
+    @Override
+    public Long registerJob(String name, Long intervalMs, Long startTimeStamp, Long endTimeStamp,
+                                 JobExecutor executor) {
+
+        Job job = new Job(name, intervalMs, startTimeStamp, endTimeStamp, executor);
+        return asyncJobManager.registerJob(job);
+    }
+
+    /**
+     * Pauses the job with the given id.
+     *
+     * @param jobId id of the job to pause
+     * @return the result of {@link AsyncJobManager#pauseJob(Long)}
+     */
+    @Override
+    public Boolean pauseJob(Long jobId) {
+        return asyncJobManager.pauseJob(jobId);
+    }
+
+    /**
+     * Stops the job with the given id.
+     *
+     * @param jobId id of the job to stop
+     * @return the result of {@link AsyncJobManager#stopJob(Long)}
+     */
+    @Override
+    public Boolean stopJob(Long jobId) {
+        return asyncJobManager.stopJob(jobId);
+    }
+
+    /**
+     * Resumes the job with the given id.
+     *
+     * @param jobId id of the job to resume
+     * @return the result of {@link AsyncJobManager#resumeJob(Long)}
+     */
+    @Override
+    public Boolean resumeJob(Long jobId) {
+        return asyncJobManager.resumeJob(jobId);
+    }
+
+    /**
+     * Closes the underlying {@link AsyncJobManager}.
+     *
+     * @throws IOException if closing the manager fails
+     */
+    @Override
+    public void close() throws IOException {
+        asyncJobManager.close();
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/scheduler/JobRegisterFactory.java b/fe/fe-core/src/main/java/org/apache/doris/scheduler/JobRegisterFactory.java
new file mode 100644
index 0000000000..2613a0302c
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/scheduler/JobRegisterFactory.java
@@ -0,0 +1,41 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.scheduler;
+
+import org.apache.doris.scheduler.registry.JobRegister;
+
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Factory that lazily creates and hands out the single shared {@link JobRegister}.
+ *
+ * <p>Constructing an {@link AsyncJobRegister} creates an {@code AsyncJobManager}, which starts a timer
+ * and a disruptor. Creation is therefore serialized: a caller that loses a race must never build a
+ * second, throw-away instance, because nothing would ever close it.
+ */
+public class JobRegisterFactory {
+    private static final AtomicReference<JobRegister> INSTANCE = new AtomicReference<>();
+
+    /**
+     * Returns the shared {@link JobRegister}, creating it on first use.
+     *
+     * @return the singleton register, never {@code null}
+     */
+    public static JobRegister getInstance() {
+        // Fast path: the instance already exists, no locking needed.
+        JobRegister instance = INSTANCE.get();
+        if (instance != null) {
+            return instance;
+        }
+        // Slow path: hold the lock so that at most one AsyncJobRegister is ever constructed.
+        synchronized (INSTANCE) {
+            instance = INSTANCE.get();
+            if (instance == null) {
+                instance = new AsyncJobRegister();
+                INSTANCE.set(instance);
+            }
+            return instance;
+        }
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/scheduler/constants/JobStatus.java b/fe/fe-core/src/main/java/org/apache/doris/scheduler/constants/JobStatus.java
new file mode 100644
index 0000000000..5c4af0b649
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/scheduler/constants/JobStatus.java
@@ -0,0 +1,38 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.scheduler.constants;
+
+public enum JobStatus {
+
+    /**
+     * The job is active and may be triggered.
+     * NOTE(review): presumably this is also the initial state of a newly created job - confirm in Job.
+     */
+    RUNNING,
+    /**
+     * The job is suspended, either because its execution encountered an exception
+     * or because it was paused manually.
+     * A paused job can be resumed.
+     */
+    PAUSED,
+    /**
+     * The job was stopped manually.
+     * A stopped job cannot be resumed.
+     */
+    STOPPED,
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/scheduler/constants/SystemJob.java b/fe/fe-core/src/main/java/org/apache/doris/scheduler/constants/SystemJob.java
new file mode 100644
index 0000000000..f24f6e4e19
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/scheduler/constants/SystemJob.java
@@ -0,0 +1,42 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.scheduler.constants;
+
+import lombok.Getter;
+
+/**
+ * Jobs that belong to the scheduler itself rather than to a registered user job.
+ * They will start when the scheduler starts.
+ */
+public enum SystemJob {
+
+    /**
+     * System cycle scheduler event job, it will start the cycle scheduler.
+     * Its id (1) is what TimerTaskExpirationHandler compares an event's job id against
+     * to recognise a system event.
+     */
+    SYSTEM_SCHEDULER_JOB("system_scheduler_event_job", 1L);
+
+    // Human-readable name of the system job.
+    @Getter
+    private final String description;
+    // Job id reserved for this system job.
+    @Getter
+    private final Long id;
+
+    /**
+     * @param description human-readable name of the system job
+     * @param id          job id reserved for the system job
+     */
+    SystemJob(String description, Long id) {
+        this.description = description;
+        this.id = id;
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/scheduler/disruptor/TimerTaskDisruptor.java b/fe/fe-core/src/main/java/org/apache/doris/scheduler/disruptor/TimerTaskDisruptor.java
new file mode 100644
index 0000000000..98a2736542
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/scheduler/disruptor/TimerTaskDisruptor.java
@@ -0,0 +1,133 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.scheduler.disruptor;
+
+import org.apache.doris.scheduler.job.AsyncJobManager;
+
+import com.lmax.disruptor.BlockingWaitStrategy;
+import com.lmax.disruptor.EventTranslatorTwoArg;
+import com.lmax.disruptor.TimeoutException;
+import com.lmax.disruptor.WorkHandler;
+import com.lmax.disruptor.dsl.Disruptor;
+import com.lmax.disruptor.dsl.ProducerType;
+import com.lmax.disruptor.util.DaemonThreadFactory;
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.Closeable;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Hands timer tasks to a pool of consumer threads through an LMAX {@link Disruptor}.
+ *
+ * <p>Callers publish a (job id, task id) pair with one of the {@code tryPublish} methods. The pair is copied
+ * into a pre-allocated {@link TimerTaskEvent} slot of the ring buffer and processed by one of the
+ * {@link TimerTaskExpirationHandler} workers registered in the constructor.
+ *
+ * <p>Once {@link #close()} has been called, further publish attempts are rejected and only logged.
+ */
+@Slf4j
+public class TimerTaskDisruptor implements Closeable {
+
+    private final Disruptor<TimerTaskEvent> disruptor;
+    private static final int DEFAULT_RING_BUFFER_SIZE = 1024;
+
+    /**
+     * The default timeout for {@link #close()} in seconds.
+     */
+    private static final int DEFAULT_CLOSE_WAIT_TIME_SECONDS = 5;
+
+    /**
+     * Name of the system property that overrides the number of consumers.
+     */
+    private static final String CONSUMER_COUNT_PROPERTY = "event.task.disruptor.consumer.count";
+
+    /**
+     * The default number of consumers to create for each {@link Disruptor} instance.
+     * Taken from the system property when it holds a valid integer, otherwise the number of available
+     * processors. Clamped to at least 1, because a worker pool without workers would never consume an
+     * event and publishers would eventually block on a full ring buffer.
+     */
+    private static final int DEFAULT_CONSUMER_COUNT = Math.max(1,
+            Integer.getInteger(CONSUMER_COUNT_PROPERTY, Runtime.getRuntime().availableProcessors()));
+
+    /**
+     * Whether this disruptor has been closed.
+     * if true, then we can't publish any more events.
+     * Volatile because it is written by the closing thread and read by publishing threads.
+     */
+    private volatile boolean isClosed = false;
+
+    /**
+     * The default {@link EventTranslatorTwoArg} to use for {@link #tryPublish(Long, Long)}.
+     * This is used to avoid creating a new object for each publish.
+     */
+    private static final EventTranslatorTwoArg<TimerTaskEvent, Long, Long> TRANSLATOR
+            = (event, sequence, jobId, taskId) -> {
+                event.setJobId(jobId);
+                event.setTaskId(taskId);
+            };
+
+    /**
+     * Creates and starts the disruptor together with its worker pool.
+     *
+     * @param asyncJobManager manager handed to every {@link TimerTaskExpirationHandler} worker
+     */
+    public TimerTaskDisruptor(AsyncJobManager asyncJobManager) {
+        // The disruptor uses this factory to create the threads that run the work handlers (consumers).
+        ThreadFactory consumerThreadFactory = DaemonThreadFactory.INSTANCE;
+        // NOTE(review): ProducerType.SINGLE requires that events are never published concurrently from
+        // several threads - confirm that all callers of tryPublish run on a single thread.
+        disruptor = new Disruptor<>(TimerTaskEvent.FACTORY, DEFAULT_RING_BUFFER_SIZE, consumerThreadFactory,
+                ProducerType.SINGLE, new BlockingWaitStrategy());
+        WorkHandler<TimerTaskEvent>[] workers = new TimerTaskExpirationHandler[DEFAULT_CONSUMER_COUNT];
+        for (int i = 0; i < DEFAULT_CONSUMER_COUNT; i++) {
+            workers[i] = new TimerTaskExpirationHandler(asyncJobManager);
+        }
+        disruptor.handleEventsWithWorkerPool(workers);
+        disruptor.start();
+    }
+
+    /**
+     * Publishes an event to the disruptor.
+     *
+     * <p>Nothing is published once the disruptor is closed. Any exception raised while publishing is
+     * logged and swallowed. Despite the name, this uses {@code publishEvent}, which waits for a free
+     * slot when the ring buffer is full.
+     *
+     * @param eventId event job id
+     * @param taskId  event task id
+     */
+    public void tryPublish(Long eventId, Long taskId) {
+        if (isClosed) {
+            log.info("tryPublish failed, disruptor is closed, eventId: {}", eventId);
+            return;
+        }
+        try {
+            disruptor.publishEvent(TRANSLATOR, eventId, taskId);
+        } catch (Exception e) {
+            log.error("tryPublish failed, eventId: {}", eventId, e);
+        }
+    }
+
+    /**
+     * Publishes the job id and task id carried by {@code timerTaskEvent} to the disruptor.
+     *
+     * @param timerTaskEvent holder of the job id and task id to publish
+     * @return {@code true} if the event was handed to the disruptor, {@code false} if the disruptor is
+     *         closed or publishing threw an exception
+     */
+    public boolean tryPublish(TimerTaskEvent timerTaskEvent) {
+        if (isClosed) {
+            log.info("tryPublish failed, disruptor is closed, eventJobId: {}", timerTaskEvent.getJobId());
+            return false;
+        }
+        try {
+            disruptor.publishEvent(TRANSLATOR, timerTaskEvent.getJobId(), timerTaskEvent.getTaskId());
+            return true;
+        } catch (Exception e) {
+            log.error("tryPublish failed, eventJobId: {}", timerTaskEvent.getJobId(), e);
+            return false;
+        }
+    }
+
+
+    /**
+     * Rejects further publishing and shuts the disruptor down, waiting up to
+     * {@link #DEFAULT_CLOSE_WAIT_TIME_SECONDS} seconds. A timeout is logged, not rethrown.
+     */
+    @Override
+    public void close() {
+        try {
+            isClosed = true;
+            // we can wait for 5 seconds, so that backlog can be committed
+            disruptor.shutdown(DEFAULT_CLOSE_WAIT_TIME_SECONDS, TimeUnit.SECONDS);
+        } catch (TimeoutException e) {
+            log.warn("close disruptor failed", e);
+        }
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/scheduler/disruptor/TimerTaskEvent.java b/fe/fe-core/src/main/java/org/apache/doris/scheduler/disruptor/TimerTaskEvent.java
new file mode 100644
index 0000000000..3c1cfe440d
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/scheduler/disruptor/TimerTaskEvent.java
@@ -0,0 +1,38 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.scheduler.disruptor;
+
+import com.lmax.disruptor.EventFactory;
+import lombok.Data;
+
+/**
+ * This class represents an event task that can be produced and consumed by the Disruptor.
+ * The event task contains the ID of the event job and the ID of the event task itself.
+ * The class also provides an event factory to create instances of {@link TimerTaskEvent}.
+ * <p>
+ * It is used by {@link TimerTaskDisruptor} and {@link TimerTaskExpirationHandler}.
+ */
+@Data
+public class TimerTaskEvent {
+
+    // ID of the job this event belongs to.
+    private Long jobId;
+
+    // ID of the individual task that produced this event.
+    private Long taskId;
+
+    // Factory handed to the Disruptor to create event instances.
+    public static final EventFactory<TimerTaskEvent> FACTORY = TimerTaskEvent::new;
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/scheduler/disruptor/TimerTaskExpirationHandler.java b/fe/fe-core/src/main/java/org/apache/doris/scheduler/disruptor/TimerTaskExpirationHandler.java
new file mode 100644
index 0000000000..8c4a5db681
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/scheduler/disruptor/TimerTaskExpirationHandler.java
@@ -0,0 +1,125 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.scheduler.disruptor;
+
+import org.apache.doris.scheduler.constants.SystemJob;
+import org.apache.doris.scheduler.job.AsyncJobManager;
+import org.apache.doris.scheduler.job.Job;
+
+import com.lmax.disruptor.WorkHandler;
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.Objects;
+
+/**
+ * This class represents a work handler for processing event tasks consumed by a Disruptor.
+ * The work handler retrieves the associated event job and executes it if it is running.
+ * If the event job is not running, the work handler logs an error message.
+ * If the event job execution fails, the work handler logs an error message and pauses the event job.
+ * The work handler also handles system events by scheduling batch scheduler tasks.
+ */
+@Slf4j
+public class TimerTaskExpirationHandler implements WorkHandler<TimerTaskEvent> {
+
+    /**
+     * The event job manager used to retrieve and execute event jobs.
+     */
+    private AsyncJobManager asyncJobManager;
+
+    /**
+     * Constructs a new {@link TimerTaskExpirationHandler} instance with the specified event job manager.
+     *
+     * @param asyncJobManager The event job manager used to retrieve and execute event jobs.
+     */
+    public TimerTaskExpirationHandler(AsyncJobManager asyncJobManager) {
+        this.asyncJobManager = asyncJobManager;
+    }
+
+    /**
+     * Processes an event task by retrieving the associated event job and executing it if it is running.
+     * If the event job is not running, it logs an error message.
+     * If the event job execution fails, it logs an error message and pauses the event job.
+     *
+     * @param event The event task to be processed.
+     */
+    @Override
+    public void onEvent(TimerTaskEvent event) {
+        if (checkIsSystemEvent(event)) {
+            onSystemEvent();
+            return;
+        }
+        onEventTask(event);
+    }
+
+    /**
+     * Processes an event task by retrieving the associated event job and executing it if it is running.
+     *
+     * @param timerTaskEvent The event task to be processed.
+     */
+    @SuppressWarnings("checkstyle:UnusedLocalVariable")
+    public void onEventTask(TimerTaskEvent timerTaskEvent) {
+        long jobId = timerTaskEvent.getJobId();
+        Job job = asyncJobManager.getJob(jobId);
+        if (job == null) {
+            log.info("Event job is null, eventJobId: {}", jobId);
+            return;
+        }
+        if (!job.isRunning()) {
+            log.info("Event job is not running, eventJobId: {}", jobId);
+            return;
+        }
+        log.debug("Event job is running, eventJobId: {}", jobId);
+        checkJobIsExpired(job);
+        try {
+            // TODO: We should record the result of the event task.
+            //Object result = job.getExecutor().execute();
+            job.getExecutor().execute();
+            job.setLatestCompleteExecuteTimestamp(System.currentTimeMillis());
+        } catch (Exception e) {
+            log.error("Event job execute failed, jobId: {}", jobId, e);
+            job.pause(e.getMessage());
+        }
+    }
+
+    /**
+     * Handles a system event by scheduling batch scheduler tasks.
+     */
+    private void onSystemEvent() {
+        try {
+            asyncJobManager.batchSchedulerTasks();
+        } catch (Exception e) {
+            log.error("System batch scheduler execute failed", e);
+        }
+    }
+
+    /**
+     * Checks whether the specified event task is a system event.
+     *
+     * @param event The event task to be checked.
+     * @return true if the event task is a system event, false otherwise.
+     */
+    private boolean checkIsSystemEvent(TimerTaskEvent event) {
+        return Objects.equals(event.getJobId(), SystemJob.SYSTEM_SCHEDULER_JOB.getId());
+    }
+
+    private void checkJobIsExpired(Job job) {
+        if (job.isExpired()) {
+            job.pause();
+        }
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/scheduler/executor/JobExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/scheduler/executor/JobExecutor.java
new file mode 100644
index 0000000000..cd96b6a6e4
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/scheduler/executor/JobExecutor.java
@@ -0,0 +1,37 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.scheduler.executor;
+
+/**
+ * This interface represents a callback for an event registration. All event registrations
+ * must implement this interface to provide an execution method.
+ *
+ * @param <T> The result type of the event job execution.
+ */
+@FunctionalInterface
+public interface JobExecutor<T> {
+
+    /**
+     * Executes the event job and returns the result.
+     * Exceptions thrown here are caught by the caller (TimerTaskExpirationHandler logs them and pauses
+     * the job), so there is no need to define or throw them separately.
+     * NOTE(review): that handler currently ignores the returned value (see its TODO).
+     *
+     * @return The result of the event job execution.
+     */
+    T execute();
+}
+
diff --git a/fe/fe-core/src/main/java/org/apache/doris/scheduler/job/AsyncJobManager.java b/fe/fe-core/src/main/java/org/apache/doris/scheduler/job/AsyncJobManager.java
new file mode 100644
index 0000000000..e0944bf24a
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/scheduler/job/AsyncJobManager.java
@@ -0,0 +1,262 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.scheduler.job;
+
+import org.apache.doris.scheduler.disruptor.TimerTaskDisruptor;
+
+import io.netty.util.HashedWheelTimer;
+import io.netty.util.Timeout;
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+
+@Slf4j
+public class AsyncJobManager implements Closeable {
+
+    private final ConcurrentHashMap<Long, Job> jobMap = new ConcurrentHashMap<>(128);
+
+    private long lastBatchSchedulerTimestamp;
+
+    /**
+     * batch scheduler interval time
+     */
+    private static final long BATCH_SCHEDULER_INTERVAL_MILLI_SECONDS = 10 * 60 * 1000L;
+
+    private static final TimeUnit TIME_UNIT = TimeUnit.MILLISECONDS;
+
+
+    private boolean isClosed = false;
+
+    /**
+     * key: jobid
+     * value: timeout list  for one job
+     * it's used to cancel task, if task has started, it can't be canceled
+     */
+    private final ConcurrentHashMap<Long, Map<Long, Timeout>> jobTimeoutMap =
+            new ConcurrentHashMap<>(128);
+
+    /**
+     * scheduler tasks, it's used to scheduler job
+     */
+    private final HashedWheelTimer dorisTimer = new HashedWheelTimer(1, TimeUnit.SECONDS,
+            660);
+
+    /**
+     * Producer and Consumer model
+     * disruptor is used to handle task
+     * disruptor will start a thread pool to handle task
+     */
+    private final TimerTaskDisruptor disruptor;
+
+    public AsyncJobManager() {
+        dorisTimer.start();
+        this.disruptor = new TimerTaskDisruptor(this);
+        this.lastBatchSchedulerTimestamp = System.currentTimeMillis();
+        batchSchedulerTasks();
+        cycleSystemSchedulerTasks();
+    }
+
+    public Long registerJob(Job job) {
+        if (!job.checkJobParam()) {
+            log.warn("registerJob failed, job: {} param is invalid", job);
+            return null;
+        }
+        if (job.getStartTimestamp() != 0L) {
+            job.setNextExecuteTimestamp(job.getStartTimestamp() + job.getIntervalMilliSeconds());
+        } else {
+            job.setNextExecuteTimestamp(System.currentTimeMillis() + job.getIntervalMilliSeconds());
+        }
+
+        if (job.getNextExecuteTimestamp() < BATCH_SCHEDULER_INTERVAL_MILLI_SECONDS + lastBatchSchedulerTimestamp) {
+            List<Long> executeTimestamp = findTasksBetweenTime(job, System.currentTimeMillis(),
+                    BATCH_SCHEDULER_INTERVAL_MILLI_SECONDS + lastBatchSchedulerTimestamp,
+                    job.getNextExecuteTimestamp());
+            if (!executeTimestamp.isEmpty()) {
+                for (Long timestamp : executeTimestamp) {
+                    putOneTask(job.getJobId(), timestamp);
+                }
+            }
+        }
+
+        jobMap.putIfAbsent(job.getJobId(), job);
+        return job.getJobId();
+    }
+
+    public void unregisterJob(Long jobId) {
+        jobMap.remove(jobId);
+    }
+
+    public boolean pauseJob(Long jobId) {
+        if (jobMap.get(jobId) == null) {
+            log.warn("pauseJob failed, jobId: {} not exist", jobId);
+            return false;
+        }
+        cancelJobAllTask(jobId);
+        jobMap.get(jobId).pause();
+        return true;
+    }
+
+    public boolean resumeJob(Long jobId) {
+        if (jobMap.get(jobId) == null) {
+            log.warn("resumeJob failed, jobId: {} not exist", jobId);
+            return false;
+        }
+        jobMap.get(jobId).resume();
+        return true;
+    }
+
+    public boolean stopJob(Long jobId) {
+        if (jobMap.get(jobId) == null) {
+            log.warn("stopJob failed, jobId: {} not exist", jobId);
+            return false;
+        }
+        cancelJobAllTask(jobId);
+        jobMap.get(jobId).stop();
+        return true;
+    }
+
+    public Job getJob(Long jobId) {
+        return jobMap.get(jobId);
+    }
+
+    public Map<Long, Job> getAllJob() {
+        return jobMap;
+    }
+
+    public boolean batchSchedulerTasks() {
+        executeJobIdsWithinLastTenMinutesWindow();
+        return true;
+    }
+
+    public List<Long> findTasksBetweenTime(Job job, Long startTime, Long endTime, Long nextExecuteTime) {
+        List<Long> jobExecuteTimes = new ArrayList<>();
+        if (System.currentTimeMillis() < startTime) {
+            return jobExecuteTimes;
+        }
+        while (endTime >= nextExecuteTime) {
+            if (job.isTaskTimeExceeded()) {
+                break;
+            }
+            jobExecuteTimes.add(nextExecuteTime);
+            nextExecuteTime = job.getExecuteTimestampAndGeneratorNext();
+        }
+        return jobExecuteTimes;
+    }
+
+    /**
+     * We will get the task in the next time window, and then hand it over to the time wheel for timing trigger
+     */
+    private void executeJobIdsWithinLastTenMinutesWindow() {
+        if (jobMap.isEmpty()) {
+            return;
+        }
+        jobMap.forEach((k, v) -> {
+            if (v.isRunning() && (v.getNextExecuteTimestamp() + v.getIntervalMilliSeconds()
+                    < lastBatchSchedulerTimestamp + BATCH_SCHEDULER_INTERVAL_MILLI_SECONDS)) {
+                List<Long> executeTimes = findTasksBetweenTime(v, lastBatchSchedulerTimestamp,
+                        lastBatchSchedulerTimestamp + BATCH_SCHEDULER_INTERVAL_MILLI_SECONDS,
+                        v.getNextExecuteTimestamp());
+                if (!executeTimes.isEmpty()) {
+                    for (Long executeTime : executeTimes) {
+                        putOneTask(v.getJobId(), executeTime);
+                    }
+                }
+            }
+        });
+        this.lastBatchSchedulerTimestamp += BATCH_SCHEDULER_INTERVAL_MILLI_SECONDS;
+    }
+
+    /**
+     * We will cycle system scheduler tasks every 10 minutes.
+     * Jobs will be re-registered after the task is completed
+     */
+    private void cycleSystemSchedulerTasks() {
+        dorisTimer.newTimeout(timeout -> {
+            batchSchedulerTasks();
+            cycleSystemSchedulerTasks();
+        }, BATCH_SCHEDULER_INTERVAL_MILLI_SECONDS, TimeUnit.MILLISECONDS);
+    }
+
+    public void putOneTask(Long jobId, Long startExecuteTime) {
+        DorisTimerTask task = new DorisTimerTask(jobId, startExecuteTime, disruptor);
+        if (isClosed) {
+            log.info("putOneTask failed, scheduler is closed, jobId: {}", task.getJobId());
+            return;
+        }
+        long delay = getDelaySecond(task.getStartTimestamp());
+        Timeout timeout = dorisTimer.newTimeout(task, delay, TimeUnit.SECONDS);
+        if (timeout == null) {
+            log.error("putOneTask failed, jobId: {}", task.getJobId());
+            return;
+        }
+        if (jobTimeoutMap.containsKey(task.getJobId())) {
+            jobTimeoutMap.get(task.getJobId()).put(task.getTaskId(), timeout);
+            return;
+        }
+        Map<Long, Timeout> timeoutMap = new ConcurrentHashMap<>();
+        timeoutMap.put(task.getTaskId(), timeout);
+        jobTimeoutMap.put(task.getJobId(), timeoutMap);
+    }
+
+    // cancel all task for one job
+    // if task has started, it can't be canceled
+    public void cancelJobAllTask(Long jobId) {
+        if (!jobTimeoutMap.containsKey(jobId)) {
+            return;
+        }
+
+        jobTimeoutMap.get(jobId).values().forEach(timeout -> {
+            if (!timeout.isExpired() || timeout.isCancelled()) {
+                timeout.cancel();
+            }
+        });
+    }
+
+    public void stopTask(Long jobId, Long taskId) {
+        if (!jobTimeoutMap.containsKey(jobId)) {
+            return;
+        }
+        cancelJobAllTask(jobId);
+        jobTimeoutMap.get(jobId).remove(taskId);
+    }
+
+    // get delay time, if startTimestamp is less than now, return 0
+    private long getDelaySecond(long startTimestamp) {
+        long delay = 0;
+        long now = System.currentTimeMillis();
+        if (startTimestamp > now) {
+            delay = startTimestamp - now;
+        } else {
+            log.warn("startTimestamp is less than now, startTimestamp: {}, now: {}", startTimestamp, now);
+        }
+        return delay / 1000;
+    }
+
+    @Override
+    public void close() throws IOException {
+        isClosed = true;
+        dorisTimer.stop();
+        disruptor.close();
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/scheduler/job/DorisTimerTask.java b/fe/fe-core/src/main/java/org/apache/doris/scheduler/job/DorisTimerTask.java
new file mode 100644
index 0000000000..7522548ad6
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/scheduler/job/DorisTimerTask.java
@@ -0,0 +1,58 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.scheduler.job;
+
+import org.apache.doris.scheduler.disruptor.TimerTaskDisruptor;
+
+import io.netty.util.Timeout;
+import io.netty.util.TimerTask;
+import lombok.Getter;
+
+import java.util.UUID;
+
+/**
+ * A Netty {@link TimerTask} that belongs to exactly one job.
+ * When the timer fires and the timeout has not been cancelled, the task publishes
+ * the id of its job together with its own task id through the {@link TimerTaskDisruptor}.
+ */
+@Getter
+public class DorisTimerTask implements TimerTask {
+
+    private final Long jobId;
+
+    // randomly generated id of this task instance
+    // more fields should be added here and recorded in the future
+    private final Long taskId = UUID.randomUUID().getMostSignificantBits();
+
+    private final Long startTimestamp;
+
+    private final TimerTaskDisruptor timerTaskDisruptor;
+
+    public DorisTimerTask(Long jobId, Long startTimestamp, TimerTaskDisruptor timerTaskDisruptor) {
+        this.timerTaskDisruptor = timerTaskDisruptor;
+        this.startTimestamp = startTimestamp;
+        this.jobId = jobId;
+    }
+
+    /**
+     * Invoked by the timer; publishes the (jobId, taskId) pair unless the timeout was cancelled.
+     *
+     * @param timeout handle associated with this task
+     */
+    @Override
+    public void run(Timeout timeout) {
+        if (!timeout.isCancelled()) {
+            timerTaskDisruptor.tryPublish(jobId, taskId);
+        }
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/scheduler/job/Job.java b/fe/fe-core/src/main/java/org/apache/doris/scheduler/job/Job.java
new file mode 100644
index 0000000000..6923e2277f
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/scheduler/job/Job.java
@@ -0,0 +1,148 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.scheduler.job;
+
+import org.apache.doris.scheduler.constants.JobStatus;
+import org.apache.doris.scheduler.executor.JobExecutor;
+
+import lombok.Data;
+
+import java.util.UUID;
+
+/**
+ * Job is the core of the scheduler module, which is used to store the Job information of the job module.
+ * We can use the job to uniquely identify a Job.
+ * The jobName is used to identify the job, which is not unique.
+ * The jobStatus is used to identify the status of the Job, which is used to control the execution of the
+ * job.
+ */
+@Data
+public class Job {
+
+    public Job(String jobName, Long intervalMilliSeconds, Long startTimestamp, Long endTimestamp,
+               JobExecutor executor) {
+        this.jobName = jobName;
+        this.executor = executor;
+        this.intervalMilliSeconds = intervalMilliSeconds;
+        this.startTimestamp = null == startTimestamp ? 0L : startTimestamp;
+        this.endTimestamp = null == endTimestamp ? 0L : endTimestamp;
+    }
+
+    private Long jobId = UUID.randomUUID().getMostSignificantBits();
+
+    private String jobName;
+
+    /**
+     * The status of the job, which is used to control the execution of the job.
+     *
+     * @see JobStatus
+     */
+    private JobStatus jobStatus = JobStatus.RUNNING;
+
+    /**
+     * The executor of the job.
+     *
+     * @see JobExecutor
+     */
+    private JobExecutor executor;
+
+    private String user;
+
+    private String errMsg;
+
+    private Long intervalMilliSeconds;
+
+    private Long updateTime;
+
+    private Long nextExecuteTimestamp;
+    private Long startTimestamp = 0L;
+
+    private Long endTimestamp = 0L;
+
+    private Long firstExecuteTimestamp = 0L;
+
+    private Long latestStartExecuteTimestamp = 0L;
+    private Long latestCompleteExecuteTimestamp = 0L;
+
+    public boolean isRunning() {
+        return jobStatus == JobStatus.RUNNING;
+    }
+
+    public boolean isStopped() {
+        return jobStatus == JobStatus.STOPPED;
+    }
+
+    public boolean isExpired(long nextExecuteTimestamp) {
+        if (endTimestamp == 0L) {
+            return false;
+        }
+        return nextExecuteTimestamp > endTimestamp;
+    }
+
+    public boolean isTaskTimeExceeded() {
+        if (endTimestamp == 0L) {
+            return false;
+        }
+        return System.currentTimeMillis() >= endTimestamp || nextExecuteTimestamp > endTimestamp;
+    }
+
+    public boolean isExpired() {
+        if (endTimestamp == 0L) {
+            return false;
+        }
+        return System.currentTimeMillis() >= endTimestamp;
+    }
+
+    public Long getExecuteTimestampAndGeneratorNext() {
+        this.latestStartExecuteTimestamp = nextExecuteTimestamp;
+        // todo The problem of delay should be considered. If it is greater than the ten-minute time window,
+        //  should the task be lost or executed on a new time window?
+        this.nextExecuteTimestamp = latestStartExecuteTimestamp + intervalMilliSeconds;
+        return nextExecuteTimestamp;
+    }
+
+    public void pause() {
+        this.jobStatus = JobStatus.PAUSED;
+    }
+
+    public void pause(String errMsg) {
+        this.jobStatus = JobStatus.PAUSED;
+        this.errMsg = errMsg;
+    }
+
+    public void resume() {
+        this.jobStatus = JobStatus.RUNNING;
+    }
+
+    public void stop() {
+        this.jobStatus = JobStatus.STOPPED;
+    }
+
+    public boolean checkJobParam() {
+        if (startTimestamp != 0L && startTimestamp < System.currentTimeMillis()) {
+            return false;
+        }
+        if (endTimestamp != 0L && endTimestamp < System.currentTimeMillis()) {
+            return false;
+        }
+        if (intervalMilliSeconds == null || intervalMilliSeconds <= 0L) {
+            return false;
+        }
+        return null != executor;
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/scheduler/registry/JobRegister.java b/fe/fe-core/src/main/java/org/apache/doris/scheduler/registry/JobRegister.java
new file mode 100644
index 0000000000..ebb6b0d590
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/scheduler/registry/JobRegister.java
@@ -0,0 +1,111 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.scheduler.registry;
+
+import org.apache.doris.scheduler.executor.JobExecutor;
+
+import java.io.IOException;
+
+/**
+ * This interface provides a contract for registering timed scheduling jobs.
+ * The implementation should trigger jobs in a timely manner using a specific algorithm.
+ * The execution of the jobs may be asynchronous and does not guarantee strict timing accuracy.
+ */
+public interface JobRegister {
+
+    /**
+     * Register a job.
+     *
+     * @param name       job name, it's not unique
+     * @param intervalMs job interval, unit: ms
+     * @param executor   job executor, see {@link JobExecutor}
+     * @return job id
+     */
+    Long registerJob(String name, Long intervalMs, JobExecutor executor);
+
+    /**
+     * Register a job with a start time.
+     *
+     * @param name           job name, it's not unique
+     * @param intervalMs     job interval, unit: ms
+     * @param startTimeStamp job start time stamp, unit: ms
+     *                       if startTimeStamp is null, the job will start immediately in the next cycle
+     *                       startTimeStamp should be greater than current time
+     * @param executor       job executor, see {@link JobExecutor}
+     * @return job id
+     */
+    Long registerJob(String name, Long intervalMs, Long startTimeStamp, JobExecutor executor);
+
+    /**
+     * Register a job with a start time and an end time.
+     *
+     * @param name           job name, it's not unique
+     * @param intervalMs     job interval, unit: ms
+     * @param startTimeStamp job start time stamp, unit: ms
+     *                       if startTimeStamp is null, the job will start immediately in the next cycle
+     *                       startTimeStamp should be greater than current time
+     * @param endTimeStamp   job end time stamp, unit: ms
+     *                       if endTimeStamp is null, the job will never stop
+     *                       endTimeStamp must be greater than startTimeStamp and endTimeStamp should be greater
+     *                       than current time
+     * @param executor       job executor, see {@link JobExecutor}
+     * @return job id
+     */
+    Long registerJob(String name, Long intervalMs, Long startTimeStamp, Long endTimeStamp,
+                     JobExecutor executor);
+
+    /**
+     * If the job is running, pause it.
+     * Pause means the job will not be executed in the next cycle, but the current cycle will not be interrupted.
+     * A paused job can be resumed by {@link #resumeJob(Long)}.
+     *
+     * @param jobId job id
+     *              if jobId does not exist, return false
+     * @return true if pause success, false if pause failed
+     */
+    Boolean pauseJob(Long jobId);
+
+    /**
+     * If the job is running, stop it.
+     * Stop means the job will not be executed in the next cycle and the current cycle will be interrupted.
+     * A stopped job cannot be resumed; if you want to run it again, you should register it again.
+     * A stopped job will be deleted.
+     *
+     * @param jobId job id
+     * @return true if stop success, false if stop failed
+     */
+    Boolean stopJob(Long jobId);
+
+    /**
+     * If the job is paused, resume it.
+     *
+     * @param jobId job id
+     * @return true if resume success, false if resume failed
+     */
+    Boolean resumeJob(Long jobId);
+
+    /**
+     * Close the job scheduler register.
+     * Close means the job scheduler register will not accept new jobs.
+     * Jobs that have not reached the trigger time will not be executed. Jobs that have reached the trigger time will
+     * have an execution time of 5 seconds, and will not be executed if the time exceeds that limit.
+     *
+     * @throws IOException if closing fails
+     */
+    void close() throws IOException;
+
+}
diff --git a/fe/fe-core/src/test/java/org/apache/doris/scheduler/disruptor/AsyncJobManagerTest.java b/fe/fe-core/src/test/java/org/apache/doris/scheduler/disruptor/AsyncJobManagerTest.java
new file mode 100644
index 0000000000..dceb8049cd
--- /dev/null
+++ b/fe/fe-core/src/test/java/org/apache/doris/scheduler/disruptor/AsyncJobManagerTest.java
@@ -0,0 +1,117 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.scheduler.disruptor;
+
+import org.apache.doris.scheduler.executor.JobExecutor;
+import org.apache.doris.scheduler.job.AsyncJobManager;
+import org.apache.doris.scheduler.job.Job;
+
+import lombok.extern.slf4j.Slf4j;
+import org.awaitility.Awaitility;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Timing based tests for periodic job scheduling through {@link AsyncJobManager}.
+ * Every test registers a job with a 6000 ms interval whose executor only counts its invocations,
+ * then waits on the wall clock and checks the counter.
+ */
+@Slf4j
+public class AsyncJobManagerTest {
+
+    AsyncJobManager asyncJobManager;
+
+    // number of times TestExecutor#execute has run; reset to 0 before each test
+    private static AtomicInteger testExecuteCount = new AtomicInteger(0);
+    // job under test: interval 6000 ms, no start time and no end time unless a test sets them
+    Job job = new Job("test", 6000L, null,
+            null, new TestExecutor());
+
+    // resets the counter and creates a fresh manager for each test
+    @BeforeEach
+    public void init() {
+        testExecuteCount.set(0);
+        asyncJobManager = new AsyncJobManager();
+    }
+
+    // the job must have been executed at least three times within 30 seconds
+    @Test
+    public void testCycleScheduler() {
+        asyncJobManager.registerJob(job);
+        //consider the time of the first execution and give some buffer time
+        Awaitility.await().atMost(30, TimeUnit.SECONDS).until(() -> testExecuteCount.get() >= 3);
+    }
+
+    // unregisters the job after its first execution; 13 seconds after registration
+    // the counter must still be exactly 1
+    @Test
+    public void testCycleSchedulerAndStop() {
+        asyncJobManager.registerJob(job);
+        long startTime = System.currentTimeMillis();
+        Awaitility.await().atMost(8, TimeUnit.SECONDS).until(() -> testExecuteCount.get() >= 1);
+        asyncJobManager.unregisterJob(job.getJobId());
+        //consider the time of the first execution and give some buffer time
+        Awaitility.await().atMost(30, TimeUnit.SECONDS).until(() -> System.currentTimeMillis() >= startTime + 13000L);
+        Assertions.assertEquals(1, testExecuteCount.get());
+
+    }
+
+    // start time is now + 6 s and end time is now + 19 s; 12 s after the end time
+    // exactly two executions are expected
+    @Test
+    public void testCycleSchedulerWithIncludeStartTimeAndEndTime() {
+        job.setStartTimestamp(System.currentTimeMillis() + 6000L);
+        long endTimestamp = System.currentTimeMillis() + 19000L;
+        job.setEndTimestamp(endTimestamp);
+        asyncJobManager.registerJob(job);
+        //consider the time of the first execution and give some buffer time
+
+        Awaitility.await().atMost(60, TimeUnit.SECONDS).until(() -> System.currentTimeMillis()
+                >= endTimestamp + 12000L);
+        Assertions.assertEquals(2, testExecuteCount.get());
+    }
+
+    // no start time, end time is now + 13 s; 12 s after the end time
+    // exactly two executions are expected
+    @Test
+    public void testCycleSchedulerWithIncludeEndTime() {
+        long endTimestamp = System.currentTimeMillis() + 13000;
+        job.setEndTimestamp(endTimestamp);
+        asyncJobManager.registerJob(job);
+        //consider the time of the first execution and give some buffer time
+        Awaitility.await().atMost(36, TimeUnit.SECONDS).until(() -> System.currentTimeMillis()
+                >= endTimestamp + 12000L);
+        Assertions.assertEquals(2, testExecuteCount.get());
+    }
+
+    // start time is now + 6 s, no end time; 7 s after the start time
+    // exactly one execution is expected
+    @Test
+    public void testCycleSchedulerWithIncludeStartTime() {
+
+        long startTimestamp = System.currentTimeMillis() + 6000L;
+        job.setStartTimestamp(startTimestamp);
+        asyncJobManager.registerJob(job);
+        //consider the time of the first execution and give some buffer time
+        Awaitility.await().atMost(14, TimeUnit.SECONDS).until(() -> System.currentTimeMillis()
+                >= startTimestamp + 7000L);
+        Assertions.assertEquals(1, testExecuteCount.get());
+    }
+
+    // closes the manager created in init()
+    @AfterEach
+    public void after() throws IOException {
+        asyncJobManager.close();
+    }
+
+    // executor that increments and logs the shared execution counter and always returns true
+    class TestExecutor implements JobExecutor<Boolean> {
+        @Override
+        public Boolean execute() {
+            log.info("test execute count:{}", testExecuteCount.incrementAndGet());
+            return true;
+        }
+    }
+}
diff --git a/fe/fe-core/src/test/java/org/apache/doris/scheduler/disruptor/TimerTaskDisruptorTest.java b/fe/fe-core/src/test/java/org/apache/doris/scheduler/disruptor/TimerTaskDisruptorTest.java
new file mode 100644
index 0000000000..1630b1f864
--- /dev/null
+++ b/fe/fe-core/src/test/java/org/apache/doris/scheduler/disruptor/TimerTaskDisruptorTest.java
@@ -0,0 +1,77 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.scheduler.disruptor;
+
+import org.apache.doris.scheduler.executor.JobExecutor;
+import org.apache.doris.scheduler.job.AsyncJobManager;
+import org.apache.doris.scheduler.job.Job;
+
+import mockit.Expectations;
+import mockit.Injectable;
+import mockit.Tested;
+import org.awaitility.Awaitility;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Verifies that a task published to the {@link TimerTaskDisruptor} is consumed and that the
+ * executor of the corresponding job is invoked.
+ */
+public class TimerTaskDisruptorTest {
+
+    @Tested
+    private TimerTaskDisruptor timerTaskDisruptor;
+
+    @Injectable
+    private AsyncJobManager asyncJobManager;
+
+    // Set by the executor and polled through Awaitility; volatile so that a write made on
+    // another thread is guaranteed to become visible to the polling code.
+    private static volatile boolean testEventExecuteFlag = false;
+
+    // resets the flag and creates a fresh disruptor backed by the mocked job manager
+    @BeforeEach
+    public void init() {
+        testEventExecuteFlag = false;
+        timerTaskDisruptor = new TimerTaskDisruptor(asyncJobManager);
+    }
+
+    // publishes one task and expects the job executor to have run within one second
+    @Test
+    void testPublishEventAndConsumer() {
+        Job job = new Job("test", 6000L, null,
+                null, new TestExecutor());
+        // the mocked manager returns the test job for any job id
+        new Expectations() {{
+                asyncJobManager.getJob(anyLong);
+                result = job;
+            }};
+        timerTaskDisruptor.tryPublish(job.getJobId(), UUID.randomUUID().getMostSignificantBits());
+        Awaitility.await().atMost(1, TimeUnit.SECONDS).until(() -> testEventExecuteFlag);
+        Assertions.assertTrue(testEventExecuteFlag);
+    }
+
+
+    // executor that only records that it has been invoked
+    class TestExecutor implements JobExecutor<Boolean> {
+        @Override
+        public Boolean execute() {
+            testEventExecuteFlag = true;
+            return true;
+        }
+    }
+
+    // closes the disruptor created in init()
+    @AfterEach
+    public void after() {
+        timerTaskDisruptor.close();
+    }
+}
diff --git a/fe/pom.xml b/fe/pom.xml
index 12fe1b7231..048257fdb5 100644
--- a/fe/pom.xml
+++ b/fe/pom.xml
@@ -310,7 +310,7 @@ under the License.
         <vesoft.client.version>3.0.0</vesoft.client.version>
         <!-- paimon -->
         <paimon.version>0.4.0-incubating</paimon.version>
-        <disruptor.version>3.3.4</disruptor.version>
+        <disruptor.version>3.4.4</disruptor.version>
     </properties>
     <profiles>
         <profile>
@@ -1432,6 +1432,12 @@ under the License.
             <groupId>org.jmockit</groupId>
             <artifactId>jmockit</artifactId>
         </dependency>
+        <!-- should be used in test scope -->
+        <dependency>
+            <groupId>org.awaitility</groupId>
+            <artifactId>awaitility</artifactId>
+        </dependency>
+
     </dependencies>
     <reporting>
         <plugins>


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org