You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampark.apache.org by mo...@apache.org on 2023/05/17 13:31:35 UTC

[incubator-streampark] branch schedule created (now 87214c403)

This is an automated email from the ASF dual-hosted git repository.

monster pushed a change to branch schedule
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


      at 87214c403 [WIP][Feature]Support offline scheduling

This branch includes the following new commits:

     new 87214c403 [WIP][Feature]Support offline scheduling

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[incubator-streampark] 01/01: [WIP][Feature]Support offline scheduling

Posted by mo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

monster pushed a commit to branch schedule
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git

commit 87214c40356380ee15db3cb2cc2e5bde1c105b30
Author: monster <60...@users.noreply.github.com>
AuthorDate: Wed May 17 21:31:23 2023 +0800

    [WIP][Feature]Support offline scheduling
---
 .../apache/streampark/common/util/DateUtils.scala  |  34 ++++-
 .../streampark/console/core/entity/Schedule.java   |  71 +++++++++
 .../console/core/enums/ScheduleState.java          |  46 ++++++
 .../console/core/mapper/SchedulerMapper.java       |  24 ++++
 .../console/core/quartz/JobScheduleTask.java       |  83 +++++++++++
 .../console/core/quartz/QuartzExecutors.java       | 159 +++++++++++++++++++++
 .../console/core/quartz/QuartzTaskUtils.java       |  53 +++++++
 .../console/core/quartz/SchedulerApi.java          |  31 ++++
 .../console/core/service/SchedulerService.java     |  44 ++++++
 .../core/service/impl/SchedulerServiceImpl.java    |  71 +++++++++
 .../src/main/resources/application.yml             |  21 +++
 11 files changed, 636 insertions(+), 1 deletion(-)

diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/util/DateUtils.scala b/streampark-common/src/main/scala/org/apache/streampark/common/util/DateUtils.scala
index 8c62eebe1..fe91ee2b4 100644
--- a/streampark-common/src/main/scala/org/apache/streampark/common/util/DateUtils.scala
+++ b/streampark-common/src/main/scala/org/apache/streampark/common/util/DateUtils.scala
@@ -16,8 +16,10 @@
  */
 package org.apache.streampark.common.util
 
+import org.apache.commons.lang3.StringUtils
+
 import java.text.{ParseException, SimpleDateFormat}
-import java.time.{Duration, LocalDateTime}
+import java.time.{Duration, LocalDateTime, ZonedDateTime, ZoneId}
 import java.time.format.DateTimeFormatter
 import java.util._
 import java.util.concurrent.TimeUnit
@@ -220,4 +222,34 @@ object DateUtils {
     DateUtils.getDateFormat(d, format)
   }
 
