You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by ca...@apache.org on 2022/09/17 00:59:51 UTC
[dolphinscheduler] branch 3.1.0-prepare updated: Fix insertOrUpdate plugin may failed due to concurrent operation (#11471) (#11996)
This is an automated email from the ASF dual-hosted git repository.
caishunfeng pushed a commit to branch 3.1.0-prepare
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/3.1.0-prepare by this push:
new 8eadf5e5aa Fix insertOrUpdate plugin may failed due to concurrent operation (#11471) (#11996)
8eadf5e5aa is described below
commit 8eadf5e5aafdd25c59fb1798314e2236424ac58a
Author: Wenjun Ruan <we...@apache.org>
AuthorDate: Sat Sep 17 08:59:45 2022 +0800
Fix insertOrUpdate plugin may failed due to concurrent operation (#11471) (#11996)
(cherry picked from commit 864a90820de8b8470714574dd506f757a73db874)
---
.../dolphinscheduler/api/ApiApplicationServer.java | 6 +---
.../org/apache/dolphinscheduler/dao/PluginDao.java | 38 ++++++++++++++++++----
2 files changed, 32 insertions(+), 12 deletions(-)
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/ApiApplicationServer.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/ApiApplicationServer.java
index 078506898a..2994208ea2 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/ApiApplicationServer.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/ApiApplicationServer.java
@@ -21,7 +21,6 @@ import org.apache.dolphinscheduler.common.enums.PluginType;
import org.apache.dolphinscheduler.dao.PluginDao;
import org.apache.dolphinscheduler.dao.entity.PluginDefine;
import org.apache.dolphinscheduler.plugin.task.api.TaskChannelFactory;
-import org.apache.dolphinscheduler.plugin.task.api.TaskPluginException;
import org.apache.dolphinscheduler.service.task.TaskPluginManager;
import org.apache.dolphinscheduler.spi.params.PluginParamsTransfer;
import org.apache.dolphinscheduler.spi.params.base.PluginParams;
@@ -68,10 +67,7 @@ public class ApiApplicationServer {
String paramsJson = PluginParamsTransfer.transferParamsToJson(params);
PluginDefine pluginDefine = new PluginDefine(taskPluginName, PluginType.TASK.getDesc(), paramsJson);
- int count = pluginDao.addOrUpdatePluginDefine(pluginDefine);
- if (count <= 0) {
- throw new TaskPluginException("Failed to update task plugin: " + taskPluginName);
- }
+ pluginDao.addOrUpdatePluginDefine(pluginDefine);
}
}
}
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/PluginDao.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/PluginDao.java
index 57ff712bc8..5ca4812a72 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/PluginDao.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/PluginDao.java
@@ -21,12 +21,20 @@ import static java.util.Objects.requireNonNull;
import org.apache.dolphinscheduler.dao.entity.PluginDefine;
import org.apache.dolphinscheduler.dao.mapper.PluginDefineMapper;
+import org.apache.dolphinscheduler.plugin.task.api.TaskPluginException;
+
+import java.util.Objects;
+
+import lombok.NonNull;
+import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
+@Slf4j
@Component
public class PluginDao {
+
@Autowired
private PluginDefineMapper pluginDefineMapper;
@@ -43,20 +51,36 @@ public class PluginDao {
* add or update plugin define
*
* @param pluginDefine new pluginDefine
+ * @return plugin id
*/
- public int addOrUpdatePluginDefine(PluginDefine pluginDefine) {
- requireNonNull(pluginDefine, "pluginDefine is null");
+ public int addOrUpdatePluginDefine(@NonNull PluginDefine pluginDefine) {
requireNonNull(pluginDefine.getPluginName(), "pluginName is null");
requireNonNull(pluginDefine.getPluginType(), "pluginType is null");
- PluginDefine currPluginDefine = pluginDefineMapper.queryByNameAndType(pluginDefine.getPluginName(), pluginDefine.getPluginType());
+ PluginDefine currPluginDefine =
+ pluginDefineMapper.queryByNameAndType(pluginDefine.getPluginName(), pluginDefine.getPluginType());
if (currPluginDefine == null) {
- if (pluginDefineMapper.insert(pluginDefine) == 1 && pluginDefine.getId() > 0) {
- return pluginDefine.getId();
+ try {
+ if (pluginDefineMapper.insert(pluginDefine) == 1 && pluginDefine.getId() > 0) {
+ return pluginDefine.getId();
+ }
+ throw new TaskPluginException(
+ String.format("Failed to insert plugin definition, pluginName: %s, pluginType: %s",
+ pluginDefine.getPluginName(), pluginDefine.getPluginType()));
+ } catch (TaskPluginException ex) {
+ throw ex;
+ } catch (Exception ex) {
+ log.error("Insert plugin definition error, there may already exist a plugin", ex);
+ currPluginDefine = pluginDefineMapper.queryByNameAndType(pluginDefine.getPluginName(),
+ pluginDefine.getPluginType());
+ if (currPluginDefine == null) {
+ throw new TaskPluginException(
+ String.format("Failed to insert plugin definition, pluginName: %s, pluginType: %s",
+ pluginDefine.getPluginName(), pluginDefine.getPluginType()));
+ }
}
- throw new IllegalStateException("Failed to insert plugin definition");
}
- if (!currPluginDefine.getPluginParams().equals(pluginDefine.getPluginParams())) {
+ if (!Objects.equals(currPluginDefine.getPluginParams(), pluginDefine.getPluginParams())) {
currPluginDefine.setUpdateTime(pluginDefine.getUpdateTime());
currPluginDefine.setPluginParams(pluginDefine.getPluginParams());
pluginDefineMapper.updateById(currPluginDefine);