You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by le...@apache.org on 2021/11/19 15:50:02 UTC

[dolphinscheduler] branch 2.0.0-release updated: [cherry-pick][api] rewrite code generate, fix bit shift (#6937)

This is an automated email from the ASF dual-hosted git repository.

leonbao pushed a commit to branch 2.0.0-release
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/2.0.0-release by this push:
     new adaeb96  [cherry-pick][api] rewrite code generate,fix bit shift (#6937)
adaeb96 is described below

commit adaeb962eccd593dfef664b4c71910684dc13c1f
Author: OS <29...@users.noreply.github.com>
AuthorDate: Fri Nov 19 23:49:52 2021 +0800

    [cherry-pick][api] rewrite code generate,fix bit shift (#6937)
    
    * [Fix][Common] rewrite code generate,fix bit shift (#6914)
    
    * rewrite code generate,fix bit shift
    
    * fix ut
    
    * add algorithm from licenses file
    
    * add algorithm from licenses file
    
    * add algorithm from licenses file
    
    * add algorithm from licenses file
    
    * add algorithm from licenses file
    
    * fix ut
    # Conflicts:
    #	dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/EnvironmentServiceImpl.java
    #	dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java
    #	dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/CodeGenerateUtilsTest.java
    #	dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/UpgradeDao.java
    #	dolphinscheduler-dist/release-docs/LICENSE
    #	dolphinscheduler-python/src/main/java/org/apache/dolphinscheduler/server/PythonGatewayServer.java
    #	dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
    
    * [Fix][Common] rewrite code generate,fix bit shift (#6914)
    
    * rewrite code generate,fix bit shift
    
    * fix ut
    
    * add algorithm from licenses file
    
    * add algorithm from licenses file
    
    * add algorithm from licenses file
    
    * add algorithm from licenses file
    
    * add algorithm from licenses file
    
    * fix ut
    # Conflicts:
    #	dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/EnvironmentServiceImpl.java
    #	dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java
    #	dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/CodeGenerateUtilsTest.java
    #	dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/UpgradeDao.java
    #	dolphinscheduler-dist/release-docs/LICENSE
    #	dolphinscheduler-python/src/main/java/org/apache/dolphinscheduler/server/PythonGatewayServer.java
    #	dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
    
    * [Fix][Common] rewrite code generate,fix bit shift (#6914)
    
    * rewrite code generate,fix bit shift
    
    * fix ut
    
    * add algorithm from licenses file
    
    * add algorithm from licenses file
    
    * add algorithm from licenses file
    
    * add algorithm from licenses file
    
    * add algorithm from licenses file
    
    * fix ut
    # Conflicts:
    #	dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/EnvironmentServiceImpl.java
    #	dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java
    #	dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/CodeGenerateUtilsTest.java
    #	dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/UpgradeDao.java
    #	dolphinscheduler-dist/release-docs/LICENSE
    #	dolphinscheduler-python/src/main/java/org/apache/dolphinscheduler/server/PythonGatewayServer.java
    #	dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
    
    * [Fix][Common] rewrite code generate,fix bit shift (#6914)
    
    * rewrite code generate,fix bit shift
    
    * fix ut
    
    * add algorithm from licenses file
    
    * add algorithm from licenses file
    
    * add algorithm from licenses file
    
    * add algorithm from licenses file
    
    * add algorithm from licenses file
    
    * fix ut
    # Conflicts:
    #	dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/EnvironmentServiceImpl.java
    #	dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java
    #	dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/CodeGenerateUtilsTest.java
    #	dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/UpgradeDao.java
    #	dolphinscheduler-dist/release-docs/LICENSE
    #	dolphinscheduler-python/src/main/java/org/apache/dolphinscheduler/server/PythonGatewayServer.java
    #	dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
    
    Co-authored-by: JinYong Li <42...@users.noreply.github.com>
---
 .licenserc.yaml                                    |   1 +
 LICENSE                                            |   2 +-
 .../api/service/impl/EnvironmentServiceImpl.java   |   8 +-
 .../service/impl/ProcessDefinitionServiceImpl.java |  20 +-
 .../api/service/impl/ProjectServiceImpl.java       |   8 +-
 .../service/impl/TaskDefinitionServiceImpl.java    |   8 +-
 .../common/utils/CodeGenerateUtils.java            |  74 ++++
 .../common/utils/SnowFlakeUtils.java               |  94 ------
 ...keUtilsTest.java => CodeGenerateUtilsTest.java} |  19 +-
 .../dao/upgrade/ProcessDefinitionDao.java          |   4 +-
 .../dolphinscheduler/dao/upgrade/ProjectDao.java   |   4 +-
 .../dolphinscheduler/dao/upgrade/UpgradeDao.java   |   5 +-
 dolphinscheduler-dist/release-docs/LICENSE         |   2 +
 .../release-docs/licenses/LICENSE-snowflake.txt    |  11 +
 .../server/PythonGatewayServer.java                | 374 +++++++++++++++++++++
 .../service/process/ProcessService.java            | 138 ++++----
 licenses/LICENSE-snowflake.txt                     |  11 +
 17 files changed, 582 insertions(+), 201 deletions(-)

diff --git a/.licenserc.yaml b/.licenserc.yaml
index 44a776e..b82839b 100644
--- a/.licenserc.yaml
+++ b/.licenserc.yaml
@@ -26,6 +26,7 @@ header:
     - LICENSE
     - DISCLAIMER
     - dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/ScriptRunner.java
+    - dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/CodeGenerateUtils.java
     - mvnw.cmd
     - sql/soft_version
     - .mvn
diff --git a/LICENSE b/LICENSE
index 8c9beb8..20191b3 100644
--- a/LICENSE
+++ b/LICENSE
@@ -219,4 +219,4 @@ The text of each license is the standard Apache 2.0 license.
     DolphinPluginClassLoader from https://github.com/prestosql/presto Apache 2.0
     DolphinPluginDiscovery from https://github.com/prestosql/presto Apache 2.0
     DolphinPluginLoader from https://github.com/prestosql/presto Apache 2.0
-
+    CodeGenerateUtils from https://github.com/twitter-archive/snowflake/tree/snowflake-2010 Apache 2.0
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/EnvironmentServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/EnvironmentServiceImpl.java
index 7291485..89cd2af 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/EnvironmentServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/EnvironmentServiceImpl.java
@@ -23,10 +23,10 @@ import org.apache.dolphinscheduler.api.service.EnvironmentService;
 import org.apache.dolphinscheduler.api.utils.PageInfo;
 import org.apache.dolphinscheduler.api.utils.Result;
 import org.apache.dolphinscheduler.common.Constants;
+import org.apache.dolphinscheduler.common.utils.CodeGenerateUtils;
+import org.apache.dolphinscheduler.common.utils.CodeGenerateUtils.CodeGenerateException;
 import org.apache.dolphinscheduler.common.utils.CollectionUtils;
 import org.apache.dolphinscheduler.common.utils.JSONUtils;
-import org.apache.dolphinscheduler.common.utils.SnowFlakeUtils;
-import org.apache.dolphinscheduler.common.utils.SnowFlakeUtils.SnowFlakeException;
 import org.apache.dolphinscheduler.dao.entity.Environment;
 import org.apache.dolphinscheduler.dao.entity.EnvironmentWorkerGroupRelation;
 import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
@@ -115,9 +115,9 @@ public class EnvironmentServiceImpl extends BaseServiceImpl implements Environme
         env.setUpdateTime(new Date());
         long code = 0L;
         try {
-            code = SnowFlakeUtils.getInstance().nextId();
+            code = CodeGenerateUtils.getInstance().genCode();
             env.setCode(code);
-        } catch (SnowFlakeException e) {
+        } catch (CodeGenerateException e) {
             logger.error("Environment code get error, ", e);
         }
         if (code == 0L) {
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
index 4c126a7..96318bc 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
@@ -39,10 +39,10 @@ import org.apache.dolphinscheduler.common.graph.DAG;
 import org.apache.dolphinscheduler.common.model.TaskNode;
 import org.apache.dolphinscheduler.common.model.TaskNodeRelation;
 import org.apache.dolphinscheduler.common.thread.Stopper;
+import org.apache.dolphinscheduler.common.utils.CodeGenerateUtils;
+import org.apache.dolphinscheduler.common.utils.CodeGenerateUtils.CodeGenerateException;
 import org.apache.dolphinscheduler.common.utils.DateUtils;
 import org.apache.dolphinscheduler.common.utils.JSONUtils;
-import org.apache.dolphinscheduler.common.utils.SnowFlakeUtils;
-import org.apache.dolphinscheduler.common.utils.SnowFlakeUtils.SnowFlakeException;
 import org.apache.dolphinscheduler.dao.entity.DagData;
 import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
 import org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog;
@@ -220,8 +220,8 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
         }
         long processDefinitionCode;
         try {
-            processDefinitionCode = SnowFlakeUtils.getInstance().nextId();
-        } catch (SnowFlakeException e) {
+            processDefinitionCode = CodeGenerateUtils.getInstance().genCode();
+        } catch (CodeGenerateException e) {
             putMsg(result, Status.INTERNAL_SERVER_ERROR_ARGS);
             return result;
         }
@@ -868,8 +868,8 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
         processDefinition.setProjectCode(projectCode);
         processDefinition.setUserId(loginUser.getId());
         try {
-            processDefinition.setCode(SnowFlakeUtils.getInstance().nextId());
-        } catch (SnowFlakeException e) {
+            processDefinition.setCode(CodeGenerateUtils.getInstance().genCode());
+        } catch (CodeGenerateException e) {
             putMsg(result, Status.CREATE_PROCESS_DEFINITION_ERROR);
             return false;
         }
@@ -888,10 +888,10 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
             taskDefinitionLog.setOperator(loginUser.getId());
             taskDefinitionLog.setOperateTime(now);
             try {
-                long code = SnowFlakeUtils.getInstance().nextId();
+                long code = CodeGenerateUtils.getInstance().genCode();
                 taskCodeMap.put(taskDefinitionLog.getCode(), code);
                 taskDefinitionLog.setCode(code);
-            } catch (SnowFlakeException e) {
+            } catch (CodeGenerateException e) {
                 logger.error("Task code get error, ", e);
                 putMsg(result, Status.INTERNAL_SERVER_ERROR_ARGS, "Error generating task definition code");
                 return false;
@@ -1357,8 +1357,8 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
             processDefinition.setProjectCode(targetProjectCode);
             if (isCopy) {
                 try {
-                    processDefinition.setCode(SnowFlakeUtils.getInstance().nextId());
-                } catch (SnowFlakeException e) {
+                    processDefinition.setCode(CodeGenerateUtils.getInstance().genCode());
+                } catch (CodeGenerateException e) {
                     putMsg(result, Status.INTERNAL_SERVER_ERROR_ARGS);
                     throw new ServiceException(Status.INTERNAL_SERVER_ERROR_ARGS);
                 }
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProjectServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProjectServiceImpl.java
index 7d49cfa..75347ee 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProjectServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProjectServiceImpl.java
@@ -25,8 +25,8 @@ import org.apache.dolphinscheduler.api.utils.PageInfo;
 import org.apache.dolphinscheduler.api.utils.Result;
 import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.enums.UserType;
-import org.apache.dolphinscheduler.common.utils.SnowFlakeUtils;
-import org.apache.dolphinscheduler.common.utils.SnowFlakeUtils.SnowFlakeException;
+import org.apache.dolphinscheduler.common.utils.CodeGenerateUtils;
+import org.apache.dolphinscheduler.common.utils.CodeGenerateUtils.CodeGenerateException;
 import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
 import org.apache.dolphinscheduler.dao.entity.Project;
 import org.apache.dolphinscheduler.dao.entity.ProjectUser;
@@ -97,14 +97,14 @@ public class ProjectServiceImpl extends BaseServiceImpl implements ProjectServic
             project = Project
                     .newBuilder()
                     .name(name)
-                    .code(SnowFlakeUtils.getInstance().nextId())
+                    .code(CodeGenerateUtils.getInstance().genCode())
                     .description(desc)
                     .userId(loginUser.getId())
                     .userName(loginUser.getUserName())
                     .createTime(now)
                     .updateTime(now)
                     .build();
-        } catch (SnowFlakeException e) {
+        } catch (CodeGenerateException e) {
             putMsg(result, Status.CREATE_PROJECT_ERROR);
             return result;
         }
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java
index e203b9b..621af45 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java
@@ -25,9 +25,9 @@ import org.apache.dolphinscheduler.api.utils.CheckUtils;
 import org.apache.dolphinscheduler.api.utils.PageInfo;
 import org.apache.dolphinscheduler.api.utils.Result;
 import org.apache.dolphinscheduler.common.Constants;
+import org.apache.dolphinscheduler.common.utils.CodeGenerateUtils;
+import org.apache.dolphinscheduler.common.utils.CodeGenerateUtils.CodeGenerateException;
 import org.apache.dolphinscheduler.common.utils.JSONUtils;
-import org.apache.dolphinscheduler.common.utils.SnowFlakeUtils;
-import org.apache.dolphinscheduler.common.utils.SnowFlakeUtils.SnowFlakeException;
 import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation;
 import org.apache.dolphinscheduler.dao.entity.Project;
 import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
@@ -416,9 +416,9 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
         List<Long> taskCodes = new ArrayList<>();
         try {
             for (int i = 0; i < genNum; i++) {
-                taskCodes.add(SnowFlakeUtils.getInstance().nextId());
+                taskCodes.add(CodeGenerateUtils.getInstance().genCode());
             }
-        } catch (SnowFlakeException e) {
+        } catch (CodeGenerateException e) {
             logger.error("Task code get error, ", e);
             putMsg(result, Status.INTERNAL_SERVER_ERROR_ARGS, "Error generating task definition code");
         }
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/CodeGenerateUtils.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/CodeGenerateUtils.java
new file mode 100644
index 0000000..ffea87b
--- /dev/null
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/CodeGenerateUtils.java
@@ -0,0 +1,74 @@
+/** Copyright 2010-2012 Twitter, Inc.*/
+
+package org.apache.dolphinscheduler.common.utils;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.Objects;
+
+/**
+ *  Rewriting based on Twitter snowflake algorithm
+ */
+public class CodeGenerateUtils {
+    // start timestamp
+    private static final long START_TIMESTAMP = 1609430400000L; //2021-01-01 00:00:00
+    // Each machine generates 32 in the same millisecond
+    private static final long LOW_DIGIT_BIT = 5L;
+    private static final long MIDDLE_BIT = 2L;
+    private static final long MAX_LOW_DIGIT = ~(-1L << LOW_DIGIT_BIT);
+    // The displacement to the left
+    private static final long MIDDLE_LEFT = LOW_DIGIT_BIT;
+    private static final long HIGH_DIGIT_LEFT = LOW_DIGIT_BIT + MIDDLE_BIT;
+    private final long machineHash;
+    private long lowDigit = 0L;
+    private long recordMillisecond = -1L;
+
+    private static final long SYSTEM_TIMESTAMP = System.currentTimeMillis();
+    private static final long SYSTEM_NANOTIME  = System.nanoTime();
+
+    private CodeGenerateUtils() throws CodeGenerateException {
+        try {
+            this.machineHash = Math.abs(Objects.hash(InetAddress.getLocalHost().getHostName())) % (2 << (MIDDLE_BIT - 1));
+        } catch (UnknownHostException e) {
+            throw new CodeGenerateException(e.getMessage());
+        }
+    }
+
+    private static CodeGenerateUtils instance = null;
+
+    public static synchronized CodeGenerateUtils getInstance() throws CodeGenerateException {
+        if (instance == null) {
+            instance = new CodeGenerateUtils();
+        }
+        return instance;
+    }
+
+    public synchronized long genCode() throws CodeGenerateException {
+        long nowtMillisecond = systemMillisecond();
+        if (nowtMillisecond < recordMillisecond) {
+            throw new CodeGenerateException("New code exception because time is set back.");
+        }
+        if (nowtMillisecond == recordMillisecond) {
+            lowDigit = (lowDigit + 1) & MAX_LOW_DIGIT;
+            if (lowDigit == 0L) {
+                while (nowtMillisecond <= recordMillisecond) {
+                    nowtMillisecond = systemMillisecond();
+                }
+            }
+        } else {
+            lowDigit = 0L;
+        }
+        recordMillisecond = nowtMillisecond;
+        return (nowtMillisecond - START_TIMESTAMP) << HIGH_DIGIT_LEFT | machineHash << MIDDLE_LEFT | lowDigit;
+    }
+
+    private long systemMillisecond() {
+        return SYSTEM_TIMESTAMP + (System.nanoTime() - SYSTEM_NANOTIME) / 1000000;
+    }
+
+    public static class CodeGenerateException extends Exception {
+        public CodeGenerateException(String message) {
+            super(message);
+        }
+    }
+}
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/SnowFlakeUtils.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/SnowFlakeUtils.java
deleted file mode 100644
index 6393e19..0000000
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/SnowFlakeUtils.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.common.utils;
-
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-import java.util.Objects;
-
-public class SnowFlakeUtils {
-    // start timestamp
-    private static final long START_TIMESTAMP = 1609430400000L; //2021-01-01 00:00:00
-    // Each machine generates 32 in the same millisecond
-    private static final long SEQUENCE_BIT = 5;
-    private static final long MACHINE_BIT = 2;
-    private static final long MAX_SEQUENCE = ~(-1L << SEQUENCE_BIT);
-    // The displacement to the left
-    private static final long MACHINE_LEFT = SEQUENCE_BIT + MACHINE_BIT;
-    private static final long TIMESTAMP_LEFT = SEQUENCE_BIT + MACHINE_BIT + MACHINE_LEFT;
-    private final int machineId;
-    private long sequence = 0L;
-    private long lastTimestamp = -1L;
-
-    private static final long SYSTEM_TIMESTAMP = System.currentTimeMillis();
-    private static final long SYSTEM_NANOTIME  = System.nanoTime();
-
-    private SnowFlakeUtils() throws SnowFlakeException {
-        try {
-            this.machineId = Math.abs(Objects.hash(InetAddress.getLocalHost().getHostName())) % 4;
-        } catch (UnknownHostException e) {
-            throw new SnowFlakeException(e.getMessage());
-        }
-    }
-
-    private static SnowFlakeUtils instance = null;
-
-    public static synchronized SnowFlakeUtils getInstance() throws SnowFlakeException {
-        if (instance == null) {
-            instance = new SnowFlakeUtils();
-        }
-        return instance;
-    }
-
-    public synchronized long nextId() throws SnowFlakeException {
-        long currStmp = nowTimestamp();
-        if (currStmp < lastTimestamp) {
-            throw new SnowFlakeException("Clock moved backwards. Refusing to generate id");
-        }
-        if (currStmp == lastTimestamp) {
-            sequence = (sequence + 1) & MAX_SEQUENCE;
-            if (sequence == 0L) {
-                currStmp = getNextMill();
-            }
-        } else {
-            sequence = 0L;
-        }
-        lastTimestamp = currStmp;
-        return (currStmp - START_TIMESTAMP) << TIMESTAMP_LEFT
-            | machineId << MACHINE_LEFT
-            | sequence;
-    }
-
-    private long getNextMill() {
-        long mill = nowTimestamp();
-        while (mill <= lastTimestamp) {
-            mill = nowTimestamp();
-        }
-        return mill;
-    }
-
-    private long nowTimestamp() {
-        return SYSTEM_TIMESTAMP + (System.nanoTime() - SYSTEM_NANOTIME) / 1000000;
-    }
-
-    public static class SnowFlakeException extends Exception {
-        public SnowFlakeException(String message) {
-            super(message);
-        }
-    }
-}
diff --git a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/SnowFlakeUtilsTest.java b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/CodeGenerateUtilsTest.java
similarity index 66%
rename from dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/SnowFlakeUtilsTest.java
rename to dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/CodeGenerateUtilsTest.java
index bdf7096..d949bd82 100644
--- a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/SnowFlakeUtilsTest.java
+++ b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/CodeGenerateUtilsTest.java
@@ -17,18 +17,19 @@
 
 package org.apache.dolphinscheduler.common.utils;
 
+import java.util.HashSet;
+
+import org.junit.Assert;
 import org.junit.Test;
 
-public class SnowFlakeUtilsTest {
+public class CodeGenerateUtilsTest {
     @Test
-    public void testNextId() {
-        try {
-            for (int i = 0; i < 100; i++) {
-                Thread.sleep(1);
-                System.out.println(SnowFlakeUtils.getInstance().nextId());
-            }
-        } catch (Exception e) {
-            e.printStackTrace();
+    public void testNoGenerateDuplicateCode() throws CodeGenerateUtils.CodeGenerateException {
+        HashSet<Long> existsCode = new HashSet<>();
+        for (int i = 0; i < 100; i++) {
+            Long currentCode = CodeGenerateUtils.getInstance().genCode();
+            Assert.assertFalse(existsCode.contains(currentCode));
+            existsCode.add(currentCode);
         }
     }
 }
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/ProcessDefinitionDao.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/ProcessDefinitionDao.java
index c41359a..f4d198e 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/ProcessDefinitionDao.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/ProcessDefinitionDao.java
@@ -20,8 +20,8 @@ package org.apache.dolphinscheduler.dao.upgrade;
 import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.enums.Flag;
 import org.apache.dolphinscheduler.common.enums.ReleaseState;
+import org.apache.dolphinscheduler.common.utils.CodeGenerateUtils;
 import org.apache.dolphinscheduler.common.utils.ConnectionUtils;
-import org.apache.dolphinscheduler.common.utils.SnowFlakeUtils;
 import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
 
 import java.sql.Connection;
@@ -110,7 +110,7 @@ public class ProcessDefinitionDao {
                 processDefinition.setId(rs.getInt(1));
                 long code = rs.getLong(2);
                 if (code == 0L) {
-                    code = SnowFlakeUtils.getInstance().nextId();
+                    code = CodeGenerateUtils.getInstance().genCode();
                 }
                 processDefinition.setCode(code);
                 processDefinition.setVersion(Constants.VERSION_FIRST);
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/ProjectDao.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/ProjectDao.java
index 794d71a..2906902 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/ProjectDao.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/ProjectDao.java
@@ -17,8 +17,8 @@
 
 package org.apache.dolphinscheduler.dao.upgrade;
 
+import org.apache.dolphinscheduler.common.utils.CodeGenerateUtils;
 import org.apache.dolphinscheduler.common.utils.ConnectionUtils;
-import org.apache.dolphinscheduler.common.utils.SnowFlakeUtils;
 
 import java.sql.Connection;
 import java.sql.PreparedStatement;
@@ -51,7 +51,7 @@ public class ProjectDao {
                 Integer id = rs.getInt(1);
                 long code = rs.getLong(2);
                 if (code == 0L) {
-                    code = SnowFlakeUtils.getInstance().nextId();
+                    code = CodeGenerateUtils.getInstance().genCode();
                 }
                 projectMap.put(id, code);
             }
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/UpgradeDao.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/UpgradeDao.java
index 2087c6e..68fd446 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/UpgradeDao.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/UpgradeDao.java
@@ -25,12 +25,12 @@ import org.apache.dolphinscheduler.common.enums.TaskType;
 import org.apache.dolphinscheduler.common.enums.TimeoutFlag;
 import org.apache.dolphinscheduler.common.process.ResourceInfo;
 import org.apache.dolphinscheduler.common.task.TaskTimeoutParameter;
+import org.apache.dolphinscheduler.common.utils.CodeGenerateUtils;
 import org.apache.dolphinscheduler.common.utils.CollectionUtils;
 import org.apache.dolphinscheduler.common.utils.ConnectionUtils;
 import org.apache.dolphinscheduler.common.utils.JSONUtils;
 import org.apache.dolphinscheduler.common.utils.SchemaUtils;
 import org.apache.dolphinscheduler.common.utils.ScriptRunner;
-import org.apache.dolphinscheduler.common.utils.SnowFlakeUtils;
 import org.apache.dolphinscheduler.dao.AbstractBaseDao;
 import org.apache.dolphinscheduler.dao.datasource.ConnectionFactory;
 import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
@@ -684,8 +684,7 @@ public abstract class UpgradeDao extends AbstractBaseDao {
                 String name = task.get("name").asText();
                 taskDefinitionLog.setName(name);
                 taskDefinitionLog.setWorkerGroup(task.get("workerGroup").asText());
-                long taskCode = SnowFlakeUtils.getInstance().nextId();
-                // System.out.println(taskCode);
+                long taskCode = CodeGenerateUtils.getInstance().genCode();
                 taskDefinitionLog.setCode(taskCode);
                 taskDefinitionLog.setVersion(Constants.VERSION_FIRST);
                 taskDefinitionLog.setProjectCode(processDefinition.getProjectCode());
diff --git a/dolphinscheduler-dist/release-docs/LICENSE b/dolphinscheduler-dist/release-docs/LICENSE
index 63ab3f6..8707314 100644
--- a/dolphinscheduler-dist/release-docs/LICENSE
+++ b/dolphinscheduler-dist/release-docs/LICENSE
@@ -417,6 +417,8 @@ The text of each license is also included at licenses/LICENSE-[project].txt.
     protostuff-runtime 1.7.2: https://github.com/protostuff/protostuff/protostuff-core Apache-2.0
     protostuff-api 1.7.2: https://github.com/protostuff/protostuff/protostuff-api Apache-2.0
     protostuff-collectionschema 1.7.2: https://github.com/protostuff/protostuff/protostuff-collectionschema Apache-2.0
+    snowflake snowflake-2010: https://github.com/twitter-archive/snowflake/tree/snowflake-2010, Apache 2.0
+
 ========================================================================
 BSD licenses
 ========================================================================
diff --git a/dolphinscheduler-dist/release-docs/licenses/LICENSE-snowflake.txt b/dolphinscheduler-dist/release-docs/licenses/LICENSE-snowflake.txt
new file mode 100644
index 0000000..e257174
--- /dev/null
+++ b/dolphinscheduler-dist/release-docs/licenses/LICENSE-snowflake.txt
@@ -0,0 +1,11 @@
+Copyright 2010-2012 Twitter, Inc.
+
+Licensed under the Apache License, Version 2.0 (the "License"); you may not use this
+file except in compliance with the License. You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software distributed
+under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
+CONDITIONS OF ANY KIND, either express or implied. See the License for the
+specific language governing permissions and limitations under the License.
\ No newline at end of file
diff --git a/dolphinscheduler-python/src/main/java/org/apache/dolphinscheduler/server/PythonGatewayServer.java b/dolphinscheduler-python/src/main/java/org/apache/dolphinscheduler/server/PythonGatewayServer.java
new file mode 100644
index 0000000..9ae966f
--- /dev/null
+++ b/dolphinscheduler-python/src/main/java/org/apache/dolphinscheduler/server/PythonGatewayServer.java
@@ -0,0 +1,374 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server;
+
+import org.apache.dolphinscheduler.api.enums.Status;
+import org.apache.dolphinscheduler.api.service.ExecutorService;
+import org.apache.dolphinscheduler.api.service.ProcessDefinitionService;
+import org.apache.dolphinscheduler.api.service.ProjectService;
+import org.apache.dolphinscheduler.api.service.QueueService;
+import org.apache.dolphinscheduler.api.service.SchedulerService;
+import org.apache.dolphinscheduler.api.service.TaskDefinitionService;
+import org.apache.dolphinscheduler.api.service.TenantService;
+import org.apache.dolphinscheduler.api.service.UsersService;
+import org.apache.dolphinscheduler.api.utils.Result;
+import org.apache.dolphinscheduler.common.Constants;
+import org.apache.dolphinscheduler.common.enums.FailureStrategy;
+import org.apache.dolphinscheduler.common.enums.Priority;
+import org.apache.dolphinscheduler.common.enums.ProcessExecutionTypeEnum;
+import org.apache.dolphinscheduler.common.enums.ReleaseState;
+import org.apache.dolphinscheduler.common.enums.RunMode;
+import org.apache.dolphinscheduler.common.enums.TaskDependType;
+import org.apache.dolphinscheduler.common.enums.UserType;
+import org.apache.dolphinscheduler.common.enums.WarningType;
+import org.apache.dolphinscheduler.common.utils.CodeGenerateUtils;
+import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
+import org.apache.dolphinscheduler.dao.entity.Project;
+import org.apache.dolphinscheduler.dao.entity.Queue;
+import org.apache.dolphinscheduler.dao.entity.Schedule;
+import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
+import org.apache.dolphinscheduler.dao.entity.Tenant;
+import org.apache.dolphinscheduler.dao.entity.User;
+import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
+import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
+import org.apache.dolphinscheduler.dao.mapper.ScheduleMapper;
+import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+import javax.annotation.PostConstruct;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.web.servlet.support.SpringBootServletInitializer;
+import org.springframework.context.annotation.ComponentScan;
+import org.springframework.context.annotation.FilterType;
+
+import py4j.GatewayServer;
+
+@ComponentScan(value = "org.apache.dolphinscheduler", excludeFilters = {
+    @ComponentScan.Filter(type = FilterType.REGEX, pattern = {
+        "org.apache.dolphinscheduler.server.master.*",
+        "org.apache.dolphinscheduler.server.worker.*",
+        "org.apache.dolphinscheduler.server.monitor.*",
+        "org.apache.dolphinscheduler.server.log.*",
+        "org.apache.dolphinscheduler.alert.*"
+    })
+})
+public class PythonGatewayServer extends SpringBootServletInitializer {
+    private static final Logger LOGGER = LoggerFactory.getLogger(PythonGatewayServer.class);
+    
+    private static final WarningType DEFAULT_WARNING_TYPE = WarningType.NONE;
+    private static final int DEFAULT_WARNING_GROUP_ID = 0;
+    private static final FailureStrategy DEFAULT_FAILURE_STRATEGY = FailureStrategy.CONTINUE;
+    private static final Priority DEFAULT_PRIORITY = Priority.MEDIUM;
+    private static final Long DEFAULT_ENVIRONMENT_CODE = -1L;
+
+    private static final TaskDependType DEFAULT_TASK_DEPEND_TYPE = TaskDependType.TASK_POST;
+    private static final RunMode DEFAULT_RUN_MODE = RunMode.RUN_MODE_SERIAL;
+    private static final int DEFAULT_DRY_RUN = 0;
+
+    @Autowired
+    private ProcessDefinitionMapper processDefinitionMapper;
+
+    @Autowired
+    private ProjectService projectService;
+
+    @Autowired
+    private TenantService tenantService;
+
+    @Autowired
+    private ExecutorService executorService;
+
+    @Autowired
+    private ProcessDefinitionService processDefinitionService;
+
+    @Autowired
+    private TaskDefinitionService taskDefinitionService;
+
+    @Autowired
+    private UsersService usersService;
+
+    @Autowired
+    private QueueService queueService;
+
+    @Autowired
+    private ProjectMapper projectMapper;
+
+    @Autowired
+    private TaskDefinitionMapper taskDefinitionMapper;
+
+    @Autowired
+    private SchedulerService schedulerService;
+
+    @Autowired
+    private ScheduleMapper scheduleMapper;
+
+    // TODO replace this user to build in admin user if we make sure build in one could not be change
+    private final User dummyAdminUser = new User() {
+        {
+            setId(Integer.MAX_VALUE);
+            setUserName("dummyUser");
+            setUserType(UserType.ADMIN_USER);
+        }
+    };
+
+    private final Queue queuePythonGateway = new Queue() {
+        {
+            setId(Integer.MAX_VALUE);
+            setQueueName("queuePythonGateway");
+        }
+    };
+
+    public String ping() {
+        return "PONG";
+    }
+
+    // TODO Should we import package in python client side? utils package can but service can not, why
+    // Core api
+    public Map<String, Object> genTaskCodeList(Integer genNum) {
+        return taskDefinitionService.genTaskCodeList(genNum);
+    }
+
+    public Map<String, Long> getCodeAndVersion(String projectName, String taskName) throws CodeGenerateUtils.CodeGenerateException {
+        Project project = projectMapper.queryByName(projectName);
+        Map<String, Long> result = new HashMap<>();
+        // project do not exists, mean task not exists too, so we should directly return init value
+        if (project == null) {
+            result.put("code", CodeGenerateUtils.getInstance().genCode());
+            result.put("version", 0L);
+            return result;
+        }
+        TaskDefinition taskDefinition = taskDefinitionMapper.queryByName(project.getCode(), taskName);
+        if (taskDefinition == null) {
+            result.put("code", CodeGenerateUtils.getInstance().genCode());
+            result.put("version", 0L);
+        } else {
+            result.put("code", taskDefinition.getCode());
+            result.put("version", (long) taskDefinition.getVersion());
+        }
+        return result;
+    }
+
+    /**
+     * create or update process definition.
+     * If process definition do not exists in Project=`projectCode` would create a new one
+     * If process definition already exists in Project=`projectCode` would update it
+     *
+     * @param userName           user name who create or update process definition
+     * @param projectName        project name which process definition belongs to
+     * @param name               process definition name
+     * @param description        description
+     * @param globalParams       global params
+     * @param schedule           schedule for process definition, will not set schedule if null,
+     *                           and if would always fresh exists schedule if not null
+     * @param locations          locations json object about all tasks
+     * @param timeout            timeout for process definition working, if running time longer than timeout,
+     *                           task will mark as fail
+     * @param workerGroup        run task in which worker group
+     * @param tenantCode         tenantCode
+     * @param taskRelationJson   relation json for nodes
+     * @param taskDefinitionJson taskDefinitionJson
+     * @return create result code
+     */
+    public Long createOrUpdateProcessDefinition(String userName,
+                                                String projectName,
+                                                String name,
+                                                String description,
+                                                String globalParams,
+                                                String schedule,
+                                                String locations,
+                                                int timeout,
+                                                String workerGroup,
+                                                String tenantCode,
+                                                String taskRelationJson,
+                                                String taskDefinitionJson,
+                                                ProcessExecutionTypeEnum executionType) {
+        User user = usersService.queryUser(userName);
+        Project project = (Project) projectService.queryByName(user, projectName).get(Constants.DATA_LIST);
+        long projectCode = project.getCode();
+        Map<String, Object> verifyProcessDefinitionExists = processDefinitionService.verifyProcessDefinitionName(user, projectCode, name);
+        Status verifyStatus = (Status) verifyProcessDefinitionExists.get(Constants.STATUS);
+
+        long processDefinitionCode;
+        // create or update process definition
+        if (verifyStatus == Status.PROCESS_DEFINITION_NAME_EXIST) {
+            ProcessDefinition processDefinition = processDefinitionMapper.queryByDefineName(projectCode, name);
+            processDefinitionCode = processDefinition.getCode();
+            // make sure process definition offline which could edit
+            processDefinitionService.releaseProcessDefinition(user, projectCode, processDefinitionCode, ReleaseState.OFFLINE);
+            Map<String, Object> result = processDefinitionService.updateProcessDefinition(user, projectCode, name, processDefinitionCode, description, globalParams,
+                locations, timeout, tenantCode, taskRelationJson, taskDefinitionJson,executionType);
+        } else if (verifyStatus == Status.SUCCESS) {
+            Map<String, Object> result = processDefinitionService.createProcessDefinition(user, projectCode, name, description, globalParams,
+                locations, timeout, tenantCode, taskRelationJson, taskDefinitionJson,executionType);
+            ProcessDefinition processDefinition = (ProcessDefinition) result.get(Constants.DATA_LIST);
+            processDefinitionCode = processDefinition.getCode();
+        } else {
+            String msg = "Verify process definition exists status is invalid, neither SUCCESS or PROCESS_DEFINITION_NAME_EXIST.";
+            LOGGER.error(msg);
+            throw new RuntimeException(msg);
+        }
+        
+        // Fresh process definition schedule 
+        if (schedule != null) {
+            createOrUpdateSchedule(user, projectCode, processDefinitionCode, schedule, workerGroup);
+        }
+        processDefinitionService.releaseProcessDefinition(user, projectCode, processDefinitionCode, ReleaseState.ONLINE);
+        return processDefinitionCode;
+    }
+
+    /**
+     * create or update process definition schedule.
+     * It would always use latest schedule define in workflow-as-code, and set schedule online when
+     * it's not null
+     *
+     * @param user                  user who create or update schedule
+     * @param projectCode           project which process definition belongs to
+     * @param processDefinitionCode process definition code
+     * @param schedule              schedule expression
+     * @param workerGroup           work group
+     */
+    private void createOrUpdateSchedule(User user,
+                                        long projectCode,
+                                        long processDefinitionCode,
+                                        String schedule,
+                                        String workerGroup) {
+        List<Schedule> schedules = scheduleMapper.queryByProcessDefinitionCode(processDefinitionCode);
+        // create or update schedule
+        int scheduleId;
+        if (schedules.isEmpty()) {
+            processDefinitionService.releaseProcessDefinition(user, projectCode, processDefinitionCode, ReleaseState.ONLINE);
+            Map<String, Object> result = schedulerService.insertSchedule(user, projectCode, processDefinitionCode, schedule, DEFAULT_WARNING_TYPE,
+                DEFAULT_WARNING_GROUP_ID, DEFAULT_FAILURE_STRATEGY, DEFAULT_PRIORITY, workerGroup, DEFAULT_ENVIRONMENT_CODE);
+            scheduleId = (int) result.get("scheduleId");
+        } else {
+            scheduleId = schedules.get(0).getId();
+            processDefinitionService.releaseProcessDefinition(user, projectCode, processDefinitionCode, ReleaseState.OFFLINE);
+            schedulerService.updateSchedule(user, projectCode, scheduleId, schedule, DEFAULT_WARNING_TYPE,
+                DEFAULT_WARNING_GROUP_ID, DEFAULT_FAILURE_STRATEGY, DEFAULT_PRIORITY, workerGroup, DEFAULT_ENVIRONMENT_CODE);
+        }
+        schedulerService.setScheduleState(user, projectCode, scheduleId, ReleaseState.ONLINE);
+    }
+
+    public void execProcessInstance(String userName,
+                                    String projectName,
+                                    String processDefinitionName,
+                                    String cronTime,
+                                    String workerGroup,
+                                    Integer timeout
+    ) {
+        User user = usersService.queryUser(userName);
+        Project project = projectMapper.queryByName(projectName);
+        ProcessDefinition processDefinition = processDefinitionMapper.queryByDefineName(project.getCode(), processDefinitionName);
+
+        // make sure process definition online
+        processDefinitionService.releaseProcessDefinition(user, project.getCode(), processDefinition.getCode(), ReleaseState.ONLINE);
+
+        executorService.execProcessInstance(user,
+            project.getCode(),
+            processDefinition.getCode(),
+            cronTime,
+            null,
+            DEFAULT_FAILURE_STRATEGY,
+            null,
+            DEFAULT_TASK_DEPEND_TYPE,
+            DEFAULT_WARNING_TYPE,
+            DEFAULT_WARNING_GROUP_ID,
+            DEFAULT_RUN_MODE,
+            DEFAULT_PRIORITY,
+            workerGroup,
+            DEFAULT_ENVIRONMENT_CODE,
+            timeout,
+            null,
+            null,
+            DEFAULT_DRY_RUN
+        );
+    }
+
+    // side object
+    public Map<String, Object> createProject(String userName, String name, String desc) {
+        User user = usersService.queryUser(userName);
+        return projectService.createProject(user, name, desc);
+    }
+
+    public Map<String, Object> createQueue(String name, String queueName) {
+        Result<Object> verifyQueueExists = queueService.verifyQueue(name, queueName);
+        if (verifyQueueExists.getCode() == 0) {
+            return queueService.createQueue(dummyAdminUser, name, queueName);
+        } else {
+            Map<String, Object> result = new HashMap<>();
+            // TODO function putMsg do not work here
+            result.put(Constants.STATUS, Status.SUCCESS);
+            result.put(Constants.MSG, Status.SUCCESS.getMsg());
+            return result;
+        }
+    }
+
+    public Map<String, Object> createTenant(String tenantCode, String desc, String queueName) throws Exception {
+        if (tenantService.checkTenantExists(tenantCode)) {
+            Map<String, Object> result = new HashMap<>();
+            // TODO function putMsg do not work here
+            result.put(Constants.STATUS, Status.SUCCESS);
+            result.put(Constants.MSG, Status.SUCCESS.getMsg());
+            return result;
+        } else {
+            Result<Object> verifyQueueExists = queueService.verifyQueue(queueName, queueName);
+            if (verifyQueueExists.getCode() == 0) {
+                // TODO why create do not return id?
+                queueService.createQueue(dummyAdminUser, queueName, queueName);
+            }
+            Map<String, Object> result = queueService.queryQueueName(queueName);
+            List<Queue> queueList = (List<Queue>) result.get(Constants.DATA_LIST);
+            Queue queue = queueList.get(0);
+            return tenantService.createTenant(dummyAdminUser, tenantCode, queue.getId(), desc);
+        }
+    }
+
+    public void createUser(String userName,
+                           String userPassword,
+                           String email,
+                           String phone,
+                           String tenantCode,
+                           String queue,
+                           int state) {
+        User user = usersService.queryUser(userName);
+        if (Objects.isNull(user)) {
+            Map<String, Object> tenantResult = tenantService.queryByTenantCode(tenantCode);
+            Tenant tenant = (Tenant) tenantResult.get(Constants.DATA_LIST);
+            usersService.createUser(userName, userPassword, email, tenant.getId(), phone, queue, state);
+        }
+    }
+
+    @PostConstruct
+    public void run() {
+        GatewayServer server = new GatewayServer(this);
+        GatewayServer.turnLoggingOn();
+        // Start server to accept python client RPC
+        server.start();
+    }
+
+    public static void main(String[] args) {
+        SpringApplication.run(PythonGatewayServer.class, args);
+    }
+}
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
index 81bad0b..d054555 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
@@ -17,6 +17,7 @@
 
 package org.apache.dolphinscheduler.service.process;
 
+import static java.util.stream.Collectors.toSet;
 import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE;
 import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE;
 import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_EMPTY_SUB_PROCESS;
@@ -27,8 +28,6 @@ import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS
 import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS_PARENT_INSTANCE_ID;
 import static org.apache.dolphinscheduler.common.Constants.LOCAL_PARAMS;
 
-import static java.util.stream.Collectors.toSet;
-
 import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.enums.AuthorizationType;
 import org.apache.dolphinscheduler.common.enums.CommandType;
@@ -51,12 +50,12 @@ import org.apache.dolphinscheduler.common.process.ResourceInfo;
 import org.apache.dolphinscheduler.common.task.AbstractParameters;
 import org.apache.dolphinscheduler.common.task.TaskTimeoutParameter;
 import org.apache.dolphinscheduler.common.task.subprocess.SubProcessParameters;
+import org.apache.dolphinscheduler.common.utils.CodeGenerateUtils;
+import org.apache.dolphinscheduler.common.utils.CodeGenerateUtils.CodeGenerateException;
 import org.apache.dolphinscheduler.common.utils.CollectionUtils;
 import org.apache.dolphinscheduler.common.utils.DateUtils;
 import org.apache.dolphinscheduler.common.utils.JSONUtils;
 import org.apache.dolphinscheduler.common.utils.ParameterUtils;
-import org.apache.dolphinscheduler.common.utils.SnowFlakeUtils;
-import org.apache.dolphinscheduler.common.utils.SnowFlakeUtils.SnowFlakeException;
 import org.apache.dolphinscheduler.common.utils.TaskParametersUtils;
 import org.apache.dolphinscheduler.dao.entity.Command;
 import org.apache.dolphinscheduler.dao.entity.DagData;
@@ -212,6 +211,7 @@ public class ProcessService {
      * @param processDefinitionCacheMaps
      * @return process instance
      */
+    @Transactional
     public ProcessInstance handleCommand(Logger logger, String host, Command command, HashMap<String, ProcessDefinition> processDefinitionCacheMaps) {
         ProcessInstance processInstance = constructProcessInstance(command, host, processDefinitionCacheMaps);
         // cannot construct process instance, return null
@@ -634,7 +634,7 @@ public class ProcessService {
         startParamMap.putAll(fatherParamMap);
         // set start param into global params
         if (startParamMap.size() > 0
-            && processDefinition.getGlobalParamMap() != null) {
+                && processDefinition.getGlobalParamMap() != null) {
             for (Map.Entry<String, String> param : processDefinition.getGlobalParamMap().entrySet()) {
                 String val = startParamMap.get(param.getKey());
                 if (val != null) {
@@ -696,8 +696,8 @@ public class ProcessService {
     private Boolean checkCmdParam(Command command, Map<String, String> cmdParam) {
         if (command.getTaskDependType() == TaskDependType.TASK_ONLY || command.getTaskDependType() == TaskDependType.TASK_PRE) {
             if (cmdParam == null
-                || !cmdParam.containsKey(Constants.CMD_PARAM_START_NODES)
-                || cmdParam.get(Constants.CMD_PARAM_START_NODES).isEmpty()) {
+                    || !cmdParam.containsKey(Constants.CMD_PARAM_START_NODES)
+                    || cmdParam.get(Constants.CMD_PARAM_START_NODES).isEmpty()) {
                 logger.error("command node depend type is {}, but start nodes is null ", command.getTaskDependType());
                 return false;
             }
@@ -708,9 +708,8 @@ public class ProcessService {
     /**
      * construct process instance according to one command.
      *
-     * @param command                    command
-     * @param host                       host
-     * @param processDefinitionCacheMaps
+     * @param command command
+     * @param host host
      * @return process instance
      */
     private ProcessInstance constructProcessInstance(Command command, String host, HashMap<String, ProcessDefinition> processDefinitionCacheMaps) {
@@ -883,7 +882,7 @@ public class ProcessService {
                 }
 
                 return processDefineLogMapper.queryByDefinitionCodeAndVersion(
-                    processInstance.getProcessDefinitionCode(), processInstance.getProcessDefinitionVersion());
+                        processInstance.getProcessDefinitionCode(), processInstance.getProcessDefinitionVersion());
             }
         }
 
@@ -929,9 +928,9 @@ public class ProcessService {
             processInstance.setScheduleTime(complementDate.get(0));
         }
         processInstance.setGlobalParams(ParameterUtils.curingGlobalParams(
-            processDefinition.getGlobalParamMap(),
-            processDefinition.getGlobalParamList(),
-            CommandType.COMPLEMENT_DATA, processInstance.getScheduleTime()));
+                processDefinition.getGlobalParamMap(),
+                processDefinition.getGlobalParamList(),
+                CommandType.COMPLEMENT_DATA, processInstance.getScheduleTime()));
     }
 
     /**
@@ -949,7 +948,7 @@ public class ProcessService {
         Map<String, String> paramMap = JSONUtils.toMap(cmdParam);
         // write sub process id into cmd param.
         if (paramMap.containsKey(CMD_PARAM_SUB_PROCESS)
-            && CMD_PARAM_EMPTY_SUB_PROCESS.equals(paramMap.get(CMD_PARAM_SUB_PROCESS))) {
+                && CMD_PARAM_EMPTY_SUB_PROCESS.equals(paramMap.get(CMD_PARAM_SUB_PROCESS))) {
             paramMap.remove(CMD_PARAM_SUB_PROCESS);
             paramMap.put(CMD_PARAM_SUB_PROCESS, String.valueOf(subProcessInstance.getId()));
             subProcessInstance.setCommandParam(JSONUtils.toJsonString(paramMap));
@@ -962,7 +961,7 @@ public class ProcessService {
             ProcessInstance parentInstance = findProcessInstanceDetailById(Integer.parseInt(parentInstanceId));
             if (parentInstance != null) {
                 subProcessInstance.setGlobalParams(
-                    joinGlobalParams(parentInstance.getGlobalParams(), subProcessInstance.getGlobalParams()));
+                        joinGlobalParams(parentInstance.getGlobalParams(), subProcessInstance.getGlobalParams()));
                 this.saveProcessInstance(subProcessInstance);
             } else {
                 logger.error("sub process command params error, cannot find parent instance: {} ", cmdParam);
@@ -1009,7 +1008,7 @@ public class ProcessService {
     private void initTaskInstance(TaskInstance taskInstance) {
 
         if (!taskInstance.isSubProcess()
-            && (taskInstance.getState().typeIsCancel() || taskInstance.getState().typeIsFailure())) {
+                && (taskInstance.getState().typeIsCancel() || taskInstance.getState().typeIsFailure())) {
             taskInstance.setFlag(Flag.NO);
             updateTaskInstance(taskInstance);
             return;
@@ -1064,12 +1063,12 @@ public class ProcessService {
     public TaskInstance submitTask(TaskInstance taskInstance) {
         ProcessInstance processInstance = this.findProcessInstanceDetailById(taskInstance.getProcessInstanceId());
         logger.info("start submit task : {}, instance id:{}, state: {}",
-            taskInstance.getName(), taskInstance.getProcessInstanceId(), processInstance.getState());
+                taskInstance.getName(), taskInstance.getProcessInstanceId(), processInstance.getState());
         //submit to db
         TaskInstance task = submitTaskInstanceToDB(taskInstance, processInstance);
         if (task == null) {
             logger.error("end submit task to db error, task name:{}, process id:{} state: {} ",
-                taskInstance.getName(), taskInstance.getProcessInstance(), processInstance.getState());
+                    taskInstance.getName(), taskInstance.getProcessInstance(), processInstance.getState());
             return task;
         }
         if (!task.getState().typeIsFinished()) {
@@ -1135,7 +1134,7 @@ public class ProcessService {
             }
         }
         logger.info("sub process instance is not found,parent task:{},parent instance:{}",
-            parentTask.getId(), parentProcessInstance.getId());
+                parentTask.getId(), parentProcessInstance.getId());
         return null;
     }
 
@@ -1227,21 +1226,21 @@ public class ProcessService {
         String processParam = getSubWorkFlowParam(instanceMap, parentProcessInstance, fatherParams);
         int subProcessInstanceId = childInstance == null ? 0 : childInstance.getId();
         return new Command(
-            commandType,
-            TaskDependType.TASK_POST,
-            parentProcessInstance.getFailureStrategy(),
-            parentProcessInstance.getExecutorId(),
-            subProcessDefinition.getCode(),
-            processParam,
-            parentProcessInstance.getWarningType(),
-            parentProcessInstance.getWarningGroupId(),
-            parentProcessInstance.getScheduleTime(),
-            task.getWorkerGroup(),
-            task.getEnvironmentCode(),
-            parentProcessInstance.getProcessInstancePriority(),
-            parentProcessInstance.getDryRun(),
-            subProcessInstanceId,
-            subProcessDefinition.getVersion()
+                commandType,
+                TaskDependType.TASK_POST,
+                parentProcessInstance.getFailureStrategy(),
+                parentProcessInstance.getExecutorId(),
+                subProcessDefinition.getCode(),
+                processParam,
+                parentProcessInstance.getWarningType(),
+                parentProcessInstance.getWarningGroupId(),
+                parentProcessInstance.getScheduleTime(),
+                task.getWorkerGroup(),
+                task.getEnvironmentCode(),
+                parentProcessInstance.getProcessInstancePriority(),
+                parentProcessInstance.getDryRun(),
+                subProcessInstanceId,
+                subProcessDefinition.getVersion()
         );
     }
 
@@ -1278,7 +1277,7 @@ public class ProcessService {
      */
     private void updateSubProcessDefinitionByParent(ProcessInstance parentProcessInstance, long childDefinitionCode) {
         ProcessDefinition fatherDefinition = this.findProcessDefinition(parentProcessInstance.getProcessDefinitionCode(),
-            parentProcessInstance.getProcessDefinitionVersion());
+                parentProcessInstance.getProcessDefinitionVersion());
         ProcessDefinition childDefinition = this.findProcessDefinitionByCode(childDefinitionCode);
         if (childDefinition != null && fatherDefinition != null) {
             childDefinition.setWarningGroupId(fatherDefinition.getWarningGroupId());
@@ -1301,7 +1300,7 @@ public class ProcessService {
                 taskInstance.setRetryTimes(taskInstance.getRetryTimes() + 1);
             } else {
                 if (processInstanceState != ExecutionStatus.READY_STOP
-                    && processInstanceState != ExecutionStatus.READY_PAUSE) {
+                        && processInstanceState != ExecutionStatus.READY_PAUSE) {
                     // failure task set invalid
                     taskInstance.setFlag(Flag.NO);
                     updateTaskInstance(taskInstance);
@@ -1354,9 +1353,9 @@ public class ProcessService {
         // the task already exists in task queue
         // return state
         if (
-            state == ExecutionStatus.RUNNING_EXECUTION
-                || state == ExecutionStatus.DELAY_EXECUTION
-                || state == ExecutionStatus.KILL
+                state == ExecutionStatus.RUNNING_EXECUTION
+                        || state == ExecutionStatus.DELAY_EXECUTION
+                        || state == ExecutionStatus.KILL
         ) {
             return state;
         }
@@ -1365,7 +1364,7 @@ public class ProcessService {
         if (processInstanceState == ExecutionStatus.READY_PAUSE) {
             state = ExecutionStatus.PAUSE;
         } else if (processInstanceState == ExecutionStatus.READY_STOP
-            || !checkProcessStrategy(taskInstance)) {
+                || !checkProcessStrategy(taskInstance)) {
             state = ExecutionStatus.KILL;
         } else {
             state = ExecutionStatus.SUBMITTED_SUCCESS;
@@ -1389,7 +1388,7 @@ public class ProcessService {
 
         for (TaskInstance task : taskInstances) {
             if (task.getState() == ExecutionStatus.FAILURE
-                && task.getRetryTimes() >= task.getMaxRetryTimes()) {
+                    && task.getRetryTimes() >= task.getMaxRetryTimes()) {
                 return false;
             }
         }
@@ -1518,7 +1517,8 @@ public class ProcessService {
     private void updateTaskDefinitionResources(TaskDefinition taskDefinition) {
         Map<String, Object> taskParameters = JSONUtils.parseObject(
                 taskDefinition.getTaskParams(),
-                new TypeReference<Map<String, Object>>() { });
+                new TypeReference<Map<String, Object>>() {
+                });
         if (taskParameters != null) {
             // if contains mainJar field, query resource from database
             // Flink, Spark, MR
@@ -1756,7 +1756,8 @@ public class ProcessService {
             return;
         }
         //if the result more than one line,just get the first .
-        Map<String, Object> taskParams = JSONUtils.parseObject(taskInstance.getTaskParams(), new TypeReference<Map<String, Object>>() {});
+        Map<String, Object> taskParams = JSONUtils.parseObject(taskInstance.getTaskParams(), new TypeReference<Map<String, Object>>() {
+        });
         Object localParams = taskParams.get(LOCAL_PARAMS);
         if (localParams == null) {
             return;
@@ -1953,8 +1954,8 @@ public class ProcessService {
      */
     public ProcessInstance findLastSchedulerProcessInterval(Long definitionCode, DateInterval dateInterval) {
         return processInstanceMapper.queryLastSchedulerProcess(definitionCode,
-            dateInterval.getStartTime(),
-            dateInterval.getEndTime());
+                dateInterval.getStartTime(),
+                dateInterval.getEndTime());
     }
 
     /**
@@ -1966,8 +1967,8 @@ public class ProcessService {
      */
     public ProcessInstance findLastManualProcessInterval(Long definitionCode, DateInterval dateInterval) {
         return processInstanceMapper.queryLastManualProcess(definitionCode,
-            dateInterval.getStartTime(),
-            dateInterval.getEndTime());
+                dateInterval.getStartTime(),
+                dateInterval.getEndTime());
     }
 
     /**
@@ -1980,9 +1981,9 @@ public class ProcessService {
      */
     public ProcessInstance findLastRunningProcess(Long definitionCode, Date startTime, Date endTime) {
         return processInstanceMapper.queryLastRunningProcess(definitionCode,
-            startTime,
-            endTime,
-            stateArray);
+                startTime,
+                endTime,
+                stateArray);
     }
 
     /**
@@ -2188,10 +2189,10 @@ public class ProcessService {
         AbstractParameters params = TaskParametersUtils.getParameters(taskDefinition.getTaskType(), taskDefinition.getTaskParams());
         if (params != null && CollectionUtils.isNotEmpty(params.getResourceFilesList())) {
             resourceIds = params.getResourceFilesList().
-                stream()
-                .filter(t -> t.getId() != 0)
-                .map(ResourceInfo::getId)
-                .collect(Collectors.toSet());
+                    stream()
+                    .filter(t -> t.getId() != 0)
+                    .map(ResourceInfo::getId)
+                    .collect(Collectors.toSet());
         }
         if (CollectionUtils.isEmpty(resourceIds)) {
             return StringUtils.EMPTY;
@@ -2211,7 +2212,7 @@ public class ProcessService {
             taskDefinitionLog.setResourceIds(getResourceIds(taskDefinitionLog));
             if (taskDefinitionLog.getCode() > 0 && taskDefinitionLog.getVersion() > 0) {
                 TaskDefinitionLog definitionCodeAndVersion = taskDefinitionLogMapper
-                    .queryByDefinitionCodeAndVersion(taskDefinitionLog.getCode(), taskDefinitionLog.getVersion());
+                        .queryByDefinitionCodeAndVersion(taskDefinitionLog.getCode(), taskDefinitionLog.getVersion());
                 if (definitionCodeAndVersion != null) {
                     if (!taskDefinitionLog.equals(definitionCodeAndVersion)) {
                         taskDefinitionLog.setUserId(definitionCodeAndVersion.getUserId());
@@ -2228,8 +2229,8 @@ public class ProcessService {
             taskDefinitionLog.setCreateTime(now);
             if (taskDefinitionLog.getCode() == 0) {
                 try {
-                    taskDefinitionLog.setCode(SnowFlakeUtils.getInstance().nextId());
-                } catch (SnowFlakeException e) {
+                    taskDefinitionLog.setCode(CodeGenerateUtils.getInstance().genCode());
+                } catch (CodeGenerateException e) {
                     logger.error("Task code get error, ", e);
                     return Constants.DEFINITION_FAILURE;
                 }
@@ -2285,7 +2286,7 @@ public class ProcessService {
         Map<Long, TaskDefinitionLog> taskDefinitionLogMap = null;
         if (CollectionUtils.isNotEmpty(taskDefinitionLogs)) {
             taskDefinitionLogMap = taskDefinitionLogs.stream()
-                .collect(Collectors.toMap(TaskDefinition::getCode, taskDefinitionLog -> taskDefinitionLog));
+                    .collect(Collectors.toMap(TaskDefinition::getCode, taskDefinitionLog -> taskDefinitionLog));
         }
         Date now = new Date();
         for (ProcessTaskRelationLog processTaskRelationLog : taskRelationList) {
@@ -2308,7 +2309,8 @@ public class ProcessService {
         if (!processTaskRelationList.isEmpty()) {
             Set<Integer> processTaskRelationSet = processTaskRelationList.stream().map(ProcessTaskRelation::hashCode).collect(toSet());
             Set<Integer> taskRelationSet = taskRelationList.stream().map(ProcessTaskRelationLog::hashCode).collect(toSet());
-            if (CollectionUtils.isEqualCollection(processTaskRelationSet, taskRelationSet)) {
+            boolean result = CollectionUtils.isEqualCollection(processTaskRelationSet, taskRelationSet);
+            if (result) {
                 return Constants.EXIT_CODE_SUCCESS;
             }
             processTaskRelationMapper.deleteByCode(projectCode, processDefinitionCode);
@@ -2322,9 +2324,9 @@ public class ProcessService {
         List<ProcessTaskRelation> processTaskRelationList = processTaskRelationMapper.queryByTaskCode(taskCode);
         if (!processTaskRelationList.isEmpty()) {
             Set<Long> processDefinitionCodes = processTaskRelationList
-                .stream()
-                .map(ProcessTaskRelation::getProcessDefinitionCode)
-                .collect(Collectors.toSet());
+                    .stream()
+                    .map(ProcessTaskRelation::getProcessDefinitionCode)
+                    .collect(Collectors.toSet());
             List<ProcessDefinition> processDefinitionList = processDefineMapper.queryByCodes(processDefinitionCodes);
             // check process definition is already online
             for (ProcessDefinition processDefinition : processDefinitionList) {
@@ -2357,8 +2359,8 @@ public class ProcessService {
         List<ProcessTaskRelation> processTaskRelations = processTaskRelationMapper.queryByProcessCode(processDefinition.getProjectCode(), processDefinition.getCode());
         List<TaskDefinitionLog> taskDefinitionLogList = genTaskDefineList(processTaskRelations);
         List<TaskDefinition> taskDefinitions = taskDefinitionLogList.stream()
-            .map(taskDefinitionLog -> JSONUtils.parseObject(JSONUtils.toJsonString(taskDefinitionLog), TaskDefinition.class))
-            .collect(Collectors.toList());
+                .map(taskDefinitionLog -> JSONUtils.parseObject(JSONUtils.toJsonString(taskDefinitionLog), TaskDefinition.class))
+                .collect(Collectors.toList());
         return new DagData(processDefinition, processTaskRelations, taskDefinitions);
     }
 
@@ -2421,7 +2423,7 @@ public class ProcessService {
             taskDefinitionLogs = genTaskDefineList(taskRelationList);
         }
         Map<Long, TaskDefinitionLog> taskDefinitionLogMap = taskDefinitionLogs.stream()
-            .collect(Collectors.toMap(TaskDefinitionLog::getCode, taskDefinitionLog -> taskDefinitionLog));
+                .collect(Collectors.toMap(TaskDefinitionLog::getCode, taskDefinitionLog -> taskDefinitionLog));
         List<TaskNode> taskNodeList = new ArrayList<>();
         for (Entry<Long, List<Long>> code : taskCodeMap.entrySet()) {
             TaskDefinitionLog taskDefinitionLog = taskDefinitionLogMap.get(code.getKey());
@@ -2446,8 +2448,8 @@ public class ProcessService {
                 taskNode.setWorkerGroup(taskDefinitionLog.getWorkerGroup());
                 taskNode.setEnvironmentCode(taskDefinitionLog.getEnvironmentCode());
                 taskNode.setTimeout(JSONUtils.toJsonString(new TaskTimeoutParameter(taskDefinitionLog.getTimeoutFlag() == TimeoutFlag.OPEN,
-                    taskDefinitionLog.getTimeoutNotifyStrategy(),
-                    taskDefinitionLog.getTimeout())));
+                        taskDefinitionLog.getTimeoutNotifyStrategy(),
+                        taskDefinitionLog.getTimeout())));
                 taskNode.setDelayTime(taskDefinitionLog.getDelayTime());
                 taskNode.setPreTasks(JSONUtils.toJsonString(code.getValue().stream().map(taskDefinitionLogMap::get).map(TaskDefinition::getCode).collect(Collectors.toList())));
                 taskNodeList.add(taskNode);
diff --git a/licenses/LICENSE-snowflake.txt b/licenses/LICENSE-snowflake.txt
new file mode 100644
index 0000000..e257174
--- /dev/null
+++ b/licenses/LICENSE-snowflake.txt
@@ -0,0 +1,11 @@
+Copyright 2010-2012 Twitter, Inc.
+
+Licensed under the Apache License, Version 2.0 (the "License"); you may not use this
+file except in compliance with the License. You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software distributed
+under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
+CONDITIONS OF ANY KIND, either express or implied. See the License for the
+specific language governing permissions and limitations under the License.
\ No newline at end of file