You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by ca...@apache.org on 2022/09/17 00:59:51 UTC

[dolphinscheduler] branch 3.1.0-prepare updated: Fix insertOrUpdate plugin may fail due to concurrent operation (#11471) (#11996)

This is an automated email from the ASF dual-hosted git repository.

caishunfeng pushed a commit to branch 3.1.0-prepare
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/3.1.0-prepare by this push:
     new 8eadf5e5aa Fix insertOrUpdate plugin may fail due to concurrent operation (#11471) (#11996)
8eadf5e5aa is described below

commit 8eadf5e5aafdd25c59fb1798314e2236424ac58a
Author: Wenjun Ruan <we...@apache.org>
AuthorDate: Sat Sep 17 08:59:45 2022 +0800

    Fix insertOrUpdate plugin may fail due to concurrent operation (#11471) (#11996)
    
    (cherry picked from commit 864a90820de8b8470714574dd506f757a73db874)
---
 .../dolphinscheduler/api/ApiApplicationServer.java |  6 +---
 .../org/apache/dolphinscheduler/dao/PluginDao.java | 38 ++++++++++++++++++----
 2 files changed, 32 insertions(+), 12 deletions(-)

diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/ApiApplicationServer.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/ApiApplicationServer.java
index 078506898a..2994208ea2 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/ApiApplicationServer.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/ApiApplicationServer.java
@@ -21,7 +21,6 @@ import org.apache.dolphinscheduler.common.enums.PluginType;
 import org.apache.dolphinscheduler.dao.PluginDao;
 import org.apache.dolphinscheduler.dao.entity.PluginDefine;
 import org.apache.dolphinscheduler.plugin.task.api.TaskChannelFactory;
-import org.apache.dolphinscheduler.plugin.task.api.TaskPluginException;
 import org.apache.dolphinscheduler.service.task.TaskPluginManager;
 import org.apache.dolphinscheduler.spi.params.PluginParamsTransfer;
 import org.apache.dolphinscheduler.spi.params.base.PluginParams;
@@ -68,10 +67,7 @@ public class ApiApplicationServer {
             String paramsJson = PluginParamsTransfer.transferParamsToJson(params);
 
             PluginDefine pluginDefine = new PluginDefine(taskPluginName, PluginType.TASK.getDesc(), paramsJson);
-            int count = pluginDao.addOrUpdatePluginDefine(pluginDefine);
-            if (count <= 0) {
-                throw new TaskPluginException("Failed to update task plugin: " + taskPluginName);
-            }
+            pluginDao.addOrUpdatePluginDefine(pluginDefine);
         }
     }
 }
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/PluginDao.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/PluginDao.java
index 57ff712bc8..5ca4812a72 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/PluginDao.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/PluginDao.java
@@ -21,12 +21,20 @@ import static java.util.Objects.requireNonNull;
 
 import org.apache.dolphinscheduler.dao.entity.PluginDefine;
 import org.apache.dolphinscheduler.dao.mapper.PluginDefineMapper;
+import org.apache.dolphinscheduler.plugin.task.api.TaskPluginException;
+
+import java.util.Objects;
+
+import lombok.NonNull;
+import lombok.extern.slf4j.Slf4j;
 
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 
+@Slf4j
 @Component
 public class PluginDao {
+
     @Autowired
     private PluginDefineMapper pluginDefineMapper;
 
@@ -43,20 +51,36 @@ public class PluginDao {
      * add or update plugin define
      *
      * @param pluginDefine new pluginDefine
+     * @return plugin id
      */
-    public int addOrUpdatePluginDefine(PluginDefine pluginDefine) {
-        requireNonNull(pluginDefine, "pluginDefine is null");
+    public int addOrUpdatePluginDefine(@NonNull PluginDefine pluginDefine) {
         requireNonNull(pluginDefine.getPluginName(), "pluginName is null");
         requireNonNull(pluginDefine.getPluginType(), "pluginType is null");
 
-        PluginDefine currPluginDefine = pluginDefineMapper.queryByNameAndType(pluginDefine.getPluginName(), pluginDefine.getPluginType());
+        PluginDefine currPluginDefine =
+                pluginDefineMapper.queryByNameAndType(pluginDefine.getPluginName(), pluginDefine.getPluginType());
         if (currPluginDefine == null) {
-            if (pluginDefineMapper.insert(pluginDefine) == 1 && pluginDefine.getId() > 0) {
-                return pluginDefine.getId();
+            try {
+                if (pluginDefineMapper.insert(pluginDefine) == 1 && pluginDefine.getId() > 0) {
+                    return pluginDefine.getId();
+                }
+                throw new TaskPluginException(
+                        String.format("Failed to insert plugin definition, pluginName: %s, pluginType: %s",
+                                pluginDefine.getPluginName(), pluginDefine.getPluginType()));
+            } catch (TaskPluginException ex) {
+                throw ex;
+            } catch (Exception ex) {
+                log.error("Insert plugin definition error, there may already exist a plugin", ex);
+                currPluginDefine = pluginDefineMapper.queryByNameAndType(pluginDefine.getPluginName(),
+                        pluginDefine.getPluginType());
+                if (currPluginDefine == null) {
+                    throw new TaskPluginException(
+                            String.format("Failed to insert plugin definition, pluginName: %s, pluginType: %s",
+                                    pluginDefine.getPluginName(), pluginDefine.getPluginType()));
+                }
             }
-            throw new IllegalStateException("Failed to insert plugin definition");
         }
-        if (!currPluginDefine.getPluginParams().equals(pluginDefine.getPluginParams())) {
+        if (!Objects.equals(currPluginDefine.getPluginParams(), pluginDefine.getPluginParams())) {
             currPluginDefine.setUpdateTime(pluginDefine.getUpdateTime());
             currPluginDefine.setPluginParams(pluginDefine.getPluginParams());
             pluginDefineMapper.updateById(currPluginDefine);