You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by le...@apache.org on 2021/11/19 15:50:02 UTC
[dolphinscheduler] branch 2.0.0-release updated: [cherry-pick][api] rewrite code generate, fix bit shift (#6937)
This is an automated email from the ASF dual-hosted git repository.
leonbao pushed a commit to branch 2.0.0-release
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/2.0.0-release by this push:
new adaeb96 [cherry-pick][api] rewrite code generate,fix bit shift (#6937)
adaeb96 is described below
commit adaeb962eccd593dfef664b4c71910684dc13c1f
Author: OS <29...@users.noreply.github.com>
AuthorDate: Fri Nov 19 23:49:52 2021 +0800
[cherry-pick][api] rewrite code generate,fix bit shift (#6937)
* [Fix][Common] rewrite code generate,fix bit shift (#6914)
* rewrite code generate,fix bit shift
* fix ut
* add algorithm from licenses file
* add algorithm from licenses file
* add algorithm from licenses file
* add algorithm from licenses file
* add algorithm from licenses file
* fix ut
# Conflicts:
# dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/EnvironmentServiceImpl.java
# dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java
# dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/CodeGenerateUtilsTest.java
# dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/UpgradeDao.java
# dolphinscheduler-dist/release-docs/LICENSE
# dolphinscheduler-python/src/main/java/org/apache/dolphinscheduler/server/PythonGatewayServer.java
# dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
* [Fix][Common] rewrite code generate,fix bit shift (#6914)
* rewrite code generate,fix bit shift
* fix ut
* add algorithm from licenses file
* add algorithm from licenses file
* add algorithm from licenses file
* add algorithm from licenses file
* add algorithm from licenses file
* fix ut
# Conflicts:
# dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/EnvironmentServiceImpl.java
# dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java
# dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/CodeGenerateUtilsTest.java
# dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/UpgradeDao.java
# dolphinscheduler-dist/release-docs/LICENSE
# dolphinscheduler-python/src/main/java/org/apache/dolphinscheduler/server/PythonGatewayServer.java
# dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
* [Fix][Common] rewrite code generate,fix bit shift (#6914)
* rewrite code generate,fix bit shift
* fix ut
* add algorithm from licenses file
* add algorithm from licenses file
* add algorithm from licenses file
* add algorithm from licenses file
* add algorithm from licenses file
* fix ut
# Conflicts:
# dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/EnvironmentServiceImpl.java
# dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java
# dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/CodeGenerateUtilsTest.java
# dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/UpgradeDao.java
# dolphinscheduler-dist/release-docs/LICENSE
# dolphinscheduler-python/src/main/java/org/apache/dolphinscheduler/server/PythonGatewayServer.java
# dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
* [Fix][Common] rewrite code generate,fix bit shift (#6914)
* rewrite code generate,fix bit shift
* fix ut
* add algorithm from licenses file
* add algorithm from licenses file
* add algorithm from licenses file
* add algorithm from licenses file
* add algorithm from licenses file
* fix ut
# Conflicts:
# dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/EnvironmentServiceImpl.java
# dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java
# dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/CodeGenerateUtilsTest.java
# dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/UpgradeDao.java
# dolphinscheduler-dist/release-docs/LICENSE
# dolphinscheduler-python/src/main/java/org/apache/dolphinscheduler/server/PythonGatewayServer.java
# dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
Co-authored-by: JinYong Li <42...@users.noreply.github.com>
---
.licenserc.yaml | 1 +
LICENSE | 2 +-
.../api/service/impl/EnvironmentServiceImpl.java | 8 +-
.../service/impl/ProcessDefinitionServiceImpl.java | 20 +-
.../api/service/impl/ProjectServiceImpl.java | 8 +-
.../service/impl/TaskDefinitionServiceImpl.java | 8 +-
.../common/utils/CodeGenerateUtils.java | 74 ++++
.../common/utils/SnowFlakeUtils.java | 94 ------
...keUtilsTest.java => CodeGenerateUtilsTest.java} | 19 +-
.../dao/upgrade/ProcessDefinitionDao.java | 4 +-
.../dolphinscheduler/dao/upgrade/ProjectDao.java | 4 +-
.../dolphinscheduler/dao/upgrade/UpgradeDao.java | 5 +-
dolphinscheduler-dist/release-docs/LICENSE | 2 +
.../release-docs/licenses/LICENSE-snowflake.txt | 11 +
.../server/PythonGatewayServer.java | 374 +++++++++++++++++++++
.../service/process/ProcessService.java | 138 ++++----
licenses/LICENSE-snowflake.txt | 11 +
17 files changed, 582 insertions(+), 201 deletions(-)
diff --git a/.licenserc.yaml b/.licenserc.yaml
index 44a776e..b82839b 100644
--- a/.licenserc.yaml
+++ b/.licenserc.yaml
@@ -26,6 +26,7 @@ header:
- LICENSE
- DISCLAIMER
- dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/ScriptRunner.java
+ - dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/CodeGenerateUtils.java
- mvnw.cmd
- sql/soft_version
- .mvn
diff --git a/LICENSE b/LICENSE
index 8c9beb8..20191b3 100644
--- a/LICENSE
+++ b/LICENSE
@@ -219,4 +219,4 @@ The text of each license is the standard Apache 2.0 license.
DolphinPluginClassLoader from https://github.com/prestosql/presto Apache 2.0
DolphinPluginDiscovery from https://github.com/prestosql/presto Apache 2.0
DolphinPluginLoader from https://github.com/prestosql/presto Apache 2.0
-
+ CodeGenerateUtils from https://github.com/twitter-archive/snowflake/tree/snowflake-2010 Apache 2.0
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/EnvironmentServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/EnvironmentServiceImpl.java
index 7291485..89cd2af 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/EnvironmentServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/EnvironmentServiceImpl.java
@@ -23,10 +23,10 @@ import org.apache.dolphinscheduler.api.service.EnvironmentService;
import org.apache.dolphinscheduler.api.utils.PageInfo;
import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.Constants;
+import org.apache.dolphinscheduler.common.utils.CodeGenerateUtils;
+import org.apache.dolphinscheduler.common.utils.CodeGenerateUtils.CodeGenerateException;
import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
-import org.apache.dolphinscheduler.common.utils.SnowFlakeUtils;
-import org.apache.dolphinscheduler.common.utils.SnowFlakeUtils.SnowFlakeException;
import org.apache.dolphinscheduler.dao.entity.Environment;
import org.apache.dolphinscheduler.dao.entity.EnvironmentWorkerGroupRelation;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
@@ -115,9 +115,9 @@ public class EnvironmentServiceImpl extends BaseServiceImpl implements Environme
env.setUpdateTime(new Date());
long code = 0L;
try {
- code = SnowFlakeUtils.getInstance().nextId();
+ code = CodeGenerateUtils.getInstance().genCode();
env.setCode(code);
- } catch (SnowFlakeException e) {
+ } catch (CodeGenerateException e) {
logger.error("Environment code get error, ", e);
}
if (code == 0L) {
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
index 4c126a7..96318bc 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
@@ -39,10 +39,10 @@ import org.apache.dolphinscheduler.common.graph.DAG;
import org.apache.dolphinscheduler.common.model.TaskNode;
import org.apache.dolphinscheduler.common.model.TaskNodeRelation;
import org.apache.dolphinscheduler.common.thread.Stopper;
+import org.apache.dolphinscheduler.common.utils.CodeGenerateUtils;
+import org.apache.dolphinscheduler.common.utils.CodeGenerateUtils.CodeGenerateException;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
-import org.apache.dolphinscheduler.common.utils.SnowFlakeUtils;
-import org.apache.dolphinscheduler.common.utils.SnowFlakeUtils.SnowFlakeException;
import org.apache.dolphinscheduler.dao.entity.DagData;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog;
@@ -220,8 +220,8 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
}
long processDefinitionCode;
try {
- processDefinitionCode = SnowFlakeUtils.getInstance().nextId();
- } catch (SnowFlakeException e) {
+ processDefinitionCode = CodeGenerateUtils.getInstance().genCode();
+ } catch (CodeGenerateException e) {
putMsg(result, Status.INTERNAL_SERVER_ERROR_ARGS);
return result;
}
@@ -868,8 +868,8 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
processDefinition.setProjectCode(projectCode);
processDefinition.setUserId(loginUser.getId());
try {
- processDefinition.setCode(SnowFlakeUtils.getInstance().nextId());
- } catch (SnowFlakeException e) {
+ processDefinition.setCode(CodeGenerateUtils.getInstance().genCode());
+ } catch (CodeGenerateException e) {
putMsg(result, Status.CREATE_PROCESS_DEFINITION_ERROR);
return false;
}
@@ -888,10 +888,10 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
taskDefinitionLog.setOperator(loginUser.getId());
taskDefinitionLog.setOperateTime(now);
try {
- long code = SnowFlakeUtils.getInstance().nextId();
+ long code = CodeGenerateUtils.getInstance().genCode();
taskCodeMap.put(taskDefinitionLog.getCode(), code);
taskDefinitionLog.setCode(code);
- } catch (SnowFlakeException e) {
+ } catch (CodeGenerateException e) {
logger.error("Task code get error, ", e);
putMsg(result, Status.INTERNAL_SERVER_ERROR_ARGS, "Error generating task definition code");
return false;
@@ -1357,8 +1357,8 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
processDefinition.setProjectCode(targetProjectCode);
if (isCopy) {
try {
- processDefinition.setCode(SnowFlakeUtils.getInstance().nextId());
- } catch (SnowFlakeException e) {
+ processDefinition.setCode(CodeGenerateUtils.getInstance().genCode());
+ } catch (CodeGenerateException e) {
putMsg(result, Status.INTERNAL_SERVER_ERROR_ARGS);
throw new ServiceException(Status.INTERNAL_SERVER_ERROR_ARGS);
}
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProjectServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProjectServiceImpl.java
index 7d49cfa..75347ee 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProjectServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProjectServiceImpl.java
@@ -25,8 +25,8 @@ import org.apache.dolphinscheduler.api.utils.PageInfo;
import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.UserType;
-import org.apache.dolphinscheduler.common.utils.SnowFlakeUtils;
-import org.apache.dolphinscheduler.common.utils.SnowFlakeUtils.SnowFlakeException;
+import org.apache.dolphinscheduler.common.utils.CodeGenerateUtils;
+import org.apache.dolphinscheduler.common.utils.CodeGenerateUtils.CodeGenerateException;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.Project;
import org.apache.dolphinscheduler.dao.entity.ProjectUser;
@@ -97,14 +97,14 @@ public class ProjectServiceImpl extends BaseServiceImpl implements ProjectServic
project = Project
.newBuilder()
.name(name)
- .code(SnowFlakeUtils.getInstance().nextId())
+ .code(CodeGenerateUtils.getInstance().genCode())
.description(desc)
.userId(loginUser.getId())
.userName(loginUser.getUserName())
.createTime(now)
.updateTime(now)
.build();
- } catch (SnowFlakeException e) {
+ } catch (CodeGenerateException e) {
putMsg(result, Status.CREATE_PROJECT_ERROR);
return result;
}
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java
index e203b9b..621af45 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java
@@ -25,9 +25,9 @@ import org.apache.dolphinscheduler.api.utils.CheckUtils;
import org.apache.dolphinscheduler.api.utils.PageInfo;
import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.Constants;
+import org.apache.dolphinscheduler.common.utils.CodeGenerateUtils;
+import org.apache.dolphinscheduler.common.utils.CodeGenerateUtils.CodeGenerateException;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
-import org.apache.dolphinscheduler.common.utils.SnowFlakeUtils;
-import org.apache.dolphinscheduler.common.utils.SnowFlakeUtils.SnowFlakeException;
import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation;
import org.apache.dolphinscheduler.dao.entity.Project;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
@@ -416,9 +416,9 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
List<Long> taskCodes = new ArrayList<>();
try {
for (int i = 0; i < genNum; i++) {
- taskCodes.add(SnowFlakeUtils.getInstance().nextId());
+ taskCodes.add(CodeGenerateUtils.getInstance().genCode());
}
- } catch (SnowFlakeException e) {
+ } catch (CodeGenerateException e) {
logger.error("Task code get error, ", e);
putMsg(result, Status.INTERNAL_SERVER_ERROR_ARGS, "Error generating task definition code");
}
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/CodeGenerateUtils.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/CodeGenerateUtils.java
new file mode 100644
index 0000000..ffea87b
--- /dev/null
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/CodeGenerateUtils.java
@@ -0,0 +1,74 @@
+/** Copyright 2010-2012 Twitter, Inc.*/
+
+package org.apache.dolphinscheduler.common.utils;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.Objects;
+
+/**
+ * Rewriting based on Twitter snowflake algorithm
+ */
+public class CodeGenerateUtils {
+ // start timestamp
+ private static final long START_TIMESTAMP = 1609430400000L; //2021-01-01 00:00:00
+ // Each machine generates 32 in the same millisecond
+ private static final long LOW_DIGIT_BIT = 5L;
+ private static final long MIDDLE_BIT = 2L;
+ private static final long MAX_LOW_DIGIT = ~(-1L << LOW_DIGIT_BIT);
+ // The displacement to the left
+ private static final long MIDDLE_LEFT = LOW_DIGIT_BIT;
+ private static final long HIGH_DIGIT_LEFT = LOW_DIGIT_BIT + MIDDLE_BIT;
+ private final long machineHash;
+ private long lowDigit = 0L;
+ private long recordMillisecond = -1L;
+
+ private static final long SYSTEM_TIMESTAMP = System.currentTimeMillis();
+ private static final long SYSTEM_NANOTIME = System.nanoTime();
+
+ private CodeGenerateUtils() throws CodeGenerateException {
+ try {
+ this.machineHash = Math.abs(Objects.hash(InetAddress.getLocalHost().getHostName())) % (2 << (MIDDLE_BIT - 1));
+ } catch (UnknownHostException e) {
+ throw new CodeGenerateException(e.getMessage());
+ }
+ }
+
+ private static CodeGenerateUtils instance = null;
+
+ public static synchronized CodeGenerateUtils getInstance() throws CodeGenerateException {
+ if (instance == null) {
+ instance = new CodeGenerateUtils();
+ }
+ return instance;
+ }
+
+ public synchronized long genCode() throws CodeGenerateException {
+ long nowtMillisecond = systemMillisecond();
+ if (nowtMillisecond < recordMillisecond) {
+ throw new CodeGenerateException("New code exception because time is set back.");
+ }
+ if (nowtMillisecond == recordMillisecond) {
+ lowDigit = (lowDigit + 1) & MAX_LOW_DIGIT;
+ if (lowDigit == 0L) {
+ while (nowtMillisecond <= recordMillisecond) {
+ nowtMillisecond = systemMillisecond();
+ }
+ }
+ } else {
+ lowDigit = 0L;
+ }
+ recordMillisecond = nowtMillisecond;
+ return (nowtMillisecond - START_TIMESTAMP) << HIGH_DIGIT_LEFT | machineHash << MIDDLE_LEFT | lowDigit;
+ }
+
+ private long systemMillisecond() {
+ return SYSTEM_TIMESTAMP + (System.nanoTime() - SYSTEM_NANOTIME) / 1000000;
+ }
+
+ public static class CodeGenerateException extends Exception {
+ public CodeGenerateException(String message) {
+ super(message);
+ }
+ }
+}
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/SnowFlakeUtils.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/SnowFlakeUtils.java
deleted file mode 100644
index 6393e19..0000000
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/SnowFlakeUtils.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.common.utils;
-
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-import java.util.Objects;
-
-public class SnowFlakeUtils {
- // start timestamp
- private static final long START_TIMESTAMP = 1609430400000L; //2021-01-01 00:00:00
- // Each machine generates 32 in the same millisecond
- private static final long SEQUENCE_BIT = 5;
- private static final long MACHINE_BIT = 2;
- private static final long MAX_SEQUENCE = ~(-1L << SEQUENCE_BIT);
- // The displacement to the left
- private static final long MACHINE_LEFT = SEQUENCE_BIT + MACHINE_BIT;
- private static final long TIMESTAMP_LEFT = SEQUENCE_BIT + MACHINE_BIT + MACHINE_LEFT;
- private final int machineId;
- private long sequence = 0L;
- private long lastTimestamp = -1L;
-
- private static final long SYSTEM_TIMESTAMP = System.currentTimeMillis();
- private static final long SYSTEM_NANOTIME = System.nanoTime();
-
- private SnowFlakeUtils() throws SnowFlakeException {
- try {
- this.machineId = Math.abs(Objects.hash(InetAddress.getLocalHost().getHostName())) % 4;
- } catch (UnknownHostException e) {
- throw new SnowFlakeException(e.getMessage());
- }
- }
-
- private static SnowFlakeUtils instance = null;
-
- public static synchronized SnowFlakeUtils getInstance() throws SnowFlakeException {
- if (instance == null) {
- instance = new SnowFlakeUtils();
- }
- return instance;
- }
-
- public synchronized long nextId() throws SnowFlakeException {
- long currStmp = nowTimestamp();
- if (currStmp < lastTimestamp) {
- throw new SnowFlakeException("Clock moved backwards. Refusing to generate id");
- }
- if (currStmp == lastTimestamp) {
- sequence = (sequence + 1) & MAX_SEQUENCE;
- if (sequence == 0L) {
- currStmp = getNextMill();
- }
- } else {
- sequence = 0L;
- }
- lastTimestamp = currStmp;
- return (currStmp - START_TIMESTAMP) << TIMESTAMP_LEFT
- | machineId << MACHINE_LEFT
- | sequence;
- }
-
- private long getNextMill() {
- long mill = nowTimestamp();
- while (mill <= lastTimestamp) {
- mill = nowTimestamp();
- }
- return mill;
- }
-
- private long nowTimestamp() {
- return SYSTEM_TIMESTAMP + (System.nanoTime() - SYSTEM_NANOTIME) / 1000000;
- }
-
- public static class SnowFlakeException extends Exception {
- public SnowFlakeException(String message) {
- super(message);
- }
- }
-}
diff --git a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/SnowFlakeUtilsTest.java b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/CodeGenerateUtilsTest.java
similarity index 66%
rename from dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/SnowFlakeUtilsTest.java
rename to dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/CodeGenerateUtilsTest.java
index bdf7096..d949bd82 100644
--- a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/SnowFlakeUtilsTest.java
+++ b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/CodeGenerateUtilsTest.java
@@ -17,18 +17,19 @@
package org.apache.dolphinscheduler.common.utils;
+import java.util.HashSet;
+
+import org.junit.Assert;
import org.junit.Test;
-public class SnowFlakeUtilsTest {
+public class CodeGenerateUtilsTest {
@Test
- public void testNextId() {
- try {
- for (int i = 0; i < 100; i++) {
- Thread.sleep(1);
- System.out.println(SnowFlakeUtils.getInstance().nextId());
- }
- } catch (Exception e) {
- e.printStackTrace();
+ public void testNoGenerateDuplicateCode() throws CodeGenerateUtils.CodeGenerateException {
+ HashSet<Long> existsCode = new HashSet<>();
+ for (int i = 0; i < 100; i++) {
+ Long currentCode = CodeGenerateUtils.getInstance().genCode();
+ Assert.assertFalse(existsCode.contains(currentCode));
+ existsCode.add(currentCode);
}
}
}
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/ProcessDefinitionDao.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/ProcessDefinitionDao.java
index c41359a..f4d198e 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/ProcessDefinitionDao.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/ProcessDefinitionDao.java
@@ -20,8 +20,8 @@ package org.apache.dolphinscheduler.dao.upgrade;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.common.enums.ReleaseState;
+import org.apache.dolphinscheduler.common.utils.CodeGenerateUtils;
import org.apache.dolphinscheduler.common.utils.ConnectionUtils;
-import org.apache.dolphinscheduler.common.utils.SnowFlakeUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import java.sql.Connection;
@@ -110,7 +110,7 @@ public class ProcessDefinitionDao {
processDefinition.setId(rs.getInt(1));
long code = rs.getLong(2);
if (code == 0L) {
- code = SnowFlakeUtils.getInstance().nextId();
+ code = CodeGenerateUtils.getInstance().genCode();
}
processDefinition.setCode(code);
processDefinition.setVersion(Constants.VERSION_FIRST);
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/ProjectDao.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/ProjectDao.java
index 794d71a..2906902 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/ProjectDao.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/ProjectDao.java
@@ -17,8 +17,8 @@
package org.apache.dolphinscheduler.dao.upgrade;
+import org.apache.dolphinscheduler.common.utils.CodeGenerateUtils;
import org.apache.dolphinscheduler.common.utils.ConnectionUtils;
-import org.apache.dolphinscheduler.common.utils.SnowFlakeUtils;
import java.sql.Connection;
import java.sql.PreparedStatement;
@@ -51,7 +51,7 @@ public class ProjectDao {
Integer id = rs.getInt(1);
long code = rs.getLong(2);
if (code == 0L) {
- code = SnowFlakeUtils.getInstance().nextId();
+ code = CodeGenerateUtils.getInstance().genCode();
}
projectMap.put(id, code);
}
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/UpgradeDao.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/UpgradeDao.java
index 2087c6e..68fd446 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/UpgradeDao.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/UpgradeDao.java
@@ -25,12 +25,12 @@ import org.apache.dolphinscheduler.common.enums.TaskType;
import org.apache.dolphinscheduler.common.enums.TimeoutFlag;
import org.apache.dolphinscheduler.common.process.ResourceInfo;
import org.apache.dolphinscheduler.common.task.TaskTimeoutParameter;
+import org.apache.dolphinscheduler.common.utils.CodeGenerateUtils;
import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import org.apache.dolphinscheduler.common.utils.ConnectionUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.SchemaUtils;
import org.apache.dolphinscheduler.common.utils.ScriptRunner;
-import org.apache.dolphinscheduler.common.utils.SnowFlakeUtils;
import org.apache.dolphinscheduler.dao.AbstractBaseDao;
import org.apache.dolphinscheduler.dao.datasource.ConnectionFactory;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
@@ -684,8 +684,7 @@ public abstract class UpgradeDao extends AbstractBaseDao {
String name = task.get("name").asText();
taskDefinitionLog.setName(name);
taskDefinitionLog.setWorkerGroup(task.get("workerGroup").asText());
- long taskCode = SnowFlakeUtils.getInstance().nextId();
- // System.out.println(taskCode);
+ long taskCode = CodeGenerateUtils.getInstance().genCode();
taskDefinitionLog.setCode(taskCode);
taskDefinitionLog.setVersion(Constants.VERSION_FIRST);
taskDefinitionLog.setProjectCode(processDefinition.getProjectCode());
diff --git a/dolphinscheduler-dist/release-docs/LICENSE b/dolphinscheduler-dist/release-docs/LICENSE
index 63ab3f6..8707314 100644
--- a/dolphinscheduler-dist/release-docs/LICENSE
+++ b/dolphinscheduler-dist/release-docs/LICENSE
@@ -417,6 +417,8 @@ The text of each license is also included at licenses/LICENSE-[project].txt.
protostuff-runtime 1.7.2: https://github.com/protostuff/protostuff/protostuff-core Apache-2.0
protostuff-api 1.7.2: https://github.com/protostuff/protostuff/protostuff-api Apache-2.0
protostuff-collectionschema 1.7.2: https://github.com/protostuff/protostuff/protostuff-collectionschema Apache-2.0
+ snowflake snowflake-2010: https://github.com/twitter-archive/snowflake/tree/snowflake-2010, Apache 2.0
+
========================================================================
BSD licenses
========================================================================
diff --git a/dolphinscheduler-dist/release-docs/licenses/LICENSE-snowflake.txt b/dolphinscheduler-dist/release-docs/licenses/LICENSE-snowflake.txt
new file mode 100644
index 0000000..e257174
--- /dev/null
+++ b/dolphinscheduler-dist/release-docs/licenses/LICENSE-snowflake.txt
@@ -0,0 +1,11 @@
+Copyright 2010-2012 Twitter, Inc.
+
+Licensed under the Apache License, Version 2.0 (the "License"); you may not use this
+file except in compliance with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software distributed
+under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
+CONDITIONS OF ANY KIND, either express or implied. See the License for the
+specific language governing permissions and limitations under the License.
\ No newline at end of file
diff --git a/dolphinscheduler-python/src/main/java/org/apache/dolphinscheduler/server/PythonGatewayServer.java b/dolphinscheduler-python/src/main/java/org/apache/dolphinscheduler/server/PythonGatewayServer.java
new file mode 100644
index 0000000..9ae966f
--- /dev/null
+++ b/dolphinscheduler-python/src/main/java/org/apache/dolphinscheduler/server/PythonGatewayServer.java
@@ -0,0 +1,374 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server;
+
+import org.apache.dolphinscheduler.api.enums.Status;
+import org.apache.dolphinscheduler.api.service.ExecutorService;
+import org.apache.dolphinscheduler.api.service.ProcessDefinitionService;
+import org.apache.dolphinscheduler.api.service.ProjectService;
+import org.apache.dolphinscheduler.api.service.QueueService;
+import org.apache.dolphinscheduler.api.service.SchedulerService;
+import org.apache.dolphinscheduler.api.service.TaskDefinitionService;
+import org.apache.dolphinscheduler.api.service.TenantService;
+import org.apache.dolphinscheduler.api.service.UsersService;
+import org.apache.dolphinscheduler.api.utils.Result;
+import org.apache.dolphinscheduler.common.Constants;
+import org.apache.dolphinscheduler.common.enums.FailureStrategy;
+import org.apache.dolphinscheduler.common.enums.Priority;
+import org.apache.dolphinscheduler.common.enums.ProcessExecutionTypeEnum;
+import org.apache.dolphinscheduler.common.enums.ReleaseState;
+import org.apache.dolphinscheduler.common.enums.RunMode;
+import org.apache.dolphinscheduler.common.enums.TaskDependType;
+import org.apache.dolphinscheduler.common.enums.UserType;
+import org.apache.dolphinscheduler.common.enums.WarningType;
+import org.apache.dolphinscheduler.common.utils.CodeGenerateUtils;
+import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
+import org.apache.dolphinscheduler.dao.entity.Project;
+import org.apache.dolphinscheduler.dao.entity.Queue;
+import org.apache.dolphinscheduler.dao.entity.Schedule;
+import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
+import org.apache.dolphinscheduler.dao.entity.Tenant;
+import org.apache.dolphinscheduler.dao.entity.User;
+import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
+import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
+import org.apache.dolphinscheduler.dao.mapper.ScheduleMapper;
+import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+import javax.annotation.PostConstruct;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.web.servlet.support.SpringBootServletInitializer;
+import org.springframework.context.annotation.ComponentScan;
+import org.springframework.context.annotation.FilterType;
+
+import py4j.GatewayServer;
+
+@ComponentScan(value = "org.apache.dolphinscheduler", excludeFilters = {
+ @ComponentScan.Filter(type = FilterType.REGEX, pattern = {
+ "org.apache.dolphinscheduler.server.master.*",
+ "org.apache.dolphinscheduler.server.worker.*",
+ "org.apache.dolphinscheduler.server.monitor.*",
+ "org.apache.dolphinscheduler.server.log.*",
+ "org.apache.dolphinscheduler.alert.*"
+ })
+})
+public class PythonGatewayServer extends SpringBootServletInitializer {
+ private static final Logger LOGGER = LoggerFactory.getLogger(PythonGatewayServer.class);
+
+ private static final WarningType DEFAULT_WARNING_TYPE = WarningType.NONE;
+ private static final int DEFAULT_WARNING_GROUP_ID = 0;
+ private static final FailureStrategy DEFAULT_FAILURE_STRATEGY = FailureStrategy.CONTINUE;
+ private static final Priority DEFAULT_PRIORITY = Priority.MEDIUM;
+ private static final Long DEFAULT_ENVIRONMENT_CODE = -1L;
+
+ private static final TaskDependType DEFAULT_TASK_DEPEND_TYPE = TaskDependType.TASK_POST;
+ private static final RunMode DEFAULT_RUN_MODE = RunMode.RUN_MODE_SERIAL;
+ private static final int DEFAULT_DRY_RUN = 0;
+
+ @Autowired
+ private ProcessDefinitionMapper processDefinitionMapper;
+
+ @Autowired
+ private ProjectService projectService;
+
+ @Autowired
+ private TenantService tenantService;
+
+ @Autowired
+ private ExecutorService executorService;
+
+ @Autowired
+ private ProcessDefinitionService processDefinitionService;
+
+ @Autowired
+ private TaskDefinitionService taskDefinitionService;
+
+ @Autowired
+ private UsersService usersService;
+
+ @Autowired
+ private QueueService queueService;
+
+ @Autowired
+ private ProjectMapper projectMapper;
+
+ @Autowired
+ private TaskDefinitionMapper taskDefinitionMapper;
+
+ @Autowired
+ private SchedulerService schedulerService;
+
+ @Autowired
+ private ScheduleMapper scheduleMapper;
+
+ // TODO replace this user to build in admin user if we make sure build in one could not be change
+ private final User dummyAdminUser = new User() {
+ {
+ setId(Integer.MAX_VALUE);
+ setUserName("dummyUser");
+ setUserType(UserType.ADMIN_USER);
+ }
+ };
+
+ private final Queue queuePythonGateway = new Queue() {
+ {
+ setId(Integer.MAX_VALUE);
+ setQueueName("queuePythonGateway");
+ }
+ };
+
+ public String ping() {
+ return "PONG";
+ }
+
+ // TODO Should we import package in python client side? utils package can but service can not, why
+ // Core api
+ public Map<String, Object> genTaskCodeList(Integer genNum) {
+ return taskDefinitionService.genTaskCodeList(genNum);
+ }
+
+ public Map<String, Long> getCodeAndVersion(String projectName, String taskName) throws CodeGenerateUtils.CodeGenerateException {
+ Project project = projectMapper.queryByName(projectName);
+ Map<String, Long> result = new HashMap<>();
+ // project do not exists, mean task not exists too, so we should directly return init value
+ if (project == null) {
+ result.put("code", CodeGenerateUtils.getInstance().genCode());
+ result.put("version", 0L);
+ return result;
+ }
+ TaskDefinition taskDefinition = taskDefinitionMapper.queryByName(project.getCode(), taskName);
+ if (taskDefinition == null) {
+ result.put("code", CodeGenerateUtils.getInstance().genCode());
+ result.put("version", 0L);
+ } else {
+ result.put("code", taskDefinition.getCode());
+ result.put("version", (long) taskDefinition.getVersion());
+ }
+ return result;
+ }
+
+ /**
+ * create or update process definition.
+ * If process definition do not exists in Project=`projectCode` would create a new one
+ * If process definition already exists in Project=`projectCode` would update it
+ *
+ * @param userName user name who create or update process definition
+ * @param projectName project name which process definition belongs to
+ * @param name process definition name
+ * @param description description
+ * @param globalParams global params
+ * @param schedule schedule for process definition, will not set schedule if null,
+ * and if would always fresh exists schedule if not null
+ * @param locations locations json object about all tasks
+ * @param timeout timeout for process definition working, if running time longer than timeout,
+ * task will mark as fail
+ * @param workerGroup run task in which worker group
+ * @param tenantCode tenantCode
+ * @param taskRelationJson relation json for nodes
+ * @param taskDefinitionJson taskDefinitionJson
+ * @return create result code
+ */
+ public Long createOrUpdateProcessDefinition(String userName,
+ String projectName,
+ String name,
+ String description,
+ String globalParams,
+ String schedule,
+ String locations,
+ int timeout,
+ String workerGroup,
+ String tenantCode,
+ String taskRelationJson,
+ String taskDefinitionJson,
+ ProcessExecutionTypeEnum executionType) {
+ User user = usersService.queryUser(userName);
+ Project project = (Project) projectService.queryByName(user, projectName).get(Constants.DATA_LIST);
+ long projectCode = project.getCode();
+ Map<String, Object> verifyProcessDefinitionExists = processDefinitionService.verifyProcessDefinitionName(user, projectCode, name);
+ Status verifyStatus = (Status) verifyProcessDefinitionExists.get(Constants.STATUS);
+
+ long processDefinitionCode;
+ // create or update process definition
+ if (verifyStatus == Status.PROCESS_DEFINITION_NAME_EXIST) {
+ ProcessDefinition processDefinition = processDefinitionMapper.queryByDefineName(projectCode, name);
+ processDefinitionCode = processDefinition.getCode();
+ // make sure process definition offline which could edit
+ processDefinitionService.releaseProcessDefinition(user, projectCode, processDefinitionCode, ReleaseState.OFFLINE);
+ Map<String, Object> result = processDefinitionService.updateProcessDefinition(user, projectCode, name, processDefinitionCode, description, globalParams,
+ locations, timeout, tenantCode, taskRelationJson, taskDefinitionJson,executionType);
+ } else if (verifyStatus == Status.SUCCESS) {
+ Map<String, Object> result = processDefinitionService.createProcessDefinition(user, projectCode, name, description, globalParams,
+ locations, timeout, tenantCode, taskRelationJson, taskDefinitionJson,executionType);
+ ProcessDefinition processDefinition = (ProcessDefinition) result.get(Constants.DATA_LIST);
+ processDefinitionCode = processDefinition.getCode();
+ } else {
+ String msg = "Verify process definition exists status is invalid, neither SUCCESS or PROCESS_DEFINITION_NAME_EXIST.";
+ LOGGER.error(msg);
+ throw new RuntimeException(msg);
+ }
+
+ // Fresh process definition schedule
+ if (schedule != null) {
+ createOrUpdateSchedule(user, projectCode, processDefinitionCode, schedule, workerGroup);
+ }
+ processDefinitionService.releaseProcessDefinition(user, projectCode, processDefinitionCode, ReleaseState.ONLINE);
+ return processDefinitionCode;
+ }
+
+ /**
+ * create or update process definition schedule.
+ * It would always use latest schedule define in workflow-as-code, and set schedule online when
+ * it's not null
+ *
+ * @param user user who create or update schedule
+ * @param projectCode project which process definition belongs to
+ * @param processDefinitionCode process definition code
+ * @param schedule schedule expression
+ * @param workerGroup work group
+ */
+ private void createOrUpdateSchedule(User user,
+ long projectCode,
+ long processDefinitionCode,
+ String schedule,
+ String workerGroup) {
+ List<Schedule> schedules = scheduleMapper.queryByProcessDefinitionCode(processDefinitionCode);
+ // create or update schedule
+ int scheduleId;
+ if (schedules.isEmpty()) {
+ processDefinitionService.releaseProcessDefinition(user, projectCode, processDefinitionCode, ReleaseState.ONLINE);
+ Map<String, Object> result = schedulerService.insertSchedule(user, projectCode, processDefinitionCode, schedule, DEFAULT_WARNING_TYPE,
+ DEFAULT_WARNING_GROUP_ID, DEFAULT_FAILURE_STRATEGY, DEFAULT_PRIORITY, workerGroup, DEFAULT_ENVIRONMENT_CODE);
+ scheduleId = (int) result.get("scheduleId");
+ } else {
+ scheduleId = schedules.get(0).getId();
+ processDefinitionService.releaseProcessDefinition(user, projectCode, processDefinitionCode, ReleaseState.OFFLINE);
+ schedulerService.updateSchedule(user, projectCode, scheduleId, schedule, DEFAULT_WARNING_TYPE,
+ DEFAULT_WARNING_GROUP_ID, DEFAULT_FAILURE_STRATEGY, DEFAULT_PRIORITY, workerGroup, DEFAULT_ENVIRONMENT_CODE);
+ }
+ schedulerService.setScheduleState(user, projectCode, scheduleId, ReleaseState.ONLINE);
+ }
+
+ public void execProcessInstance(String userName,
+ String projectName,
+ String processDefinitionName,
+ String cronTime,
+ String workerGroup,
+ Integer timeout
+ ) {
+ User user = usersService.queryUser(userName);
+ Project project = projectMapper.queryByName(projectName);
+ ProcessDefinition processDefinition = processDefinitionMapper.queryByDefineName(project.getCode(), processDefinitionName);
+
+ // make sure process definition online
+ processDefinitionService.releaseProcessDefinition(user, project.getCode(), processDefinition.getCode(), ReleaseState.ONLINE);
+
+ executorService.execProcessInstance(user,
+ project.getCode(),
+ processDefinition.getCode(),
+ cronTime,
+ null,
+ DEFAULT_FAILURE_STRATEGY,
+ null,
+ DEFAULT_TASK_DEPEND_TYPE,
+ DEFAULT_WARNING_TYPE,
+ DEFAULT_WARNING_GROUP_ID,
+ DEFAULT_RUN_MODE,
+ DEFAULT_PRIORITY,
+ workerGroup,
+ DEFAULT_ENVIRONMENT_CODE,
+ timeout,
+ null,
+ null,
+ DEFAULT_DRY_RUN
+ );
+ }
+
+ // side object
+ public Map<String, Object> createProject(String userName, String name, String desc) {
+ User user = usersService.queryUser(userName);
+ return projectService.createProject(user, name, desc);
+ }
+
+ public Map<String, Object> createQueue(String name, String queueName) {
+ Result<Object> verifyQueueExists = queueService.verifyQueue(name, queueName);
+ if (verifyQueueExists.getCode() == 0) {
+ return queueService.createQueue(dummyAdminUser, name, queueName);
+ } else {
+ Map<String, Object> result = new HashMap<>();
+ // TODO function putMsg do not work here
+ result.put(Constants.STATUS, Status.SUCCESS);
+ result.put(Constants.MSG, Status.SUCCESS.getMsg());
+ return result;
+ }
+ }
+
+ public Map<String, Object> createTenant(String tenantCode, String desc, String queueName) throws Exception {
+ if (tenantService.checkTenantExists(tenantCode)) {
+ Map<String, Object> result = new HashMap<>();
+ // TODO function putMsg do not work here
+ result.put(Constants.STATUS, Status.SUCCESS);
+ result.put(Constants.MSG, Status.SUCCESS.getMsg());
+ return result;
+ } else {
+ Result<Object> verifyQueueExists = queueService.verifyQueue(queueName, queueName);
+ if (verifyQueueExists.getCode() == 0) {
+ // TODO why create do not return id?
+ queueService.createQueue(dummyAdminUser, queueName, queueName);
+ }
+ Map<String, Object> result = queueService.queryQueueName(queueName);
+ List<Queue> queueList = (List<Queue>) result.get(Constants.DATA_LIST);
+ Queue queue = queueList.get(0);
+ return tenantService.createTenant(dummyAdminUser, tenantCode, queue.getId(), desc);
+ }
+ }
+
+ public void createUser(String userName,
+ String userPassword,
+ String email,
+ String phone,
+ String tenantCode,
+ String queue,
+ int state) {
+ User user = usersService.queryUser(userName);
+ if (Objects.isNull(user)) {
+ Map<String, Object> tenantResult = tenantService.queryByTenantCode(tenantCode);
+ Tenant tenant = (Tenant) tenantResult.get(Constants.DATA_LIST);
+ usersService.createUser(userName, userPassword, email, tenant.getId(), phone, queue, state);
+ }
+ }
+
+ @PostConstruct
+ public void run() {
+ GatewayServer server = new GatewayServer(this);
+ GatewayServer.turnLoggingOn();
+ // Start server to accept python client RPC
+ server.start();
+ }
+
+ public static void main(String[] args) {
+ SpringApplication.run(PythonGatewayServer.class, args);
+ }
+}
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
index 81bad0b..d054555 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
@@ -17,6 +17,7 @@
package org.apache.dolphinscheduler.service.process;
+import static java.util.stream.Collectors.toSet;
import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE;
import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE;
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_EMPTY_SUB_PROCESS;
@@ -27,8 +28,6 @@ import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS_PARENT_INSTANCE_ID;
import static org.apache.dolphinscheduler.common.Constants.LOCAL_PARAMS;
-import static java.util.stream.Collectors.toSet;
-
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.AuthorizationType;
import org.apache.dolphinscheduler.common.enums.CommandType;
@@ -51,12 +50,12 @@ import org.apache.dolphinscheduler.common.process.ResourceInfo;
import org.apache.dolphinscheduler.common.task.AbstractParameters;
import org.apache.dolphinscheduler.common.task.TaskTimeoutParameter;
import org.apache.dolphinscheduler.common.task.subprocess.SubProcessParameters;
+import org.apache.dolphinscheduler.common.utils.CodeGenerateUtils;
+import org.apache.dolphinscheduler.common.utils.CodeGenerateUtils.CodeGenerateException;
import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.ParameterUtils;
-import org.apache.dolphinscheduler.common.utils.SnowFlakeUtils;
-import org.apache.dolphinscheduler.common.utils.SnowFlakeUtils.SnowFlakeException;
import org.apache.dolphinscheduler.common.utils.TaskParametersUtils;
import org.apache.dolphinscheduler.dao.entity.Command;
import org.apache.dolphinscheduler.dao.entity.DagData;
@@ -212,6 +211,7 @@ public class ProcessService {
* @param processDefinitionCacheMaps
* @return process instance
*/
+ @Transactional
public ProcessInstance handleCommand(Logger logger, String host, Command command, HashMap<String, ProcessDefinition> processDefinitionCacheMaps) {
ProcessInstance processInstance = constructProcessInstance(command, host, processDefinitionCacheMaps);
// cannot construct process instance, return null
@@ -634,7 +634,7 @@ public class ProcessService {
startParamMap.putAll(fatherParamMap);
// set start param into global params
if (startParamMap.size() > 0
- && processDefinition.getGlobalParamMap() != null) {
+ && processDefinition.getGlobalParamMap() != null) {
for (Map.Entry<String, String> param : processDefinition.getGlobalParamMap().entrySet()) {
String val = startParamMap.get(param.getKey());
if (val != null) {
@@ -696,8 +696,8 @@ public class ProcessService {
private Boolean checkCmdParam(Command command, Map<String, String> cmdParam) {
if (command.getTaskDependType() == TaskDependType.TASK_ONLY || command.getTaskDependType() == TaskDependType.TASK_PRE) {
if (cmdParam == null
- || !cmdParam.containsKey(Constants.CMD_PARAM_START_NODES)
- || cmdParam.get(Constants.CMD_PARAM_START_NODES).isEmpty()) {
+ || !cmdParam.containsKey(Constants.CMD_PARAM_START_NODES)
+ || cmdParam.get(Constants.CMD_PARAM_START_NODES).isEmpty()) {
logger.error("command node depend type is {}, but start nodes is null ", command.getTaskDependType());
return false;
}
@@ -708,9 +708,8 @@ public class ProcessService {
/**
* construct process instance according to one command.
*
- * @param command command
- * @param host host
- * @param processDefinitionCacheMaps
+ * @param command command
+ * @param host host
* @return process instance
*/
private ProcessInstance constructProcessInstance(Command command, String host, HashMap<String, ProcessDefinition> processDefinitionCacheMaps) {
@@ -883,7 +882,7 @@ public class ProcessService {
}
return processDefineLogMapper.queryByDefinitionCodeAndVersion(
- processInstance.getProcessDefinitionCode(), processInstance.getProcessDefinitionVersion());
+ processInstance.getProcessDefinitionCode(), processInstance.getProcessDefinitionVersion());
}
}
@@ -929,9 +928,9 @@ public class ProcessService {
processInstance.setScheduleTime(complementDate.get(0));
}
processInstance.setGlobalParams(ParameterUtils.curingGlobalParams(
- processDefinition.getGlobalParamMap(),
- processDefinition.getGlobalParamList(),
- CommandType.COMPLEMENT_DATA, processInstance.getScheduleTime()));
+ processDefinition.getGlobalParamMap(),
+ processDefinition.getGlobalParamList(),
+ CommandType.COMPLEMENT_DATA, processInstance.getScheduleTime()));
}
/**
@@ -949,7 +948,7 @@ public class ProcessService {
Map<String, String> paramMap = JSONUtils.toMap(cmdParam);
// write sub process id into cmd param.
if (paramMap.containsKey(CMD_PARAM_SUB_PROCESS)
- && CMD_PARAM_EMPTY_SUB_PROCESS.equals(paramMap.get(CMD_PARAM_SUB_PROCESS))) {
+ && CMD_PARAM_EMPTY_SUB_PROCESS.equals(paramMap.get(CMD_PARAM_SUB_PROCESS))) {
paramMap.remove(CMD_PARAM_SUB_PROCESS);
paramMap.put(CMD_PARAM_SUB_PROCESS, String.valueOf(subProcessInstance.getId()));
subProcessInstance.setCommandParam(JSONUtils.toJsonString(paramMap));
@@ -962,7 +961,7 @@ public class ProcessService {
ProcessInstance parentInstance = findProcessInstanceDetailById(Integer.parseInt(parentInstanceId));
if (parentInstance != null) {
subProcessInstance.setGlobalParams(
- joinGlobalParams(parentInstance.getGlobalParams(), subProcessInstance.getGlobalParams()));
+ joinGlobalParams(parentInstance.getGlobalParams(), subProcessInstance.getGlobalParams()));
this.saveProcessInstance(subProcessInstance);
} else {
logger.error("sub process command params error, cannot find parent instance: {} ", cmdParam);
@@ -1009,7 +1008,7 @@ public class ProcessService {
private void initTaskInstance(TaskInstance taskInstance) {
if (!taskInstance.isSubProcess()
- && (taskInstance.getState().typeIsCancel() || taskInstance.getState().typeIsFailure())) {
+ && (taskInstance.getState().typeIsCancel() || taskInstance.getState().typeIsFailure())) {
taskInstance.setFlag(Flag.NO);
updateTaskInstance(taskInstance);
return;
@@ -1064,12 +1063,12 @@ public class ProcessService {
public TaskInstance submitTask(TaskInstance taskInstance) {
ProcessInstance processInstance = this.findProcessInstanceDetailById(taskInstance.getProcessInstanceId());
logger.info("start submit task : {}, instance id:{}, state: {}",
- taskInstance.getName(), taskInstance.getProcessInstanceId(), processInstance.getState());
+ taskInstance.getName(), taskInstance.getProcessInstanceId(), processInstance.getState());
//submit to db
TaskInstance task = submitTaskInstanceToDB(taskInstance, processInstance);
if (task == null) {
logger.error("end submit task to db error, task name:{}, process id:{} state: {} ",
- taskInstance.getName(), taskInstance.getProcessInstance(), processInstance.getState());
+ taskInstance.getName(), taskInstance.getProcessInstance(), processInstance.getState());
return task;
}
if (!task.getState().typeIsFinished()) {
@@ -1135,7 +1134,7 @@ public class ProcessService {
}
}
logger.info("sub process instance is not found,parent task:{},parent instance:{}",
- parentTask.getId(), parentProcessInstance.getId());
+ parentTask.getId(), parentProcessInstance.getId());
return null;
}
@@ -1227,21 +1226,21 @@ public class ProcessService {
String processParam = getSubWorkFlowParam(instanceMap, parentProcessInstance, fatherParams);
int subProcessInstanceId = childInstance == null ? 0 : childInstance.getId();
return new Command(
- commandType,
- TaskDependType.TASK_POST,
- parentProcessInstance.getFailureStrategy(),
- parentProcessInstance.getExecutorId(),
- subProcessDefinition.getCode(),
- processParam,
- parentProcessInstance.getWarningType(),
- parentProcessInstance.getWarningGroupId(),
- parentProcessInstance.getScheduleTime(),
- task.getWorkerGroup(),
- task.getEnvironmentCode(),
- parentProcessInstance.getProcessInstancePriority(),
- parentProcessInstance.getDryRun(),
- subProcessInstanceId,
- subProcessDefinition.getVersion()
+ commandType,
+ TaskDependType.TASK_POST,
+ parentProcessInstance.getFailureStrategy(),
+ parentProcessInstance.getExecutorId(),
+ subProcessDefinition.getCode(),
+ processParam,
+ parentProcessInstance.getWarningType(),
+ parentProcessInstance.getWarningGroupId(),
+ parentProcessInstance.getScheduleTime(),
+ task.getWorkerGroup(),
+ task.getEnvironmentCode(),
+ parentProcessInstance.getProcessInstancePriority(),
+ parentProcessInstance.getDryRun(),
+ subProcessInstanceId,
+ subProcessDefinition.getVersion()
);
}
@@ -1278,7 +1277,7 @@ public class ProcessService {
*/
private void updateSubProcessDefinitionByParent(ProcessInstance parentProcessInstance, long childDefinitionCode) {
ProcessDefinition fatherDefinition = this.findProcessDefinition(parentProcessInstance.getProcessDefinitionCode(),
- parentProcessInstance.getProcessDefinitionVersion());
+ parentProcessInstance.getProcessDefinitionVersion());
ProcessDefinition childDefinition = this.findProcessDefinitionByCode(childDefinitionCode);
if (childDefinition != null && fatherDefinition != null) {
childDefinition.setWarningGroupId(fatherDefinition.getWarningGroupId());
@@ -1301,7 +1300,7 @@ public class ProcessService {
taskInstance.setRetryTimes(taskInstance.getRetryTimes() + 1);
} else {
if (processInstanceState != ExecutionStatus.READY_STOP
- && processInstanceState != ExecutionStatus.READY_PAUSE) {
+ && processInstanceState != ExecutionStatus.READY_PAUSE) {
// failure task set invalid
taskInstance.setFlag(Flag.NO);
updateTaskInstance(taskInstance);
@@ -1354,9 +1353,9 @@ public class ProcessService {
// the task already exists in task queue
// return state
if (
- state == ExecutionStatus.RUNNING_EXECUTION
- || state == ExecutionStatus.DELAY_EXECUTION
- || state == ExecutionStatus.KILL
+ state == ExecutionStatus.RUNNING_EXECUTION
+ || state == ExecutionStatus.DELAY_EXECUTION
+ || state == ExecutionStatus.KILL
) {
return state;
}
@@ -1365,7 +1364,7 @@ public class ProcessService {
if (processInstanceState == ExecutionStatus.READY_PAUSE) {
state = ExecutionStatus.PAUSE;
} else if (processInstanceState == ExecutionStatus.READY_STOP
- || !checkProcessStrategy(taskInstance)) {
+ || !checkProcessStrategy(taskInstance)) {
state = ExecutionStatus.KILL;
} else {
state = ExecutionStatus.SUBMITTED_SUCCESS;
@@ -1389,7 +1388,7 @@ public class ProcessService {
for (TaskInstance task : taskInstances) {
if (task.getState() == ExecutionStatus.FAILURE
- && task.getRetryTimes() >= task.getMaxRetryTimes()) {
+ && task.getRetryTimes() >= task.getMaxRetryTimes()) {
return false;
}
}
@@ -1518,7 +1517,8 @@ public class ProcessService {
private void updateTaskDefinitionResources(TaskDefinition taskDefinition) {
Map<String, Object> taskParameters = JSONUtils.parseObject(
taskDefinition.getTaskParams(),
- new TypeReference<Map<String, Object>>() { });
+ new TypeReference<Map<String, Object>>() {
+ });
if (taskParameters != null) {
// if contains mainJar field, query resource from database
// Flink, Spark, MR
@@ -1756,7 +1756,8 @@ public class ProcessService {
return;
}
//if the result more than one line,just get the first .
- Map<String, Object> taskParams = JSONUtils.parseObject(taskInstance.getTaskParams(), new TypeReference<Map<String, Object>>() {});
+ Map<String, Object> taskParams = JSONUtils.parseObject(taskInstance.getTaskParams(), new TypeReference<Map<String, Object>>() {
+ });
Object localParams = taskParams.get(LOCAL_PARAMS);
if (localParams == null) {
return;
@@ -1953,8 +1954,8 @@ public class ProcessService {
*/
public ProcessInstance findLastSchedulerProcessInterval(Long definitionCode, DateInterval dateInterval) {
return processInstanceMapper.queryLastSchedulerProcess(definitionCode,
- dateInterval.getStartTime(),
- dateInterval.getEndTime());
+ dateInterval.getStartTime(),
+ dateInterval.getEndTime());
}
/**
@@ -1966,8 +1967,8 @@ public class ProcessService {
*/
public ProcessInstance findLastManualProcessInterval(Long definitionCode, DateInterval dateInterval) {
return processInstanceMapper.queryLastManualProcess(definitionCode,
- dateInterval.getStartTime(),
- dateInterval.getEndTime());
+ dateInterval.getStartTime(),
+ dateInterval.getEndTime());
}
/**
@@ -1980,9 +1981,9 @@ public class ProcessService {
*/
public ProcessInstance findLastRunningProcess(Long definitionCode, Date startTime, Date endTime) {
return processInstanceMapper.queryLastRunningProcess(definitionCode,
- startTime,
- endTime,
- stateArray);
+ startTime,
+ endTime,
+ stateArray);
}
/**
@@ -2188,10 +2189,10 @@ public class ProcessService {
AbstractParameters params = TaskParametersUtils.getParameters(taskDefinition.getTaskType(), taskDefinition.getTaskParams());
if (params != null && CollectionUtils.isNotEmpty(params.getResourceFilesList())) {
resourceIds = params.getResourceFilesList().
- stream()
- .filter(t -> t.getId() != 0)
- .map(ResourceInfo::getId)
- .collect(Collectors.toSet());
+ stream()
+ .filter(t -> t.getId() != 0)
+ .map(ResourceInfo::getId)
+ .collect(Collectors.toSet());
}
if (CollectionUtils.isEmpty(resourceIds)) {
return StringUtils.EMPTY;
@@ -2211,7 +2212,7 @@ public class ProcessService {
taskDefinitionLog.setResourceIds(getResourceIds(taskDefinitionLog));
if (taskDefinitionLog.getCode() > 0 && taskDefinitionLog.getVersion() > 0) {
TaskDefinitionLog definitionCodeAndVersion = taskDefinitionLogMapper
- .queryByDefinitionCodeAndVersion(taskDefinitionLog.getCode(), taskDefinitionLog.getVersion());
+ .queryByDefinitionCodeAndVersion(taskDefinitionLog.getCode(), taskDefinitionLog.getVersion());
if (definitionCodeAndVersion != null) {
if (!taskDefinitionLog.equals(definitionCodeAndVersion)) {
taskDefinitionLog.setUserId(definitionCodeAndVersion.getUserId());
@@ -2228,8 +2229,8 @@ public class ProcessService {
taskDefinitionLog.setCreateTime(now);
if (taskDefinitionLog.getCode() == 0) {
try {
- taskDefinitionLog.setCode(SnowFlakeUtils.getInstance().nextId());
- } catch (SnowFlakeException e) {
+ taskDefinitionLog.setCode(CodeGenerateUtils.getInstance().genCode());
+ } catch (CodeGenerateException e) {
logger.error("Task code get error, ", e);
return Constants.DEFINITION_FAILURE;
}
@@ -2285,7 +2286,7 @@ public class ProcessService {
Map<Long, TaskDefinitionLog> taskDefinitionLogMap = null;
if (CollectionUtils.isNotEmpty(taskDefinitionLogs)) {
taskDefinitionLogMap = taskDefinitionLogs.stream()
- .collect(Collectors.toMap(TaskDefinition::getCode, taskDefinitionLog -> taskDefinitionLog));
+ .collect(Collectors.toMap(TaskDefinition::getCode, taskDefinitionLog -> taskDefinitionLog));
}
Date now = new Date();
for (ProcessTaskRelationLog processTaskRelationLog : taskRelationList) {
@@ -2308,7 +2309,8 @@ public class ProcessService {
if (!processTaskRelationList.isEmpty()) {
Set<Integer> processTaskRelationSet = processTaskRelationList.stream().map(ProcessTaskRelation::hashCode).collect(toSet());
Set<Integer> taskRelationSet = taskRelationList.stream().map(ProcessTaskRelationLog::hashCode).collect(toSet());
- if (CollectionUtils.isEqualCollection(processTaskRelationSet, taskRelationSet)) {
+ boolean result = CollectionUtils.isEqualCollection(processTaskRelationSet, taskRelationSet);
+ if (result) {
return Constants.EXIT_CODE_SUCCESS;
}
processTaskRelationMapper.deleteByCode(projectCode, processDefinitionCode);
@@ -2322,9 +2324,9 @@ public class ProcessService {
List<ProcessTaskRelation> processTaskRelationList = processTaskRelationMapper.queryByTaskCode(taskCode);
if (!processTaskRelationList.isEmpty()) {
Set<Long> processDefinitionCodes = processTaskRelationList
- .stream()
- .map(ProcessTaskRelation::getProcessDefinitionCode)
- .collect(Collectors.toSet());
+ .stream()
+ .map(ProcessTaskRelation::getProcessDefinitionCode)
+ .collect(Collectors.toSet());
List<ProcessDefinition> processDefinitionList = processDefineMapper.queryByCodes(processDefinitionCodes);
// check process definition is already online
for (ProcessDefinition processDefinition : processDefinitionList) {
@@ -2357,8 +2359,8 @@ public class ProcessService {
List<ProcessTaskRelation> processTaskRelations = processTaskRelationMapper.queryByProcessCode(processDefinition.getProjectCode(), processDefinition.getCode());
List<TaskDefinitionLog> taskDefinitionLogList = genTaskDefineList(processTaskRelations);
List<TaskDefinition> taskDefinitions = taskDefinitionLogList.stream()
- .map(taskDefinitionLog -> JSONUtils.parseObject(JSONUtils.toJsonString(taskDefinitionLog), TaskDefinition.class))
- .collect(Collectors.toList());
+ .map(taskDefinitionLog -> JSONUtils.parseObject(JSONUtils.toJsonString(taskDefinitionLog), TaskDefinition.class))
+ .collect(Collectors.toList());
return new DagData(processDefinition, processTaskRelations, taskDefinitions);
}
@@ -2421,7 +2423,7 @@ public class ProcessService {
taskDefinitionLogs = genTaskDefineList(taskRelationList);
}
Map<Long, TaskDefinitionLog> taskDefinitionLogMap = taskDefinitionLogs.stream()
- .collect(Collectors.toMap(TaskDefinitionLog::getCode, taskDefinitionLog -> taskDefinitionLog));
+ .collect(Collectors.toMap(TaskDefinitionLog::getCode, taskDefinitionLog -> taskDefinitionLog));
List<TaskNode> taskNodeList = new ArrayList<>();
for (Entry<Long, List<Long>> code : taskCodeMap.entrySet()) {
TaskDefinitionLog taskDefinitionLog = taskDefinitionLogMap.get(code.getKey());
@@ -2446,8 +2448,8 @@ public class ProcessService {
taskNode.setWorkerGroup(taskDefinitionLog.getWorkerGroup());
taskNode.setEnvironmentCode(taskDefinitionLog.getEnvironmentCode());
taskNode.setTimeout(JSONUtils.toJsonString(new TaskTimeoutParameter(taskDefinitionLog.getTimeoutFlag() == TimeoutFlag.OPEN,
- taskDefinitionLog.getTimeoutNotifyStrategy(),
- taskDefinitionLog.getTimeout())));
+ taskDefinitionLog.getTimeoutNotifyStrategy(),
+ taskDefinitionLog.getTimeout())));
taskNode.setDelayTime(taskDefinitionLog.getDelayTime());
taskNode.setPreTasks(JSONUtils.toJsonString(code.getValue().stream().map(taskDefinitionLogMap::get).map(TaskDefinition::getCode).collect(Collectors.toList())));
taskNodeList.add(taskNode);
diff --git a/licenses/LICENSE-snowflake.txt b/licenses/LICENSE-snowflake.txt
new file mode 100644
index 0000000..e257174
--- /dev/null
+++ b/licenses/LICENSE-snowflake.txt
@@ -0,0 +1,11 @@
+Copyright 2010-2012 Twitter, Inc.
+
+Licensed under the Apache License, Version 2.0 (the "License"); you may not use this
+file except in compliance with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software distributed
+under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
+CONDITIONS OF ANY KIND, either express or implied. See the License for the
+specific language governing permissions and limitations under the License.
\ No newline at end of file