You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by we...@apache.org on 2022/12/26 03:09:37 UTC

[dolphinscheduler] branch dev updated: Add mybatis in mysql registry module (#13275)

This is an automated email from the ASF dual-hosted git repository.

wenjun pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new 04785dc799 Add mybatis in mysql registry module (#13275)
04785dc799 is described below

commit 04785dc799200d052fa7367ca4a5ebbd9784f036
Author: Wenjun Ruan <we...@apache.org>
AuthorDate: Mon Dec 26 11:09:25 2022 +0800

    Add mybatis in mysql registry module (#13275)
---
 .../dolphinscheduler/dao/DaoConfiguration.java     |   2 +-
 .../dolphinscheduler-registry-mysql/pom.xml        |   6 +
 .../plugin/registry/mysql/MysqlOperator.java       | 401 ++++-----------------
 .../plugin/registry/mysql/MysqlRegistry.java       |  13 +-
 .../registry/mysql/MysqlRegistryConfiguration.java |  61 ++++
 .../mysql/mapper/MysqlRegistryDataMapper.java      |  61 ++++
 .../mysql/mapper/MysqlRegistryLockMapper.java      |  48 +++
 .../registry/mysql/model/MysqlRegistryData.java    |  15 +-
 .../registry/mysql/model/MysqlRegistryLock.java    |  14 +-
 .../registry/mysql/task/EphemeralDateManager.java  |   2 +-
 10 files changed, 289 insertions(+), 334 deletions(-)

diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/DaoConfiguration.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/DaoConfiguration.java
index 68fd2cae66..d1de5630a8 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/DaoConfiguration.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/DaoConfiguration.java
@@ -26,6 +26,6 @@ import org.springframework.context.annotation.Configuration;
 
 @Configuration
 @EnableAutoConfiguration
-@MapperScan("org.apache.dolphinscheduler.dao.mapper")
+@MapperScan(basePackages = "org.apache.dolphinscheduler.dao.mapper", sqlSessionFactoryRef = "sqlSessionFactory")
 public class DaoConfiguration {
 }
diff --git a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/pom.xml b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/pom.xml
index fd7f0925e2..dac8423d5e 100644
--- a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/pom.xml
+++ b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/pom.xml
@@ -50,6 +50,12 @@
             <groupId>org.slf4j</groupId>
             <artifactId>slf4j-api</artifactId>
         </dependency>
+
+        <dependency>
+            <groupId>com.baomidou</groupId>
+            <artifactId>mybatis-plus</artifactId>
+        </dependency>
+
     </dependencies>
 
 </project>
diff --git a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/MysqlOperator.java b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/MysqlOperator.java
index adfa41324a..a770fe75c7 100644
--- a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/MysqlOperator.java
+++ b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/MysqlOperator.java
@@ -17,412 +17,167 @@
 
 package org.apache.dolphinscheduler.plugin.registry.mysql;
 
+import org.apache.dolphinscheduler.plugin.registry.mysql.mapper.MysqlRegistryDataMapper;
+import org.apache.dolphinscheduler.plugin.registry.mysql.mapper.MysqlRegistryLockMapper;
 import org.apache.dolphinscheduler.plugin.registry.mysql.model.DataType;
 import org.apache.dolphinscheduler.plugin.registry.mysql.model.MysqlRegistryData;
 import org.apache.dolphinscheduler.plugin.registry.mysql.model.MysqlRegistryLock;
 
+import org.apache.commons.collections4.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
 
-import java.sql.Connection;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.SQLIntegrityConstraintViolationException;
-import java.sql.Statement;
-import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Iterator;
 import java.util.List;
-import java.util.Optional;
-
-import lombok.NonNull;
+import java.util.stream.Collectors;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.stereotype.Component;
 
-import com.zaxxer.hikari.HikariConfig;
-import com.zaxxer.hikari.HikariDataSource;
-
-/**
- * Used to CRUD from mysql
- */
-public class MysqlOperator implements AutoCloseable {
+@Component
+@ConditionalOnProperty(prefix = "registry", name = "type", havingValue = "mysql")
+public class MysqlOperator {
 
     private static final Logger logger = LoggerFactory.getLogger(MysqlOperator.class);
 
-    private final HikariDataSource dataSource;
+    @Autowired
+    private MysqlRegistryDataMapper mysqlRegistryDataMapper;
+    @Autowired
+    private MysqlRegistryLockMapper mysqlRegistryLockMapper;
+
     private final long expireTimeWindow;
 
     public MysqlOperator(MysqlRegistryProperties registryProperties) {
         this.expireTimeWindow =
                 registryProperties.getTermExpireTimes() * registryProperties.getTermRefreshInterval().toMillis();
-
-        HikariConfig hikariConfig = registryProperties.getHikariConfig();
-        hikariConfig.setPoolName("MysqlRegistryDataSourcePool");
-
-        this.dataSource = new HikariDataSource(hikariConfig);
     }
 
-    public void healthCheck() throws SQLException {
-        String sql = "select 1 from t_ds_mysql_registry_data";
-        try (
-                Connection connection = dataSource.getConnection();
-                PreparedStatement preparedStatement = connection.prepareStatement(sql);
-                ResultSet resultSet = preparedStatement.executeQuery();) {
-            // if no exception, the healthCheck success
-        }
+    public void healthCheck() {
+        mysqlRegistryLockMapper.countAll();
     }
 
-    public List<MysqlRegistryData> queryAllMysqlRegistryData() throws SQLException {
-        String sql =
-                "select id, `key`, data, type, last_term, create_time, last_update_time from t_ds_mysql_registry_data";
-        try (
-                Connection connection = dataSource.getConnection();
-                PreparedStatement preparedStatement = connection.prepareStatement(sql);
-                ResultSet resultSet = preparedStatement.executeQuery()) {
-            List<MysqlRegistryData> result = new ArrayList<>(resultSet.getFetchSize());
-            while (resultSet.next()) {
-                MysqlRegistryData mysqlRegistryData = MysqlRegistryData.builder()
-                        .id(resultSet.getLong("id"))
-                        .key(resultSet.getString("key"))
-                        .data(resultSet.getString("data"))
-                        .type(resultSet.getInt("type"))
-                        .lastTerm(resultSet.getLong("last_term"))
-                        .createTime(resultSet.getTimestamp("create_time"))
-                        .lastUpdateTime(resultSet.getTimestamp("last_update_time"))
-                        .build();
-                result.add(mysqlRegistryData);
-            }
-            return result;
-        }
+    public List<MysqlRegistryData> queryAllMysqlRegistryData() {
+        return mysqlRegistryDataMapper.selectAll();
     }
 
     public Long insertOrUpdateEphemeralData(String key, String value) throws SQLException {
-        Optional<MysqlRegistryData> mysqlRegistryDataOptional = selectByKey(key);
-        if (mysqlRegistryDataOptional.isPresent()) {
-            long id = mysqlRegistryDataOptional.get().getId();
-            if (!updateValueById(id, value)) {
+        MysqlRegistryData mysqlRegistryData = mysqlRegistryDataMapper.selectByKey(key);
+        if (mysqlRegistryData != null) {
+            long id = mysqlRegistryData.getId();
+            if (mysqlRegistryDataMapper.updateDataAndTermById(id, value, System.currentTimeMillis()) <= 0) {
                 throw new SQLException(String.format("update registry value failed, key: %s, value: %s", key, value));
             }
             return id;
         }
-        MysqlRegistryData mysqlRegistryData = MysqlRegistryData.builder()
+        mysqlRegistryData = MysqlRegistryData.builder()
                 .key(key)
                 .data(value)
                 .type(DataType.EPHEMERAL.getTypeValue())
                 .lastTerm(System.currentTimeMillis())
                 .build();
-        return insertMysqlRegistryData(mysqlRegistryData);
-    }
-
-    private Optional<MysqlRegistryData> selectByKey(@NonNull String key) throws SQLException {
-        String sql =
-                "select id, `key`, data, type, create_time, last_update_time from t_ds_mysql_registry_data where `key` = ?";
-        try (
-                Connection connection = dataSource.getConnection();
-                PreparedStatement preparedStatement = connection.prepareStatement(sql)) {
-            preparedStatement.setString(1, key);
-            try (ResultSet resultSet = preparedStatement.executeQuery()) {
-                if (resultSet.next()) {
-                    return Optional.of(
-                            MysqlRegistryData.builder()
-                                    .id(resultSet.getLong("id"))
-                                    .key(resultSet.getString("key"))
-                                    .data(resultSet.getString("data"))
-                                    .type(resultSet.getInt("type"))
-                                    .createTime(resultSet.getTimestamp("create_time"))
-                                    .lastUpdateTime(resultSet.getTimestamp("last_update_time"))
-                                    .build());
-                }
-                return Optional.empty();
-            }
-        }
-    }
-
-    private boolean updateValueById(long id, String value) throws SQLException {
-        String sql = "update t_ds_mysql_registry_data set data = ?, last_term = ? where id = ?";
-        try (
-                Connection connection = dataSource.getConnection();
-                PreparedStatement preparedStatement = connection.prepareStatement(sql)) {
-            preparedStatement.setString(1, value);
-            preparedStatement.setLong(2, System.currentTimeMillis());
-            preparedStatement.setLong(3, id);
-            return preparedStatement.executeUpdate() > 0;
-        }
-    }
-
-    private long insertMysqlRegistryData(@NonNull MysqlRegistryData mysqlRegistryData) throws SQLException {
-        String sql =
-                "INSERT INTO t_ds_mysql_registry_data (`key`, data, type, last_term) VALUES (?, ?, ?, ?)";
-        try (
-                Connection connection = dataSource.getConnection();
-                PreparedStatement preparedStatement =
-                        connection.prepareStatement(sql, Statement.RETURN_GENERATED_KEYS)) {
-            preparedStatement.setString(1, mysqlRegistryData.getKey());
-            preparedStatement.setString(2, mysqlRegistryData.getData());
-            preparedStatement.setInt(3, mysqlRegistryData.getType());
-            preparedStatement.setLong(4, mysqlRegistryData.getLastTerm());
-            int insertCount = preparedStatement.executeUpdate();
-            ResultSet generatedKeys = preparedStatement.getGeneratedKeys();
-            if (insertCount < 1 || !generatedKeys.next()) {
-                throw new SQLException("Insert ephemeral data error, data: " + mysqlRegistryData);
-            }
-            return generatedKeys.getLong(1);
-        }
+        mysqlRegistryDataMapper.insert(mysqlRegistryData);
+        return mysqlRegistryData.getId();
     }
 
     public long insertOrUpdatePersistentData(String key, String value) throws SQLException {
-        String sql =
-                "INSERT INTO t_ds_mysql_registry_data (`key`, data, type, last_term) VALUES (?, ?, ?, ?)"
-                        +
-                        "ON DUPLICATE KEY UPDATE data=?, last_term=?";
-        // put a persistent Data
-        try (
-                Connection connection = dataSource.getConnection();
-                PreparedStatement preparedStatement =
-                        connection.prepareStatement(sql, Statement.RETURN_GENERATED_KEYS)) {
-            long term = System.currentTimeMillis();
-            preparedStatement.setString(1, key);
-            preparedStatement.setString(2, value);
-            preparedStatement.setInt(3, DataType.PERSISTENT.getTypeValue());
-            preparedStatement.setLong(4, term);
-            preparedStatement.setString(5, value);
-            preparedStatement.setLong(6, term);
-            int insertCount = preparedStatement.executeUpdate();
-            ResultSet generatedKeys = preparedStatement.getGeneratedKeys();
-            if (insertCount < 1 || !generatedKeys.next()) {
-                throw new SQLException("Insert or update persistent data error");
+        MysqlRegistryData mysqlRegistryData = mysqlRegistryDataMapper.selectByKey(key);
+        if (mysqlRegistryData != null) {
+            long id = mysqlRegistryData.getId();
+            if (mysqlRegistryDataMapper.updateDataAndTermById(id, value, System.currentTimeMillis()) <= 0) {
+                throw new SQLException(String.format("update registry value failed, key: %s, value: %s", key, value));
             }
-            return generatedKeys.getLong(1);
-        }
-    }
-
-    public void deleteEphemeralData(String key) throws SQLException {
-        String sql = "DELETE from t_ds_mysql_registry_data where `key` = ? and type = ?";
-        try (
-                Connection connection = dataSource.getConnection();
-                PreparedStatement preparedStatement = connection.prepareStatement(sql)) {
-            preparedStatement.setString(1, key);
-            preparedStatement.setInt(2, DataType.EPHEMERAL.getTypeValue());
-            preparedStatement.execute();
+            return id;
         }
+        mysqlRegistryData = MysqlRegistryData.builder()
+                .key(key)
+                .data(value)
+                .type(DataType.PERSISTENT.getTypeValue())
+                .lastTerm(System.currentTimeMillis())
+                .build();
+        mysqlRegistryDataMapper.insert(mysqlRegistryData);
+        return mysqlRegistryData.getId();
     }
 
-    public void deleteEphemeralData(long ephemeralNodeId) throws SQLException {
-        String sql = "DELETE from t_ds_mysql_registry_data where `id` = ?";
-        try (
-                Connection connection = dataSource.getConnection();
-                PreparedStatement preparedStatement = connection.prepareStatement(sql)) {
-            preparedStatement.setLong(1, ephemeralNodeId);
-            preparedStatement.execute();
-        }
+    public void deleteDataByKey(String key) {
+        mysqlRegistryDataMapper.deleteByKey(key);
     }
 
-    public void deletePersistentData(String key) throws SQLException {
-        String sql = "DELETE from t_ds_mysql_registry_data where `key` = ? and type = ?";
-        try (
-                Connection connection = dataSource.getConnection();
-                PreparedStatement preparedStatement = connection.prepareStatement(sql)) {
-            preparedStatement.setString(1, key);
-            preparedStatement.setInt(2, DataType.PERSISTENT.getTypeValue());
-            preparedStatement.execute();
-        }
+    public void deleteDataById(long id) {
+        mysqlRegistryDataMapper.deleteById(id);
     }
 
     public void clearExpireLock() {
-        String sql = "delete from t_ds_mysql_registry_lock where last_term < ?";
-        try (
-                Connection connection = dataSource.getConnection();
-                PreparedStatement preparedStatement = connection.prepareStatement(sql)) {
-            preparedStatement.setLong(1, System.currentTimeMillis() - expireTimeWindow);
-            int i = preparedStatement.executeUpdate();
-            if (i > 0) {
-                logger.info("Clear expire lock, size: {}", i);
-            }
-        } catch (Exception ex) {
-            logger.warn("Clear expire lock from mysql registry error", ex);
-        }
+        mysqlRegistryLockMapper.clearExpireLock(System.currentTimeMillis() - expireTimeWindow);
     }
 
     public void clearExpireEphemeralDate() {
-        String sql = "delete from t_ds_mysql_registry_data where last_term < ? and type = ?";
-        try (
-                Connection connection = dataSource.getConnection();
-                PreparedStatement preparedStatement = connection.prepareStatement(sql)) {
-            preparedStatement.setLong(1, System.currentTimeMillis() - expireTimeWindow);
-            preparedStatement.setInt(2, DataType.EPHEMERAL.getTypeValue());
-            int i = preparedStatement.executeUpdate();
-            if (i > 0) {
-                logger.info("clear expire ephemeral data, size:{}", i);
-            }
-        } catch (Exception ex) {
-            logger.warn("Clear expire ephemeral data from mysql registry error", ex);
-        }
+        mysqlRegistryDataMapper.clearExpireEphemeralDate(System.currentTimeMillis() - expireTimeWindow,
+                DataType.EPHEMERAL.getTypeValue());
     }
 
     public MysqlRegistryData getData(String key) throws SQLException {
-        String sql =
-                "SELECT id, `key`, data, type, last_term, create_time, last_update_time FROM t_ds_mysql_registry_data WHERE `key` = ?";
-        try (
-                Connection connection = dataSource.getConnection();
-                PreparedStatement preparedStatement = connection.prepareStatement(sql)) {
-            preparedStatement.setString(1, key);
-            try (ResultSet resultSet = preparedStatement.executeQuery()) {
-                if (!resultSet.next()) {
-                    return null;
-                }
-                return MysqlRegistryData.builder()
-                        .id(resultSet.getLong("id"))
-                        .key(resultSet.getString("key"))
-                        .data(resultSet.getString("data"))
-                        .type(resultSet.getInt("type"))
-                        .lastTerm(resultSet.getLong("last_term"))
-                        .createTime(resultSet.getTimestamp("create_time"))
-                        .lastUpdateTime(resultSet.getTimestamp("last_update_time"))
-                        .build();
-            }
-        }
+        return mysqlRegistryDataMapper.selectByKey(key);
     }
 
     public List<String> getChildren(String key) throws SQLException {
-        String sql = "SELECT `key` from t_ds_mysql_registry_data where `key` like ?";
-        try (
-                Connection connection = dataSource.getConnection();
-                PreparedStatement preparedStatement = connection.prepareStatement(sql)) {
-            preparedStatement.setString(1, key + "%");
-            try (ResultSet resultSet = preparedStatement.executeQuery()) {
-                List<String> result = new ArrayList<>(resultSet.getFetchSize());
-                while (resultSet.next()) {
-                    String fullPath = resultSet.getString("key");
-                    if (fullPath.length() > key.length()) {
-                        result.add(StringUtils.substringBefore(fullPath.substring(key.length() + 1), "/"));
-                    }
-                }
-                return result;
-            }
-        }
+        return mysqlRegistryDataMapper.fuzzyQueryByKey(key)
+                .stream()
+                .map(MysqlRegistryData::getKey)
+                .filter(fullPath -> fullPath.length() > key.length())
+                .map(fullPath -> StringUtils.substringBefore(fullPath.substring(key.length() + 1), "/"))
+                .collect(Collectors.toList());
     }
 
     public boolean existKey(String key) throws SQLException {
-        String sql = "SELECT 1 FROM t_ds_mysql_registry_data WHERE `key` = ?";
-        try (
-                Connection connection = dataSource.getConnection();
-                PreparedStatement preparedStatement = connection.prepareStatement(sql)) {
-            preparedStatement.setString(1, key);
-            try (ResultSet resultSet = preparedStatement.executeQuery()) {
-                if (resultSet.next()) {
-                    return true;
-                }
-            }
-        }
-        return false;
+        MysqlRegistryData mysqlRegistryData = mysqlRegistryDataMapper.selectByKey(key);
+        return mysqlRegistryData != null;
     }
 
     /**
      * Try to acquire the target Lock, if cannot acquire, return null.
      */
+    @SuppressWarnings("checkstyle:IllegalCatch")
     public MysqlRegistryLock tryToAcquireLock(String key) throws SQLException {
-        String sql =
-                "INSERT INTO t_ds_mysql_registry_lock (`key`, lock_owner, last_term) VALUES (?, ?, ?)";
-        try (
-                Connection connection = dataSource.getConnection();
-                PreparedStatement preparedStatement =
-                        connection.prepareStatement(sql, Statement.RETURN_GENERATED_KEYS)) {
-            preparedStatement.setString(1, key);
-            preparedStatement.setString(2, MysqlRegistryConstant.LOCK_OWNER);
-            preparedStatement.setLong(3, System.currentTimeMillis());
-            preparedStatement.executeUpdate();
-            try (ResultSet resultSet = preparedStatement.getGeneratedKeys()) {
-                if (resultSet.next()) {
-                    long newLockId = resultSet.getLong(1);
-                    return getLockById(newLockId);
-                }
+        MysqlRegistryLock mysqlRegistryLock = MysqlRegistryLock.builder()
+                .key(key)
+                .lockOwner(MysqlRegistryConstant.LOCK_OWNER)
+                .lastTerm(System.currentTimeMillis())
+                .build();
+        try {
+            mysqlRegistryLockMapper.insert(mysqlRegistryLock);
+            return mysqlRegistryLock;
+        } catch (Exception e) {
+            if (e instanceof SQLIntegrityConstraintViolationException) {
+                return null;
             }
-            return null;
-        } catch (SQLIntegrityConstraintViolationException e) {
-            // duplicate exception
-            return null;
+            throw e;
         }
     }
 
     public MysqlRegistryLock getLockById(long lockId) throws SQLException {
-        String sql =
-                "SELECT `id`, `key`, lock_owner, last_term, last_update_time, create_time FROM t_ds_mysql_registry_lock WHERE id = ?";
-        try (
-                Connection connection = dataSource.getConnection();
-                PreparedStatement preparedStatement = connection.prepareStatement(sql)) {
-            preparedStatement.setLong(1, lockId);
-            try (ResultSet resultSet = preparedStatement.executeQuery()) {
-                if (resultSet.next()) {
-                    return MysqlRegistryLock.builder()
-                            .id(resultSet.getLong("id"))
-                            .key(resultSet.getString("key"))
-                            .lockOwner(resultSet.getString("lock_owner"))
-                            .lastTerm(resultSet.getLong("last_term"))
-                            .lastUpdateTime(resultSet.getTimestamp("last_update_time"))
-                            .createTime(resultSet.getTimestamp("create_time"))
-                            .build();
-                }
-            }
-            return null;
-        }
+        return mysqlRegistryLockMapper.selectById(lockId);
     }
 
-    // release the lock
     public boolean releaseLock(long lockId) throws SQLException {
-        String sql = "DELETE FROM t_ds_mysql_registry_lock WHERE id = ?";
-        try (
-                Connection connection = dataSource.getConnection();
-                PreparedStatement preparedStatement = connection.prepareStatement(sql)) {
-            preparedStatement.setLong(1, lockId);
-            int i = preparedStatement.executeUpdate();
-            return i > 0;
-        }
+        return mysqlRegistryLockMapper.deleteById(lockId) > 0;
     }
 
     public boolean updateEphemeralDataTerm(Collection<Long> ephemeralDateIds) throws SQLException {
-        StringBuilder sb = new StringBuilder("update t_ds_mysql_registry_data set `last_term` = ? where `id` IN (");
-        Iterator<Long> iterator = ephemeralDateIds.iterator();
-        for (int i = 0; i < ephemeralDateIds.size(); i++) {
-            sb.append(iterator.next());
-            if (i != ephemeralDateIds.size() - 1) {
-                sb.append(",");
-            }
-        }
-        sb.append(")");
-        try (
-                Connection connection = dataSource.getConnection();
-                PreparedStatement preparedStatement = connection.prepareStatement(sb.toString())) {
-            preparedStatement.setLong(1, System.currentTimeMillis());
-            return preparedStatement.executeUpdate() > 0;
+        if (CollectionUtils.isEmpty(ephemeralDateIds)) {
+            return true;
         }
+        return mysqlRegistryDataMapper.updateTermByIds(ephemeralDateIds, System.currentTimeMillis()) > 0;
     }
 
-    public boolean updateLockTerm(List<Long> lockIds) throws SQLException {
-        StringBuilder sb =
-                new StringBuilder("update t_ds_mysql_registry_lock set `last_term` = ? where `id` IN (");
-        Iterator<Long> iterator = lockIds.iterator();
-        for (int i = 0; i < lockIds.size(); i++) {
-            sb.append(iterator.next());
-            if (i != lockIds.size() - 1) {
-                sb.append(",");
-            }
-        }
-        sb.append(")");
-        try (
-                Connection connection = dataSource.getConnection();
-                PreparedStatement preparedStatement = connection.prepareStatement(sb.toString())) {
-            preparedStatement.setLong(1, System.currentTimeMillis());
-            return preparedStatement.executeUpdate() > 0;
+    public boolean updateLockTerm(List<Long> lockIds) {
+        if (CollectionUtils.isEmpty(lockIds)) {
+            return true;
         }
+        return mysqlRegistryLockMapper.updateTermByIds(lockIds, System.currentTimeMillis()) > 0;
     }
 
-    @Override
-    public void close() throws Exception {
-        if (!dataSource.isClosed()) {
-            try (HikariDataSource closedDatasource = this.dataSource) {
-
-            }
-        }
-    }
 }
diff --git a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/MysqlRegistry.java b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/MysqlRegistry.java
index d5caae9f00..5a72e80f58 100644
--- a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/MysqlRegistry.java
+++ b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/MysqlRegistry.java
@@ -53,10 +53,11 @@ public class MysqlRegistry implements Registry {
     private final EphemeralDateManager ephemeralDateManager;
     private final SubscribeDataManager subscribeDataManager;
     private final RegistryLockManager registryLockManager;
-    private final MysqlOperator mysqlOperator;
+    private MysqlOperator mysqlOperator;
 
-    public MysqlRegistry(MysqlRegistryProperties mysqlRegistryProperties) {
-        this.mysqlOperator = new MysqlOperator(mysqlRegistryProperties);
+    public MysqlRegistry(MysqlRegistryProperties mysqlRegistryProperties,
+                         MysqlOperator mysqlOperator) {
+        this.mysqlOperator = mysqlOperator;
         mysqlOperator.clearExpireLock();
         mysqlOperator.clearExpireEphemeralDate();
         this.mysqlRegistryProperties = mysqlRegistryProperties;
@@ -138,8 +139,7 @@ public class MysqlRegistry implements Registry {
     @Override
     public void delete(String key) {
         try {
-            mysqlOperator.deleteEphemeralData(key);
-            mysqlOperator.deletePersistentData(key);
+            mysqlOperator.deleteDataByKey(key);
         } catch (Exception e) {
             throw new RegistryException(String.format("Delete key: %s error", key), e);
         }
@@ -188,8 +188,7 @@ public class MysqlRegistry implements Registry {
         try (
                 EphemeralDateManager closed1 = ephemeralDateManager;
                 SubscribeDataManager close2 = subscribeDataManager;
-                RegistryLockManager close3 = registryLockManager;
-                MysqlOperator closed4 = mysqlOperator) {
+                RegistryLockManager close3 = registryLockManager) {
         } catch (Exception e) {
             LOGGER.error("Close Mysql Registry error", e);
         }
diff --git a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/MysqlRegistryConfiguration.java b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/MysqlRegistryConfiguration.java
new file mode 100644
index 0000000000..70a1baae53
--- /dev/null
+++ b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/MysqlRegistryConfiguration.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.registry.mysql;
+
+import org.apache.dolphinscheduler.plugin.registry.mysql.mapper.MysqlRegistryDataMapper;
+import org.apache.dolphinscheduler.plugin.registry.mysql.mapper.MysqlRegistryLockMapper;
+
+import org.apache.ibatis.session.SqlSessionFactory;
+
+import org.mybatis.spring.SqlSessionTemplate;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+import com.baomidou.mybatisplus.extension.spring.MybatisSqlSessionFactoryBean;
+import com.zaxxer.hikari.HikariDataSource;
+
+@Configuration
+@ConditionalOnProperty(prefix = "registry", name = "type", havingValue = "mysql")
+public class MysqlRegistryConfiguration {
+
+    @Bean
+    public SqlSessionFactory mysqlRegistrySqlSessionFactory(MysqlRegistryProperties mysqlRegistryProperties) throws Exception {
+        MybatisSqlSessionFactoryBean sqlSessionFactoryBean = new MybatisSqlSessionFactoryBean();
+        sqlSessionFactoryBean.setDataSource(new HikariDataSource(mysqlRegistryProperties.getHikariConfig()));
+        return sqlSessionFactoryBean.getObject();
+    }
+
+    @Bean
+    public SqlSessionTemplate mysqlRegistrySqlSessionTemplate(SqlSessionFactory mysqlRegistrySqlSessionFactory) {
+        mysqlRegistrySqlSessionFactory.getConfiguration().addMapper(MysqlRegistryDataMapper.class);
+        mysqlRegistrySqlSessionFactory.getConfiguration().addMapper(MysqlRegistryLockMapper.class);
+        return new SqlSessionTemplate(mysqlRegistrySqlSessionFactory);
+    }
+
+    @Bean
+    public MysqlRegistryDataMapper mysqlRegistryDataMapper(SqlSessionTemplate mysqlRegistrySqlSessionTemplate) {
+        return mysqlRegistrySqlSessionTemplate.getMapper(MysqlRegistryDataMapper.class);
+    }
+
+    @Bean
+    public MysqlRegistryLockMapper mysqlRegistryLockMapper(SqlSessionTemplate mysqlRegistrySqlSessionTemplate) {
+        return mysqlRegistrySqlSessionTemplate.getMapper(MysqlRegistryLockMapper.class);
+    }
+
+}
diff --git a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/mapper/MysqlRegistryDataMapper.java b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/mapper/MysqlRegistryDataMapper.java
new file mode 100644
index 0000000000..ef0cd5c3cc
--- /dev/null
+++ b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/mapper/MysqlRegistryDataMapper.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.registry.mysql.mapper;
+
+import org.apache.dolphinscheduler.plugin.registry.mysql.model.MysqlRegistryData;
+
+import org.apache.ibatis.annotations.Delete;
+import org.apache.ibatis.annotations.Param;
+import org.apache.ibatis.annotations.Select;
+import org.apache.ibatis.annotations.Update;
+
+import java.util.Collection;
+import java.util.List;
+
+import com.baomidou.mybatisplus.core.mapper.BaseMapper;
+
+/**
+ * MyBatis mapper for the {@code t_ds_mysql_registry_data} table.
+ */
+public interface MysqlRegistryDataMapper extends BaseMapper<MysqlRegistryData> {
+
+    /**
+     * Selects every row of the registry data table.
+     *
+     * @return all rows
+     */
+    @Select("select * from t_ds_mysql_registry_data")
+    List<MysqlRegistryData> selectAll();
+
+    /**
+     * Selects the row whose {@code key} column equals the given key.
+     *
+     * @param key exact key to look up
+     * @return the matching row
+     */
+    @Select("select * from t_ds_mysql_registry_data where `key` = #{key}")
+    MysqlRegistryData selectByKey(@Param("key") String key);
+
+    /**
+     * Selects the rows whose {@code key} column starts with the given prefix.
+     * The prefix is used inside a {@code LIKE} pattern, so any {@code %} or {@code _}
+     * it contains acts as a wildcard.
+     *
+     * @param key key prefix
+     * @return the matching rows
+     */
+    @Select("select * from t_ds_mysql_registry_data where `key` like CONCAT (#{key}, '%')")
+    List<MysqlRegistryData> fuzzyQueryByKey(@Param("key") String key);
+
+    /**
+     * Sets {@code data} and {@code last_term} on the row with the given id.
+     *
+     * @param id   id of the row to update
+     * @param data new value of the {@code data} column
+     * @param term new value of the {@code last_term} column
+     * @return number of rows updated
+     */
+    @Update("update t_ds_mysql_registry_data set `data` = #{data}, `last_term` = #{term} where `id` = #{id}")
+    int updateDataAndTermById(@Param("id") long id, @Param("data") String data, @Param("term") long term);
+
+    /**
+     * Deletes the row whose {@code key} column equals the given key.
+     *
+     * @param key exact key to delete
+     */
+    @Delete("delete from t_ds_mysql_registry_data where `key` = #{key}")
+    void deleteByKey(@Param("key") String key);
+
+    /**
+     * Deletes the rows of the given type whose {@code last_term} is less than the given term.
+     *
+     * @param term rows with a smaller {@code last_term} are deleted
+     * @param type value of the {@code type} column the rows must have
+     */
+    @Delete("delete from t_ds_mysql_registry_data where `last_term` < #{term} and `type` = #{type}")
+    void clearExpireEphemeralDate(@Param("term") long term, @Param("type") int type);
+
+    /**
+     * Sets {@code last_term} on every row whose id is in {@code ids}.
+     * A null or empty {@code ids} matches no row: {@code <foreach>} renders nothing for an
+     * empty collection, which would otherwise leave a dangling {@code IN} in the statement.
+     *
+     * @param ids  ids of the rows to update
+     * @param term new value of the {@code last_term} column
+     * @return number of rows updated
+     */
+    @Update({"<script>",
+            "update t_ds_mysql_registry_data",
+            "set `last_term` = #{term}",
+            "where ",
+            "<choose>",
+            "<when test='ids != null and ids.size() != 0'>",
+            "id IN ",
+            "<foreach item='id' index='index' collection='ids' open='(' separator=',' close=')'>",
+            "   #{id}",
+            "</foreach>",
+            "</when>",
+            "<otherwise>1 = 0</otherwise>",
+            "</choose>",
+            "</script>"})
+    int updateTermByIds(@Param("ids") Collection<Long> ids, @Param("term") long term);
+}
diff --git a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/mapper/MysqlRegistryLockMapper.java b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/mapper/MysqlRegistryLockMapper.java
new file mode 100644
index 0000000000..5e22a7473a
--- /dev/null
+++ b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/mapper/MysqlRegistryLockMapper.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.registry.mysql.mapper;
+
+import org.apache.dolphinscheduler.plugin.registry.mysql.model.MysqlRegistryLock;
+
+import org.apache.ibatis.annotations.Delete;
+import org.apache.ibatis.annotations.Param;
+import org.apache.ibatis.annotations.Select;
+import org.apache.ibatis.annotations.Update;
+
+import java.util.Collection;
+
+import com.baomidou.mybatisplus.core.mapper.BaseMapper;
+
+/**
+ * MyBatis mapper for the {@code t_ds_mysql_registry_lock} table.
+ */
+public interface MysqlRegistryLockMapper extends BaseMapper<MysqlRegistryLock> {
+
+    /**
+     * Counts the rows of the lock table.
+     *
+     * @return number of rows
+     */
+    @Select("select count(1) from t_ds_mysql_registry_lock")
+    int countAll();
+
+    /**
+     * Deletes the locks whose {@code last_term} is less than the given term.
+     *
+     * @param term locks with a smaller {@code last_term} are deleted
+     */
+    @Delete("delete from t_ds_mysql_registry_lock where `last_term` < #{term}")
+    void clearExpireLock(@Param("term") long term);
+
+    /**
+     * Sets {@code last_term} on every lock whose id is in {@code ids}.
+     * A null or empty {@code ids} matches no row: {@code <foreach>} renders nothing for an
+     * empty collection, which would otherwise leave a dangling {@code IN} in the statement.
+     *
+     * @param ids  ids of the locks to update
+     * @param term new value of the {@code last_term} column
+     * @return number of rows updated
+     */
+    @Update({"<script>",
+            "update t_ds_mysql_registry_lock",
+            "set `last_term` = #{term}",
+            "where ",
+            "<choose>",
+            "<when test='ids != null and ids.size() != 0'>",
+            "id IN ",
+            "<foreach item='id' index='index' collection='ids' open='(' separator=',' close=')'>",
+            "   #{id}",
+            "</foreach>",
+            "</when>",
+            "<otherwise>1 = 0</otherwise>",
+            "</choose>",
+            "</script>"})
+    int updateTermByIds(@Param("ids") Collection<Long> ids, @Param("term") long term);
+}
diff --git a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/model/MysqlRegistryData.java b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/model/MysqlRegistryData.java
index e9ff6c81de..a4f80db608 100644
--- a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/model/MysqlRegistryData.java
+++ b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/model/MysqlRegistryData.java
@@ -24,18 +24,31 @@ import lombok.Builder;
 import lombok.Data;
 import lombok.NoArgsConstructor;
 
+import com.baomidou.mybatisplus.annotation.IdType;
+import com.baomidou.mybatisplus.annotation.TableField;
+import com.baomidou.mybatisplus.annotation.TableId;
+import com.baomidou.mybatisplus.annotation.TableName;
+
+/**
+ * Row of the {@code t_ds_mysql_registry_data} table.
+ */
+@TableName(value = "t_ds_mysql_registry_data")
 @Data
 @Builder
 @NoArgsConstructor
 @AllArgsConstructor
 public class MysqlRegistryData {
 
-    private long id;
+    // Primary key; IdType.AUTO leaves id generation to the database.
+    @TableId(value = "id", type = IdType.AUTO)
+    private Long id;
+    @TableField(value = "`key`")
     private String key;
+    @TableField(value = "`data`")
     private String data;
+    @TableField(value = "`type`")
     private int type;
+    @TableField(value = "`last_term`")
     private long lastTerm;
+    @TableField(value = "`create_time`")
     private Date createTime;
+    // Mapped to `last_update_time` (not `last_time`) so the column agrees with the field name
+    // and with the same property on MysqlRegistryLock.
+    @TableField(value = "`last_update_time`")
     private Date lastUpdateTime;
 
 }
diff --git a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/model/MysqlRegistryLock.java b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/model/MysqlRegistryLock.java
index a86e4afe2c..2739a61561 100644
--- a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/model/MysqlRegistryLock.java
+++ b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/model/MysqlRegistryLock.java
@@ -24,31 +24,43 @@ import lombok.Builder;
 import lombok.Data;
 import lombok.NoArgsConstructor;
 
+import com.baomidou.mybatisplus.annotation.IdType;
+import com.baomidou.mybatisplus.annotation.TableField;
+import com.baomidou.mybatisplus.annotation.TableId;
+import com.baomidou.mybatisplus.annotation.TableName;
+
+/**
+ * Row of the {@code t_ds_mysql_registry_lock} table.
+ */
+@TableName(value = "t_ds_mysql_registry_lock")
 @Data
 @Builder
 @NoArgsConstructor
 @AllArgsConstructor
 public class MysqlRegistryLock {
 
-    private long id;
+    // Primary key; IdType.AUTO leaves id generation to the database.
+    @TableId(value = "id", type = IdType.AUTO)
+    private Long id;
     /**
      * The lock key.
      */
+    @TableField(value = "`key`")
     private String key;
     /**
      * acquire lock host.
      */
+    @TableField(value = "`lock_owner`")
     private String lockOwner;
     /**
      * The last term, if the (currentTime - lastTerm) > termExpire time, the lock will be expired.
      */
+    @TableField(value = "`last_term`")
     private Long lastTerm;
     /**
      * The lock last update time.
      */
+    @TableField(value = "`last_update_time`")
     private Date lastUpdateTime;
     /**
      * The lock create time.
      */
+    @TableField(value = "`create_time`")
     private Date createTime;
 }
diff --git a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/task/EphemeralDateManager.java b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/task/EphemeralDateManager.java
index d17806c6bd..dc4a55797f 100644
--- a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/task/EphemeralDateManager.java
+++ b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/task/EphemeralDateManager.java
@@ -89,7 +89,7 @@ public class EphemeralDateManager implements AutoCloseable {
         connectionListeners.clear();
         scheduledExecutorService.shutdownNow();
         for (Long ephemeralDateId : ephemeralDateIds) {
-            mysqlOperator.deleteEphemeralData(ephemeralDateId);
+            mysqlOperator.deleteDataById(ephemeralDateId);
         }
     }