+  /**
+   * Formats `date` in the source zone, then parses that text as local time in the target zone.
+   */
+  def transformTimezoneDate(
+      date: Date,
+      sourceTimezoneId: String,
+      targetTimezoneId: String): Date = {
+    if (StringUtils.isEmpty(sourceTimezoneId) || StringUtils.isEmpty(targetTimezoneId)) {
+      date // either zone id is null/empty: nothing to convert
+    } else {
+      // Must not be named `dateToString`: that would shadow the helper being called.
+      val formatted = dateToString(date, sourceTimezoneId)
+      val localDateTime =
+        LocalDateTime.parse(formatted, DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))
+      val zonedDateTime =
+        ZonedDateTime.of(localDateTime, TimeZone.getTimeZone(targetTimezoneId).toZoneId)
+      Date.from(zonedDateTime.toInstant)
+    }
+  }
+
+  // Overload that takes the JVM default zone as the source timezone.
+  def transformTimezoneDate(date: Date, targetTimezoneId: String): Date =
+    transformTimezoneDate(date, ZoneId.systemDefault.getId, targetTimezoneId)
+
+  /** Resolves `timezoneId` to a `java.util.TimeZone`; `None` when the id is null or empty. */
+  def getTimezone(timezoneId: String): Option[TimeZone] = {
+    if (StringUtils.isEmpty(timezoneId)) None
+    else Some(TimeZone.getTimeZone(timezoneId))
+  }
+
 }
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Schedule.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Schedule.java
new file mode 100644
index 000000000..903088f5b
--- /dev/null
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Schedule.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.console.core.entity;
+
+import org.apache.streampark.console.core.enums.ScheduleState;
+
+import com.baomidou.mybatisplus.annotation.IdType;
+import com.baomidou.mybatisplus.annotation.TableField;
+import com.baomidou.mybatisplus.annotation.TableId;
+import com.baomidou.mybatisplus.annotation.TableName;
+import com.fasterxml.jackson.annotation.JsonFormat;
+import lombok.Data;
+
+import java.util.Date;
+
+@TableName("t_flink_schedules")
+@Data
+public class Schedule {
+  // Row of t_flink_schedules describing one cron schedule; Lombok @Data generates the accessors.
+  @TableId(value = "id", type = IdType.AUTO)
+  private int id;
+
+  private Long appId; // presumably the id of the application this schedule belongs to - confirm
+
+  @TableField(exist = false) // not mapped to a table column
+  private String description;
+
+  /** Start of the period in which the cron trigger may fire; serialized as GMT+8 text. */
+  @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8")
+  private Date startTime;
+
+  /** End of the period in which the cron trigger may fire; serialized as GMT+8 text. */
+  @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8")
+  private Date endTime;
+
+  /**
+   * Id of the timezone the cron expression is evaluated in.
+   *
+   * <p>see {@link java.util.TimeZone#getTimeZone(String)}
+   */
+  private String timezoneId;
+
+  /** Cron expression that drives the quartz trigger of this schedule. */
+  private String crontab;
+
+  /** Creation time of the record; serialized as GMT+8 text. */
+  @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8")
+  private Date createTime;
+
+  /** Last update time of the record; serialized as GMT+8 text. */
+  @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8")
+  private Date updateTime;
+
+  /** Whether the schedule is online or offline. */
+  private ScheduleState scheduleState;
+}
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/enums/ScheduleState.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/enums/ScheduleState.java
new file mode 100644
index 000000000..4ff720673
--- /dev/null
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/enums/ScheduleState.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.console.core.enums;
+
+import com.baomidou.mybatisplus.annotation.EnumValue;
+import lombok.Getter;
+
+/** Online/offline state of a schedule. */
+public enum ScheduleState {
+  OFFLINE(0, "offline"),
+  ONLINE(1, "online");
+
+  /** Numeric code; marked with {@link EnumValue} so MyBatis-Plus stores this value. */
+  @Getter @EnumValue private final int code;
+  @Getter private final String descp;
+
+  ScheduleState(int code, String descp) {
+    this.code = code;
+    this.descp = descp;
+  }
+
+  /** Looks a state up by its {@code code}; returns {@code null} when no state has that code. */
+  public static ScheduleState getEnum(int value) {
+    for (ScheduleState state : values()) {
+      if (state.code == value) {
+        return state;
+      }
+    }
+    return null;
+  }
+}
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/SchedulerMapper.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/SchedulerMapper.java
new file mode 100644
index 000000000..ac672e345
--- /dev/null
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/SchedulerMapper.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.console.core.mapper;
+
+import org.apache.streampark.console.core.entity.Schedule;
+
+import com.baomidou.mybatisplus.core.mapper.BaseMapper;
+/** MyBatis-Plus mapper for {@link Schedule}; adds nothing beyond what BaseMapper provides. */
+public interface SchedulerMapper extends BaseMapper<Schedule> {}
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/quartz/JobScheduleTask.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/quartz/JobScheduleTask.java
new file mode 100644
index 000000000..928b058ae
--- /dev/null
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/quartz/JobScheduleTask.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.console.core.quartz;
+
+import org.apache.streampark.console.core.entity.Schedule;
+import org.apache.streampark.console.core.enums.ScheduleState;
+import org.apache.streampark.console.core.service.SchedulerService;
+
+import lombok.extern.slf4j.Slf4j;
+import org.quartz.JobDataMap;
+import org.quartz.JobExecutionContext;
+import org.quartz.JobKey;
+import org.quartz.Scheduler;
+import org.springframework.scheduling.quartz.QuartzJobBean;
+
+import java.util.Date;
+
+@Slf4j
+public class JobScheduleTask extends QuartzJobBean {
+
+  private final SchedulerService schedulerService;
+
+  public JobScheduleTask(SchedulerService schedulerService) {
+    this.schedulerService = schedulerService;
+  }
+
+  @Override
+  protected void executeInternal(JobExecutionContext context) {
+    JobDataMap dataMap = context.getJobDetail().getJobDataMap();
+
+    int scheduleId = dataMap.getInt(QuartzTaskUtils.SCHEDULE_ID);
+
+    Date scheduledFireTime = context.getScheduledFireTime();
+
+    Date fireTime = context.getFireTime();
+
+    log.info(
+        "Scheduled fire time :{}, fire time :{}, process id :{}.",
+        scheduledFireTime,
+        fireTime,
+        scheduleId);
+
+    // query schedule
+    Schedule schedule = schedulerService.querySchedule(scheduleId);
+    if (schedule == null || ScheduleState.OFFLINE == schedule.getScheduleState()) {
+      log.warn(
+          "Job schedule does not exist in db or process schedule offline,delete schedule job in quartz, scheduleId:{}.",
+          scheduleId);
+      deleteJob(context, scheduleId);
+      return;
+    }
+    // start flink job
+    // .......
+  }
+
+  private void deleteJob(JobExecutionContext context, int scheduleId) {
+    final Scheduler scheduler = context.getScheduler();
+    JobKey jobKey = QuartzTaskUtils.getJobKey(scheduleId);
+    try {
+      if (scheduler.checkExists(jobKey)) {
+        log.info("Try to delete job: {}.", scheduleId);
+        scheduler.deleteJob(jobKey);
+      }
+    } catch (Exception e) {
+      log.error("Failed to delete job: {}.", jobKey);
+    }
+  }
+}
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/quartz/QuartzExecutors.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/quartz/QuartzExecutors.java
new file mode 100644
index 000000000..2ba57af09
--- /dev/null
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/quartz/QuartzExecutors.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.console.core.quartz;
+
+import org.apache.streampark.common.util.DateUtils;
+import org.apache.streampark.console.core.entity.Schedule;
+
+import com.google.common.base.Strings;
+import lombok.SneakyThrows;
+import lombok.extern.slf4j.Slf4j;
+import org.quartz.CronTrigger;
+import org.quartz.JobDetail;
+import org.quartz.JobKey;
+import org.quartz.Scheduler;
+import org.quartz.SchedulerException;
+import org.quartz.TriggerKey;
+
+import java.util.Date;
+import java.util.Map;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import static org.quartz.CronScheduleBuilder.cronSchedule;
+import static org.quartz.JobBuilder.newJob;
+import static org.quartz.TriggerBuilder.newTrigger;
+
+@Slf4j
+public class QuartzExecutors implements SchedulerApi {
+
+  private final Scheduler scheduler;
+  private final ReadWriteLock lock = new ReentrantReadWriteLock();
+
+  public QuartzExecutors(Scheduler scheduler) {
+    this.scheduler = scheduler;
+  }
+
+  @SneakyThrows
+  @Override
+  public void start() {
+    try {
+      scheduler.start();
+    } catch (Exception e) {
+      throw new SchedulerException("Failed to start quartz scheduler.", e);
+    }
+  }
+
+  @SneakyThrows
+  @Override
+  public void insertOrUpdateScheduleTask(Schedule schedule) {
+    JobKey jobKey = QuartzTaskUtils.getJobKey(schedule.getId());
+    Map<String, Object> jobDataMap = QuartzTaskUtils.buildDataMap(schedule);
+    String cronExpression = schedule.getCrontab();
+    String timezoneId = schedule.getTimezoneId();
+    Date startDate = DateUtils.transformTimezoneDate(schedule.getStartTime(), timezoneId);
+    Date endDate = DateUtils.transformTimezoneDate(schedule.getEndTime(), timezoneId);
+
+    lock.writeLock().lock();
+    try {
+
+      JobDetail jobDetail;
+      if (scheduler.checkExists(jobKey)) {
+
+        jobDetail = scheduler.getJobDetail(jobKey);
+        jobDetail.getJobDataMap().putAll(jobDataMap);
+      } else {
+        jobDetail = newJob(JobScheduleTask.class).withIdentity(jobKey).build();
+
+        jobDetail.getJobDataMap().putAll(jobDataMap);
+
+        scheduler.addJob(jobDetail, false, true);
+
+        log.info("Add job, job name: {}, group name: {}.", jobKey.getName(), jobKey.getGroup());
+      }
+
+      TriggerKey triggerKey = new TriggerKey(jobKey.getName(), jobKey.getGroup());
+
+      CronTrigger cronTrigger =
+          newTrigger()
+              .withIdentity(triggerKey)
+              .startAt(startDate)
+              .endAt(endDate)
+              .withSchedule(
+                  cronSchedule(cronExpression)
+                      .withMisfireHandlingInstructionDoNothing()
+                      .inTimeZone(DateUtils.getTimezone(timezoneId).get()))
+              .forJob(jobDetail)
+              .build();
+
+      if (scheduler.checkExists(triggerKey)) {
+        CronTrigger oldCronTrigger = (CronTrigger) scheduler.getTrigger(triggerKey);
+        String oldCronExpression = oldCronTrigger.getCronExpression();
+
+        if (!Strings.nullToEmpty(cronExpression)
+            .equalsIgnoreCase(Strings.nullToEmpty(oldCronExpression))) {
+          scheduler.rescheduleJob(triggerKey, cronTrigger);
+          log.info(
+              "Reschedule job trigger, triggerName: {}, triggerGroupName: {}, cronExpression: {}, startDate: {}, endDate: {}",
+              triggerKey.getName(),
+              triggerKey.getGroup(),
+              cronExpression,
+              startDate,
+              endDate);
+        }
+      } else {
+        scheduler.scheduleJob(cronTrigger);
+        log.info(
+            "Schedule job trigger, triggerName: {}, triggerGroupName: {}, cronExpression: {}, startDate: {}, endDate: {}",
+            triggerKey.getName(),
+            triggerKey.getGroup(),
+            cronExpression,
+            startDate,
+            endDate);
+      }
+    } catch (Exception e) {
+      throw new SchedulerException("Add schedule job failed.", e);
+    } finally {
+      lock.writeLock().unlock();
+    }
+  }
+
+  @SneakyThrows
+  @Override
+  public void deleteScheduleTask(int scheduleId) {
+    JobKey jobKey = QuartzTaskUtils.getJobKey(scheduleId);
+    try {
+      if (scheduler.checkExists(jobKey)) {
+        log.info("Try to delete scheduler task, schedulerId: {}.", scheduleId);
+        scheduler.deleteJob(jobKey);
+      }
+    } catch (Exception e) {
+      log.error("Failed to delete scheduler task, schedulerId: {}.", scheduleId, e);
+      throw new SchedulerException("Failed to delete scheduler task.");
+    }
+  }
+
+  @Override
+  public void close() throws Exception {
+    try {
+      scheduler.shutdown();
+    } catch (org.quartz.SchedulerException e) {
+      throw new SchedulerException("Failed to shutdown scheduler.", e);
+    }
+  }
+}
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/quartz/QuartzTaskUtils.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/quartz/QuartzTaskUtils.java
new file mode 100644
index 000000000..a92da37a2
--- /dev/null
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/quartz/QuartzTaskUtils.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.console.core.quartz;
+
+import org.apache.streampark.common.util.JsonUtils;
+import org.apache.streampark.console.core.entity.Schedule;
+
+import org.quartz.JobKey;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/** Static helpers that derive quartz job keys and job data from a {@link Schedule}. */
+public final class QuartzTaskUtils {
+
+  public static final String QUARTZ_JOB_PREFIX = "job";
+  public static final String QUARTZ_JOB_GROUP_PREFIX = "jobgroup";
+  public static final String UNDERLINE = "_";
+  public static final String SCHEDULE_ID = "scheduleId";
+  public static final String SCHEDULE = "schedule";
+
+  private QuartzTaskUtils() {} // static helpers only, never instantiated
+
+  /** Returns the quartz job key {@code job_<schedulerId>} in the fixed group {@code jobgroup_}. */
+  public static JobKey getJobKey(int schedulerId) {
+    String jobName = QUARTZ_JOB_PREFIX + UNDERLINE + schedulerId;
+    String jobGroup = QUARTZ_JOB_GROUP_PREFIX + UNDERLINE;
+    return new JobKey(jobName, jobGroup);
+  }
+
+  /** Builds the job data: the schedule id plus the whole schedule serialized through JsonUtils. */
+  public static Map<String, Object> buildDataMap(Schedule schedule) {
+    Map<String, Object> dataMap = new HashMap<>(8);
+    dataMap.put(SCHEDULE_ID, schedule.getId());
+    dataMap.put(SCHEDULE, JsonUtils.Marshal(schedule).toJson());
+    return dataMap;
+  }
+}
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/quartz/SchedulerApi.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/quartz/SchedulerApi.java
new file mode 100644
index 000000000..d03f0035b
--- /dev/null
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/quartz/SchedulerApi.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.console.core.quartz;
+
+import org.apache.streampark.console.core.entity.Schedule;
+
+public interface SchedulerApi extends AutoCloseable {
+  /** Starts the scheduler. */
+  void start();
+  /** Registers the task for {@code schedule}, or updates it when it already exists. */
+  void insertOrUpdateScheduleTask(Schedule schedule);
+  /** Removes the task that belongs to {@code scheduleId}. */
+  void deleteScheduleTask(int scheduleId);
+  /** Shuts the scheduler down. */
+  void close() throws Exception;
+}
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/SchedulerService.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/SchedulerService.java
new file mode 100644
index 000000000..4e5f6583b
--- /dev/null
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/SchedulerService.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.console.core.service;
+
+import org.apache.streampark.console.core.entity.Schedule;
+import org.apache.streampark.console.core.enums.ReleaseState;
+import org.apache.streampark.console.core.enums.ScheduleState;
+
+import com.baomidou.mybatisplus.core.metadata.IPage;
+import com.baomidou.mybatisplus.extension.service.IService;
+
+import java.util.Map;
+
+public interface SchedulerService extends IService<Schedule> {
+  // NOTE(review): the method contracts below are not defined yet (WIP) - document once settled.
+  boolean insertSchedule(Long appId, String schedule);
+
+  boolean updateSchedule(Long appId, String scheduleExpression);
+  // NOTE(review): takes ReleaseState although Schedule stores a ScheduleState - confirm the type.
+  Map<String, Object> setScheduleState(Long appId, ReleaseState scheduleStatus);
+
+  boolean deleteSchedule(int scheduleId);
+
+  Map<String, Object> previewSchedule(String schedule);
+  // NOTE(review): the parameter names look copied from another service - confirm the intent.
+  IPage<Schedule> page(Schedule savePoint, ScheduleState request);
+  /** Returns the schedule with the given id. */
+  Schedule querySchedule(int id);
+}
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SchedulerServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SchedulerServiceImpl.java
new file mode 100644
index 000000000..2714eb1eb
--- /dev/null
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SchedulerServiceImpl.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.console.core.service.impl;
+
+import org.apache.streampark.console.core.entity.Schedule;
+import org.apache.streampark.console.core.enums.ReleaseState;
+import org.apache.streampark.console.core.enums.ScheduleState;
+import org.apache.streampark.console.core.mapper.SchedulerMapper;
+import org.apache.streampark.console.core.service.SchedulerService;
+
+import com.baomidou.mybatisplus.core.metadata.IPage;
+import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
+import org.springframework.stereotype.Service;
+
+import java.util.Map;
+
+/** MyBatis-Plus backed {@link SchedulerService}; only querySchedule has real logic so far. */
+@Service
+public class SchedulerServiceImpl extends ServiceImpl<SchedulerMapper, Schedule>
+    implements SchedulerService {
+  // WIP: every method below that returns a constant false/null is a placeholder without logic.
+  @Override
+  public boolean insertSchedule(Long appId, String schedule) {
+    return false;
+  }
+
+  @Override
+  public boolean updateSchedule(Long appId, String scheduleExpression) {
+    return false;
+  }
+
+  @Override
+  public Map<String, Object> setScheduleState(Long appId, ReleaseState scheduleStatus) {
+    return null;
+  }
+
+  @Override
+  public boolean deleteSchedule(int scheduleId) {
+    return false;
+  }
+
+  @Override
+  public Map<String, Object> previewSchedule(String schedule) {
+    return null;
+  }
+
+  @Override
+  public IPage<Schedule> page(Schedule savePoint, ScheduleState request) {
+    return null;
+  }
+  /** Loads the schedule by primary key through the inherited getById. */
+  @Override
+  public Schedule querySchedule(int id) {
+    return getById(id);
+  }
+}
diff --git a/streampark-console/streampark-console-service/src/main/resources/application.yml b/streampark-console/streampark-console-service/src/main/resources/application.yml
index 5b832f547..0bbca7653 100644
--- a/streampark-console/streampark-console-service/src/main/resources/application.yml
+++ b/streampark-console/streampark-console-service/src/main/resources/application.yml
@@ -66,6 +66,27 @@ spring:
   mvc:
     converters:
       preferred-json-mapper: jackson
+  quartz:
+    auto-startup: false # the scheduler is not started automatically at application startup
+    job-store-type: jdbc # jobs and triggers are persisted through the datasource
+    jdbc:
+      initialize-schema: never # the quartz tables are not created automatically
+    properties:
+      org.quartz.threadPool.threadPriority: 5
+      org.quartz.jobStore.isClustered: true
+      org.quartz.jobStore.class: org.springframework.scheduling.quartz.LocalDataSourceJobStore
+      org.quartz.scheduler.instanceId: AUTO
+      org.quartz.jobStore.tablePrefix: QRTZ_
+      org.quartz.jobStore.acquireTriggersWithinLock: true
+      org.quartz.scheduler.instanceName: StreamPark
+      org.quartz.threadPool.class: org.quartz.simpl.SimpleThreadPool
+      org.quartz.jobStore.useProperties: false
+      org.quartz.threadPool.makeThreadsDaemons: true
+      org.quartz.threadPool.threadCount: 25
+      org.quartz.jobStore.misfireThreshold: 60000 # milliseconds
+      org.quartz.scheduler.makeSchedulerThreadDaemon: true
+      org.quartz.jobStore.driverDelegateClass: org.quartz.impl.jdbcjobstore.PostgreSQLDelegate # NOTE(review): PostgreSQL-specific delegate - confirm it matches the configured datasource
+      org.quartz.jobStore.clusterCheckinInterval: 5000 # milliseconds
 
 management:
   endpoints: