You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by ki...@apache.org on 2021/12/08 14:17:18 UTC

[dolphinscheduler] branch dev updated: [python] Refactor get object define communicate to gateway (#7272)

This is an automated email from the ASF dual-hosted git repository.

kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new 2a7d6b4  [python] Refactor get object define communicate to gateway (#7272)
2a7d6b4 is described below

commit 2a7d6b468f2942d075430478ffe08375f98df007
Author: Jiajie Zhong <zh...@hotmail.com>
AuthorDate: Wed Dec 8 22:17:13 2021 +0800

    [python] Refactor get object define communicate to gateway (#7272)
    
    * Change class Base `to_dict` to `get_define` for more clearer
    * Remove class TaskParams and sub class make code easy to
      understand and avoid task implement cycle dependence
    * Remove class ObjectJsonBase in Task to reduce complexity
    
    fix: #7271
---
 .../src/pydolphinscheduler/core/base.py            |  43 +++----
 .../pydolphinscheduler/core/process_definition.py  |   6 +-
 .../src/pydolphinscheduler/core/task.py            | 132 +++++++++------------
 .../src/pydolphinscheduler/tasks/http.py           |  47 +++-----
 .../src/pydolphinscheduler/tasks/python.py         |  33 +++---
 .../src/pydolphinscheduler/tasks/shell.py          |  18 ++-
 .../src/pydolphinscheduler/tasks/sql.py            |  78 ++++++------
 .../src/pydolphinscheduler/tasks/sub_process.py    |  55 +++------
 .../tests/core/test_process_definition.py          |  14 +--
 .../pydolphinscheduler/tests/core/test_task.py     |  76 ++++++++----
 .../pydolphinscheduler/tests/tasks/test_http.py    |  55 +++++++--
 .../pydolphinscheduler/tests/tasks/test_python.py  |  60 +++++-----
 .../pydolphinscheduler/tests/tasks/test_shell.py   |  43 ++++---
 .../pydolphinscheduler/tests/tasks/test_sql.py     |  42 ++++++-
 .../tests/tasks/test_sub_process.py                |  53 ++++++---
 15 files changed, 405 insertions(+), 350 deletions(-)

diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/base.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/base.py
index d636d51..690351a 100644
--- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/base.py
+++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/base.py
@@ -26,11 +26,14 @@ from pydolphinscheduler.utils.string import attr2camel
 class Base:
     """DolphinScheduler Base object."""
 
+    # Object key attribute, to test whether object equals and so on.
     _KEY_ATTR: set = {"name", "description"}
 
-    _TO_DICT_ATTR: set = set()
+    # Object defines attribute, use when needs to communicate with Java gateway server.
+    _DEFINE_ATTR: set = set()
 
-    DEFAULT_ATTR: Dict = {}
+    # Object default attribute, will add those attribute to `_DEFINE_ATTR` when init assign missing.
+    _DEFAULT_ATTR: Dict = {}
 
     def __init__(self, name: str, description: Optional[str] = None):
         self.name = name
@@ -44,28 +47,28 @@ class Base:
             getattr(self, a, None) == getattr(other, a, None) for a in self._KEY_ATTR
         )
 
-    # TODO check how Redash do
-    # TODO DRY
-    def to_dict(self, camel_attr=True) -> Dict:
-        """Get object key attribute dict.
-
-        use attribute `self._TO_DICT_ATTR` to determine which attributes should including to
-        children `to_dict` function.
-        """
-        # content = {}
-        # for attr, value in self.__dict__.items():
-        #     # Don't publish private variables
-        #     if attr.startswith("_"):
-        #         continue
-        #     else:
-        #         content[snake2camel(attr)] = value
-        # content.update(self.DEFAULT_ATTR)
-        # return content
+    def get_define_custom(
+        self, camel_attr: bool = True, custom_attr: set = None
+    ) -> Dict:
+        """Get object definition attribute by given attr set."""
         content = {}
-        for attr in self._TO_DICT_ATTR:
+        for attr in custom_attr:
             val = getattr(self, attr, None)
             if camel_attr:
                 content[attr2camel(attr)] = val
             else:
                 content[attr] = val
         return content
+
+    def get_define(self, camel_attr: bool = True) -> Dict:
+        """Get object definition attribute communicate to Java gateway server.
+
+        use attribute `self._DEFINE_ATTR` to determine which attributes should including when
+        object tries to communicate with Java gateway server.
+        """
+        content = self.get_define_custom(camel_attr, self._DEFINE_ATTR)
+        update_default = {
+            k: self._DEFAULT_ATTR.get(k) for k in self._DEFAULT_ATTR if k not in content
+        }
+        content.update(update_default)
+        return content
diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/process_definition.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/process_definition.py
index 1586757..70d4e6b 100644
--- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/process_definition.py
+++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/process_definition.py
@@ -68,7 +68,7 @@ class ProcessDefinition(Base):
         "param",
     }
 
-    _TO_DICT_ATTR = {
+    _DEFINE_ATTR = {
         "name",
         "description",
         "_project",
@@ -195,7 +195,7 @@ class ProcessDefinition(Base):
         if not self.tasks:
             return [self.tasks]
         else:
-            return [task.to_dict() for task in self.tasks.values()]
+            return [task.get_define() for task in self.tasks.values()]
 
     @property
     def task_relation_json(self) -> List[Dict]:
@@ -204,7 +204,7 @@ class ProcessDefinition(Base):
             return [self.tasks]
         else:
             self._handle_root_relation()
-            return [tr.to_dict() for tr in self._task_relations]
+            return [tr.get_define() for tr in self._task_relations]
 
     @property
     def schedule_json(self) -> Optional[Dict]:
diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/task.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/task.py
index d22e5c8..39cca9c 100644
--- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/task.py
+++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/task.py
@@ -15,7 +15,7 @@
 # specific language governing permissions and limitations
 # under the License.
 
-"""DolphinScheduler ObjectJsonBase, TaskParams and Task object."""
+"""DolphinScheduler Task and TaskRelation object."""
 
 import logging
 from typing import Dict, List, Optional, Sequence, Set, Tuple, Union
@@ -32,61 +32,17 @@ from pydolphinscheduler.core.process_definition import (
     ProcessDefinitionContext,
 )
 from pydolphinscheduler.java_gateway import launch_gateway
-from pydolphinscheduler.utils.string import class_name2camel, snake2camel
 
 
-class ObjectJsonBase:
-    """Task base class, define `__str__` and `to_dict` function would be use in other task related class."""
-
-    DEFAULT_ATTR = {}
-
-    def __int__(self, *args, **kwargs):
-        pass
-
-    def __str__(self) -> str:
-        content = []
-        for attribute, value in self.__dict__.items():
-            content.append(f'"{snake2camel(attribute)}": {value}')
-        content = ",".join(content)
-        return f'"{class_name2camel(type(self).__name__)}":{{{content}}}'
-
-    # TODO check how Redash do
-    # TODO DRY
-    def to_dict(self) -> Dict:
-        """Get object key attribute dict which determine by attribute `DEFAULT_ATTR`."""
-        content = {snake2camel(attr): value for attr, value in self.__dict__.items()}
-        content.update(self.DEFAULT_ATTR)
-        return content
-
-
-class TaskParams(ObjectJsonBase):
-    """TaskParams object, describe the key parameter of a single task."""
-
-    DEFAULT_CONDITION_RESULT = {"successNode": [""], "failedNode": [""]}
-
-    def __init__(
-        self,
-        local_params: Optional[List] = None,
-        resource_list: Optional[List] = None,
-        dependence: Optional[Dict] = None,
-        wait_start_timeout: Optional[Dict] = None,
-        condition_result: Optional[Dict] = None,
-        *args,
-        **kwargs,
-    ):
-        super().__init__(*args, **kwargs)
-        self.local_params = local_params or []
-        self.resource_list = resource_list or []
-        self.dependence = dependence or {}
-        self.wait_start_timeout = wait_start_timeout or {}
-        # TODO need better way to handle it, this code just for POC
-        self.condition_result = condition_result or self.DEFAULT_CONDITION_RESULT
-
-
-class TaskRelation(ObjectJsonBase):
+class TaskRelation(Base):
     """TaskRelation object, describe the relation of exactly two tasks."""
 
-    DEFAULT_ATTR = {
+    _DEFINE_ATTR = {
+        "pre_task_code",
+        "post_task_code",
+    }
+
+    _DEFAULT_ATTR = {
         "name": "",
         "preTaskVersion": 1,
         "postTaskVersion": 1,
@@ -98,8 +54,9 @@ class TaskRelation(ObjectJsonBase):
         self,
         pre_task_code: int,
         post_task_code: int,
+        name: Optional[str] = None,
     ):
-        super().__init__()
+        super().__init__(name)
         self.pre_task_code = pre_task_code
         self.post_task_code = post_task_code
 
@@ -110,19 +67,32 @@ class TaskRelation(ObjectJsonBase):
 class Task(Base):
     """Task object, parent class for all exactly task type."""
 
-    DEFAULT_DEPS_ATTR = {
-        "name": "",
-        "preTaskVersion": 1,
-        "postTaskVersion": 1,
-        "conditionType": 0,
-        "conditionParams": {},
+    _DEFINE_ATTR = {
+        "name",
+        "code",
+        "version",
+        "task_type",
+        "task_params",
+        "description",
+        "flag",
+        "task_priority",
+        "worker_group",
+        "delay_time",
+        "fail_retry_times",
+        "fail_retry_interval",
+        "timeout_flag",
+        "timeout_notify_strategy",
+        "timeout",
     }
 
+    _task_custom_attr: set = set()
+
+    DEFAULT_CONDITION_RESULT = {"successNode": [""], "failedNode": [""]}
+
     def __init__(
         self,
         name: str,
         task_type: str,
-        task_params: TaskParams,
         description: Optional[str] = None,
         flag: Optional[str] = TaskFlag.YES,
         task_priority: Optional[str] = TaskPriority.MEDIUM,
@@ -134,11 +104,15 @@ class Task(Base):
         timeout_notify_strategy: Optional = None,
         timeout: Optional[int] = 0,
         process_definition: Optional[ProcessDefinition] = None,
+        local_params: Optional[List] = None,
+        resource_list: Optional[List] = None,
+        dependence: Optional[Dict] = None,
+        wait_start_timeout: Optional[Dict] = None,
+        condition_result: Optional[Dict] = None,
     ):
 
         super().__init__(name, description)
         self.task_type = task_type
-        self.task_params = task_params
         self.flag = flag
         self.task_priority = task_priority
         self.worker_group = worker_group
@@ -169,6 +143,13 @@ class Task(Base):
                 self.code,
             )
 
+        # Attribute for task param
+        self.local_params = local_params or []
+        self.resource_list = resource_list or []
+        self.dependence = dependence or {}
+        self.wait_start_timeout = wait_start_timeout or {}
+        self.condition_result = condition_result or self.DEFAULT_CONDITION_RESULT
+
     @property
     def process_definition(self) -> Optional[ProcessDefinition]:
         """Get attribute process_definition."""
@@ -179,6 +160,22 @@ class Task(Base):
         """Set attribute process_definition."""
         self._process_definition = process_definition
 
+    @property
+    def task_params(self) -> Optional[Dict]:
+        """Get task parameter object.
+
+        Will get result to combine _task_custom_attr and custom_attr.
+        """
+        custom_attr = {
+            "local_params",
+            "resource_list",
+            "dependence",
+            "wait_start_timeout",
+            "condition_result",
+        }
+        custom_attr |= self._task_custom_attr
+        return self.get_define_custom(custom_attr=custom_attr)
+
     def __hash__(self):
         return hash(self.code)
 
@@ -259,16 +256,3 @@ class Task(Base):
         # result = gateway.entry_point.genTaskCodeList(DefaultTaskCodeNum.DEFAULT)
         # gateway_result_checker(result)
         return result.get("code"), result.get("version")
-
-    def to_dict(self, camel_attr=True) -> Dict:
-        """Task `to_dict` function which will return key attribute for Task object."""
-        content = {}
-        for attr, value in self.__dict__.items():
-            # Don't publish private variables
-            if attr.startswith("_"):
-                continue
-            elif isinstance(value, TaskParams):
-                content[snake2camel(attr)] = value.to_dict()
-            else:
-                content[snake2camel(attr)] = value
-        return content
diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/http.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/http.py
index 445142e..781333d 100644
--- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/http.py
+++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/http.py
@@ -20,7 +20,7 @@
 from typing import Optional
 
 from pydolphinscheduler.constants import TaskType
-from pydolphinscheduler.core.task import Task, TaskParams
+from pydolphinscheduler.core.task import Task
 from pydolphinscheduler.exceptions import PyDSParamException
 
 
@@ -50,11 +50,22 @@ class HttpCheckCondition:
     BODY_NOT_CONTAINS = "BODY_NOT_CONTAINS"
 
 
-class HttpTaskParams(TaskParams):
-    """Parameter only for Http task types."""
+class Http(Task):
+    """Task HTTP object, declare behavior for HTTP task to dolphinscheduler."""
+
+    _task_custom_attr = {
+        "url",
+        "http_method",
+        "http_params",
+        "http_check_condition",
+        "condition",
+        "connect_timeout",
+        "socket_timeout",
+    }
 
     def __init__(
         self,
+        name: str,
         url: str,
         http_method: Optional[str] = HttpMethod.GET,
         http_params: Optional[str] = None,
@@ -65,7 +76,7 @@ class HttpTaskParams(TaskParams):
         *args,
         **kwargs
     ):
-        super().__init__(*args, **kwargs)
+        super().__init__(name, TaskType.HTTP, *args, **kwargs)
         self.url = url
         if not hasattr(HttpMethod, http_method):
             raise PyDSParamException(
@@ -88,31 +99,3 @@ class HttpTaskParams(TaskParams):
         self.condition = condition
         self.connect_timeout = connect_timeout
         self.socket_timeout = socket_timeout
-
-
-class Http(Task):
-    """Task HTTP object, declare behavior for HTTP task to dolphinscheduler."""
-
-    def __init__(
-        self,
-        name: str,
-        url: str,
-        http_method: Optional[str] = HttpMethod.GET,
-        http_params: Optional[str] = None,
-        http_check_condition: Optional[str] = HttpCheckCondition.STATUS_CODE_DEFAULT,
-        condition: Optional[str] = None,
-        connect_timeout: Optional[int] = 60000,
-        socket_timeout: Optional[int] = 60000,
-        *args,
-        **kwargs
-    ):
-        task_params = HttpTaskParams(
-            url=url,
-            http_method=http_method,
-            http_params=http_params,
-            http_check_condition=http_check_condition,
-            condition=condition,
-            connect_timeout=connect_timeout,
-            socket_timeout=socket_timeout,
-        )
-        super().__init__(name, TaskType.HTTP, task_params, *args, **kwargs)
diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/python.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/python.py
index 9a7149a..7950480 100644
--- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/python.py
+++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/python.py
@@ -22,29 +22,30 @@ import types
 from typing import Any
 
 from pydolphinscheduler.constants import TaskType
-from pydolphinscheduler.core.task import Task, TaskParams
+from pydolphinscheduler.core.task import Task
 from pydolphinscheduler.exceptions import PyDSParamException
 
 
-class PythonTaskParams(TaskParams):
-    """Parameter only for Python task types."""
-
-    def __init__(self, raw_script: str, *args, **kwargs):
-        super().__init__(*args, **kwargs)
-        self.raw_script = raw_script
-
-
 class Python(Task):
     """Task Python object, declare behavior for Python task to dolphinscheduler."""
 
+    _task_custom_attr = {
+        "raw_script",
+    }
+
     def __init__(self, name: str, code: Any, *args, **kwargs):
-        if isinstance(code, str):
-            task_params = PythonTaskParams(raw_script=code)
-        elif isinstance(code, types.FunctionType):
-            py_function = inspect.getsource(code)
-            task_params = PythonTaskParams(raw_script=py_function)
+        super().__init__(name, TaskType.PYTHON, *args, **kwargs)
+        self._code = code
+
+    @property
+    def raw_script(self) -> str:
+        """Get python task define attribute `raw_script`."""
+        if isinstance(self._code, str):
+            return self._code
+        elif isinstance(self._code, types.FunctionType):
+            py_function = inspect.getsource(self._code)
+            return py_function
         else:
             raise PyDSParamException(
-                "Parameter code do not support % for now.", type(code)
+                "Parameter code do not support % for now.", type(self._code)
             )
-        super().__init__(name, TaskType.PYTHON, task_params, *args, **kwargs)
diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/shell.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/shell.py
index 25e82f5..94ad2f4 100644
--- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/shell.py
+++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/shell.py
@@ -18,15 +18,7 @@
 """Task shell."""
 
 from pydolphinscheduler.constants import TaskType
-from pydolphinscheduler.core.task import Task, TaskParams
-
-
-class ShellTaskParams(TaskParams):
-    """Parameter only for shell task types."""
-
-    def __init__(self, raw_script: str, *args, **kwargs):
-        super().__init__(*args, **kwargs)
-        self.raw_script = raw_script
+from pydolphinscheduler.core.task import Task
 
 
 class Shell(Task):
@@ -37,6 +29,10 @@ class Shell(Task):
     task.name assign to `task_shell`
     """
 
+    _task_custom_attr = {
+        "raw_script",
+    }
+
     def __init__(self, name: str, command: str, *args, **kwargs):
-        task_params = ShellTaskParams(raw_script=command)
-        super().__init__(name, TaskType.SHELL, task_params, *args, **kwargs)
+        super().__init__(name, TaskType.SHELL, *args, **kwargs)
+        self.raw_script = command
diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/sql.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/sql.py
index 62da964..f16eb10 100644
--- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/sql.py
+++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/sql.py
@@ -21,7 +21,7 @@ import re
 from typing import Dict, Optional
 
 from pydolphinscheduler.constants import TaskType
-from pydolphinscheduler.core.task import Task, TaskParams
+from pydolphinscheduler.core.task import Task
 from pydolphinscheduler.java_gateway import launch_gateway
 
 
@@ -32,31 +32,6 @@ class SqlType:
     NOT_SELECT = 1
 
 
-class SqlTaskParams(TaskParams):
-    """Parameter only for Sql task type."""
-
-    def __init__(
-        self,
-        type: str,
-        datasource: str,
-        sql: str,
-        sql_type: Optional[int] = SqlType.NOT_SELECT,
-        display_rows: Optional[int] = 10,
-        pre_statements: Optional[str] = None,
-        post_statements: Optional[str] = None,
-        *args,
-        **kwargs
-    ):
-        super().__init__(*args, **kwargs)
-        self.type = type
-        self.datasource = datasource
-        self.sql = sql
-        self.sql_type = sql_type
-        self.display_rows = display_rows
-        self.pre_statements = pre_statements or []
-        self.post_statements = post_statements or []
-
-
 class Sql(Task):
     """Task SQL object, declare behavior for SQL task to dolphinscheduler.
 
@@ -73,38 +48,40 @@ class Sql(Task):
     database type and database instance would run this sql.
     """
 
+    _task_custom_attr = {
+        "sql",
+        "sql_type",
+        "pre_statements",
+        "post_statements",
+        "display_rows",
+    }
+
     def __init__(
         self,
         name: str,
         datasource_name: str,
         sql: str,
-        pre_sql: Optional[str] = None,
-        post_sql: Optional[str] = None,
+        pre_statements: Optional[str] = None,
+        post_statements: Optional[str] = None,
         display_rows: Optional[int] = 10,
         *args,
         **kwargs
     ):
-        self._sql = sql
-        self._datasource_name = datasource_name
+        super().__init__(name, TaskType.SQL, *args, **kwargs)
+        self.datasource_name = datasource_name
+        self.sql = sql
+        self.pre_statements = pre_statements or []
+        self.post_statements = post_statements or []
+        self.display_rows = display_rows
         self._datasource = {}
-        task_params = SqlTaskParams(
-            type=self.get_datasource_type(),
-            datasource=self.get_datasource_id(),
-            sql=sql,
-            sql_type=self.sql_type,
-            display_rows=display_rows,
-            pre_statements=pre_sql,
-            post_statements=post_sql,
-        )
-        super().__init__(name, TaskType.SQL, task_params, *args, **kwargs)
 
     def get_datasource_type(self) -> str:
         """Get datasource type from java gateway, a wrapper for :func:`get_datasource_info`."""
-        return self.get_datasource_info(self._datasource_name).get("type")
+        return self.get_datasource_info(self.datasource_name).get("type")
 
     def get_datasource_id(self) -> str:
         """Get datasource id from java gateway, a wrapper for :func:`get_datasource_info`."""
-        return self.get_datasource_info(self._datasource_name).get("id")
+        return self.get_datasource_info(self.datasource_name).get("id")
 
     def get_datasource_info(self, name) -> Dict:
         """Get datasource info from java gateway, contains datasource id, type, name."""
@@ -122,7 +99,22 @@ class Sql(Task):
             "^(?!(.* |)insert |(.* |)delete |(.* |)drop |(.* |)update |(.* |)alter ).*"
         )
         pattern_select = re.compile(pattern_select_str, re.IGNORECASE)
-        if pattern_select.match(self._sql) is None:
+        if pattern_select.match(self.sql) is None:
             return SqlType.NOT_SELECT
         else:
             return SqlType.SELECT
+
+    @property
+    def task_params(self, camel_attr: bool = True, custom_attr: set = None) -> Dict:
+        """Override Task.task_params for sql task.
+
+        Sql task have some specials attribute for task_params, and is odd if we
+        directly set as python property, so we Override Task.task_params here.
+        """
+        params = super().task_params
+        custom_params = {
+            "type": self.get_datasource_type(),
+            "datasource": self.get_datasource_id(),
+        }
+        params.update(custom_params)
+        return params
diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/sub_process.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/sub_process.py
index 1bf0bd1..8ba6b4c 100644
--- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/sub_process.py
+++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/sub_process.py
@@ -20,57 +20,36 @@
 from typing import Dict
 
 from pydolphinscheduler.constants import TaskType
-from pydolphinscheduler.core.process_definition import ProcessDefinitionContext
-from pydolphinscheduler.core.task import Task, TaskParams
+from pydolphinscheduler.core.task import Task
 from pydolphinscheduler.exceptions import PyDSProcessDefinitionNotAssignException
 from pydolphinscheduler.java_gateway import launch_gateway
 
 
-class SubProcessTaskParams(TaskParams):
-    """Parameter only for Sub Process task type."""
-
-    def __init__(self, process_definition_code, *args, **kwargs):
-        super().__init__(*args, **kwargs)
-        self.process_definition_code = process_definition_code
-
-
 class SubProcess(Task):
     """Task SubProcess object, declare behavior for SubProcess task to dolphinscheduler."""
 
-    def __init__(self, name: str, process_definition_name: str, *args, **kwargs):
-        self._process_definition_name = process_definition_name
-        self._process_definition_info = {}
-        # TODO: Optimize the way of obtaining process_definition
-        self.process_definition = kwargs.get(
-            "process_definition", ProcessDefinitionContext.get()
-        )
-        if not self.process_definition:
-            raise PyDSProcessDefinitionNotAssignException(
-                "ProcessDefinition must be provider when SubProcess initialization."
-            )
+    _task_custom_attr = {"process_definition_code"}
 
-        task_params = SubProcessTaskParams(
-            process_definition_code=self.get_process_definition_code(),
-        )
-        super().__init__(name, TaskType.SUB_PROCESS, task_params, *args, **kwargs)
+    def __init__(self, name: str, process_definition_name: str, *args, **kwargs):
+        super().__init__(name, TaskType.SUB_PROCESS, *args, **kwargs)
+        self.process_definition_name = process_definition_name
 
-    def get_process_definition_code(self) -> str:
+    @property
+    def process_definition_code(self) -> str:
         """Get process definition code, a wrapper for :func:`get_process_definition_info`."""
-        return self.get_process_definition_info(self._process_definition_name).get(
+        return self.get_process_definition_info(self.process_definition_name).get(
             "code"
         )
 
     def get_process_definition_info(self, process_definition_name: str) -> Dict:
         """Get process definition info from java gateway, contains process definition id, name, code."""
-        if self._process_definition_info:
-            return self._process_definition_info
-        else:
-            gateway = launch_gateway()
-            self._process_definition_info = (
-                gateway.entry_point.getProcessDefinitionInfo(
-                    self.process_definition.user.name,
-                    self.process_definition.project.name,
-                    process_definition_name,
-                )
+        if not self.process_definition:
+            raise PyDSProcessDefinitionNotAssignException(
+                "ProcessDefinition must be provider for task SubProcess."
             )
-            return self._process_definition_info
+        gateway = launch_gateway()
+        return gateway.entry_point.getProcessDefinitionInfo(
+            self.process_definition.user.name,
+            self.process_definition.project.name,
+            process_definition_name,
+        )
diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/core/test_process_definition.py b/dolphinscheduler-python/pydolphinscheduler/tests/core/test_process_definition.py
index 4c15974..8491878 100644
--- a/dolphinscheduler-python/pydolphinscheduler/tests/core/test_process_definition.py
+++ b/dolphinscheduler-python/pydolphinscheduler/tests/core/test_process_definition.py
@@ -28,7 +28,6 @@ from pydolphinscheduler.constants import (
     ProcessDefinitionReleaseState,
 )
 from pydolphinscheduler.core.process_definition import ProcessDefinition
-from pydolphinscheduler.core.task import TaskParams
 from pydolphinscheduler.exceptions import PyDSParamException
 from pydolphinscheduler.side import Project, Tenant, User
 from pydolphinscheduler.utils.date import conv_to_schedule
@@ -152,8 +151,8 @@ def test__parse_datetime_not_support_type(val: Any):
             pd._parse_datetime(val)
 
 
-def test_process_definition_to_dict_without_task():
-    """Test process definition function to_dict without task."""
+def test_process_definition_get_define_without_task():
+    """Test process definition function get_define without task."""
     expect = {
         "name": TEST_PROCESS_DEFINITION_NAME,
         "description": None,
@@ -168,7 +167,7 @@ def test_process_definition_to_dict_without_task():
         "taskRelationJson": [{}],
     }
     with ProcessDefinition(TEST_PROCESS_DEFINITION_NAME) as pd:
-        assert pd.to_dict() == expect
+        assert pd.get_define() == expect
 
 
 def test_process_definition_simple_context_manager():
@@ -176,10 +175,7 @@ def test_process_definition_simple_context_manager():
     expect_tasks_num = 5
     with ProcessDefinition(TEST_PROCESS_DEFINITION_NAME) as pd:
         for i in range(expect_tasks_num):
-            task_params = TaskParams()
-            curr_task = Task(
-                name=f"task-{i}", task_type=f"type-{i}", task_params=task_params
-            )
+            curr_task = Task(name=f"task-{i}", task_type=f"type-{i}")
             # Set deps task i as i-1 parent
             if i > 0:
                 pre_task = pd.get_one_task_by_name(f"task-{i - 1}")
@@ -221,11 +217,9 @@ def test_process_definition_simple_separate():
     expect_tasks_num = 5
     pd = ProcessDefinition(TEST_PROCESS_DEFINITION_NAME)
     for i in range(expect_tasks_num):
-        task_params = TaskParams()
         curr_task = Task(
             name=f"task-{i}",
             task_type=f"type-{i}",
-            task_params=task_params,
             process_definition=pd,
         )
         # Set deps task i as i-1 parent
diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/core/test_task.py b/dolphinscheduler-python/pydolphinscheduler/tests/core/test_task.py
index 6d09820..b551f07 100644
--- a/dolphinscheduler-python/pydolphinscheduler/tests/core/test_task.py
+++ b/dolphinscheduler-python/pydolphinscheduler/tests/core/test_task.py
@@ -21,21 +21,49 @@ from unittest.mock import patch
 
 import pytest
 
-from pydolphinscheduler.core.task import Task, TaskParams, TaskRelation
+from pydolphinscheduler.core.task import Task, TaskRelation
 from tests.testing.task import Task as testTask
 
 
-def test_task_params_to_dict():
-    """Test TaskParams object function to_dict."""
-    expect = {
-        "resourceList": [],
-        "localParams": [],
-        "dependence": {},
-        "conditionResult": TaskParams.DEFAULT_CONDITION_RESULT,
-        "waitStartTimeout": {},
-    }
-    task_param = TaskParams()
-    assert task_param.to_dict() == expect
+@pytest.mark.parametrize(
+    "attr, expect",
+    [
+        (
+            dict(),
+            {
+                "localParams": [],
+                "resourceList": [],
+                "dependence": {},
+                "waitStartTimeout": {},
+                "conditionResult": {"successNode": [""], "failedNode": [""]},
+            },
+        ),
+        (
+            {
+                "local_params": ["foo", "bar"],
+                "resource_list": ["foo", "bar"],
+                "dependence": {"foo", "bar"},
+                "wait_start_timeout": {"foo", "bar"},
+                "condition_result": {"foo": ["bar"]},
+            },
+            {
+                "localParams": ["foo", "bar"],
+                "resourceList": ["foo", "bar"],
+                "dependence": {"foo", "bar"},
+                "waitStartTimeout": {"foo", "bar"},
+                "conditionResult": {"foo": ["bar"]},
+            },
+        ),
+    ],
+)
+def test_property_task_params(attr, expect):
+    """Test class task property."""
+    task = testTask(
+        "test-property-task-params",
+        "test-task",
+        **attr,
+    )
+    assert expect == task.task_params
 
 
 def test_task_relation_to_dict():
@@ -54,15 +82,15 @@ def test_task_relation_to_dict():
     task_param = TaskRelation(
         pre_task_code=pre_task_code, post_task_code=post_task_code
     )
-    assert task_param.to_dict() == expect
+    assert task_param.get_define() == expect
 
 
-def test_task_to_dict():
-    """Test Task object function to_dict."""
+def test_task_get_define():
+    """Test Task object function get_define."""
     code = 123
     version = 1
-    name = "test_task_to_dict"
-    task_type = "test_task_to_dict_type"
+    name = "test_task_get_define"
+    task_type = "test_task_get_define_type"
     expect = {
         "code": code,
         "name": name,
@@ -90,8 +118,8 @@ def test_task_to_dict():
         "pydolphinscheduler.core.task.Task.gen_code_and_version",
         return_value=(code, version),
     ):
-        task = Task(name=name, task_type=task_type, task_params=TaskParams())
-        assert task.to_dict() == expect
+        task = Task(name=name, task_type=task_type)
+        assert task.get_define() == expect
 
 
 @pytest.mark.parametrize("shift", ["<<", ">>"])
@@ -100,8 +128,8 @@ def test_two_tasks_shift(shift: str):
 
     Here we test both `>>` and `<<` bit operator.
     """
-    upstream = testTask(name="upstream", task_type=shift, task_params=TaskParams())
-    downstream = testTask(name="downstream", task_type=shift, task_params=TaskParams())
+    upstream = testTask(name="upstream", task_type=shift)
+    downstream = testTask(name="downstream", task_type=shift)
     if shift == "<<":
         downstream << upstream
     elif shift == ">>":
@@ -137,10 +165,10 @@ def test_tasks_list_shift(dep_expr: str, flag: str):
         "downstream": "upstream",
     }
     task_type = "dep_task_and_tasks"
-    task = testTask(name="upstream", task_type=task_type, task_params=TaskParams())
+    task = testTask(name="upstream", task_type=task_type)
     tasks = [
-        testTask(name="downstream1", task_type=task_type, task_params=TaskParams()),
-        testTask(name="downstream2", task_type=task_type, task_params=TaskParams()),
+        testTask(name="downstream1", task_type=task_type),
+        testTask(name="downstream2", task_type=task_type),
     ]
 
     # Use build-in function eval to simply test case and reduce duplicate code
diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_http.py b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_http.py
index 7c01517..060cdec 100644
--- a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_http.py
+++ b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_http.py
@@ -22,12 +22,7 @@ from unittest.mock import patch
 import pytest
 
 from pydolphinscheduler.exceptions import PyDSParamException
-from pydolphinscheduler.tasks.http import (
-    Http,
-    HttpCheckCondition,
-    HttpMethod,
-    HttpTaskParams,
-)
+from pydolphinscheduler.tasks.http import Http, HttpCheckCondition, HttpMethod
 
 
 @pytest.mark.parametrize(
@@ -51,6 +46,38 @@ def test_attr_exists(class_name, attrs):
 
 
 @pytest.mark.parametrize(
+    "attr, expect",
+    [
+        (
+            {"url": "https://www.apache.org"},
+            {
+                "url": "https://www.apache.org",
+                "httpMethod": "GET",
+                "httpParams": [],
+                "httpCheckCondition": "STATUS_CODE_DEFAULT",
+                "condition": None,
+                "connectTimeout": 60000,
+                "socketTimeout": 60000,
+                "localParams": [],
+                "resourceList": [],
+                "dependence": {},
+                "waitStartTimeout": {},
+                "conditionResult": {"successNode": [""], "failedNode": [""]},
+            },
+        )
+    ],
+)
+@patch(
+    "pydolphinscheduler.core.task.Task.gen_code_and_version",
+    return_value=(123, 1),
+)
+def test_property_task_params(mock_code_version, attr, expect):
+    """Test task http property."""
+    task = Http("test-http-task-params", **attr)
+    assert expect == task.task_params
+
+
+@pytest.mark.parametrize(
     "param",
     [
         {"http_method": "http_method"},
@@ -62,18 +89,22 @@ def test_attr_exists(class_name, attrs):
         },
     ],
 )
-def test_http_task_param_not_support_param(param):
+@patch(
+    "pydolphinscheduler.core.task.Task.gen_code_and_version",
+    return_value=(123, 1),
+)
+def test_http_task_param_not_support_param(mock_code, param):
     """Test HttpTaskParams not support parameter."""
     url = "https://www.apache.org"
     with pytest.raises(PyDSParamException, match="Parameter .*?"):
-        HttpTaskParams(url, **param)
+        Http("test-no-supprot-param", url, **param)
 
 
-def test_http_to_dict():
-    """Test task HTTP function to_dict."""
+def test_http_get_define():
+    """Test task HTTP function get_define."""
     code = 123
     version = 1
-    name = "test_http_to_dict"
+    name = "test_http_get_define"
     url = "https://www.apache.org"
     expect = {
         "code": code,
@@ -110,4 +141,4 @@ def test_http_to_dict():
         return_value=(code, version),
     ):
         http = Http(name, url)
-        assert http.to_dict() == expect
+        assert http.get_define() == expect
diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_python.py b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_python.py
index f9e7f04..dbcd298 100644
--- a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_python.py
+++ b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_python.py
@@ -23,26 +23,33 @@ from unittest.mock import patch
 import pytest
 
 from pydolphinscheduler.exceptions import PyDSParamException
-from pydolphinscheduler.tasks.python import Python, PythonTaskParams
+from pydolphinscheduler.tasks.python import Python
 
 
 @pytest.mark.parametrize(
-    "name, value",
+    "attr, expect",
     [
-        ("local_params", "local_params"),
-        ("resource_list", "resource_list"),
-        ("dependence", "dependence"),
-        ("wait_start_timeout", "wait_start_timeout"),
-        ("condition_result", "condition_result"),
+        (
+            {"code": "print(1)"},
+            {
+                "rawScript": "print(1)",
+                "localParams": [],
+                "resourceList": [],
+                "dependence": {},
+                "waitStartTimeout": {},
+                "conditionResult": {"successNode": [""], "failedNode": [""]},
+            },
+        )
     ],
 )
-def test_python_task_params_attr_setter(name, value):
-    """Test python task parameters."""
-    command = 'print("hello world.")'
-    python_task_params = PythonTaskParams(command)
-    assert command == python_task_params.raw_script
-    setattr(python_task_params, name, value)
-    assert value == getattr(python_task_params, name)
+@patch(
+    "pydolphinscheduler.core.task.Task.gen_code_and_version",
+    return_value=(123, 1),
+)
+def test_property_task_params(mock_code_version, attr, expect):
+    """Test task python property."""
+    task = Python("test-python-task-params", **attr)
+    assert expect == task.task_params
 
 
 @pytest.mark.parametrize(
@@ -52,19 +59,16 @@ def test_python_task_params_attr_setter(name, value):
         ("print", "hello world"),
     ],
 )
-def test_python_task_not_support_code(script_code):
+@patch(
+    "pydolphinscheduler.core.task.Task.gen_code_and_version",
+    return_value=(123, 1),
+)
+def test_python_task_not_support_code(mock_code, script_code):
     """Test python task parameters."""
     name = "not_support_code_type"
-    code = 123
-    version = 1
-    with patch(
-        "pydolphinscheduler.core.task.Task.gen_code_and_version",
-        return_value=(code, version),
-    ):
-        with pytest.raises(
-            PyDSParamException, match="Parameter code do not support .*?"
-        ):
-            Python(name, script_code)
+    with pytest.raises(PyDSParamException, match="Parameter code do not support .*?"):
+        task = Python(name, script_code)
+        task.raw_script
 
 
 def foo():  # noqa: D103
@@ -82,8 +86,8 @@ def foo():  # noqa: D103
         ),
     ],
 )
-def test_python_to_dict(name, script_code, raw):
-    """Test task python function to_dict."""
+def test_python_get_define(name, script_code, raw):
+    """Test task python function get_define."""
     code = 123
     version = 1
     expect = {
@@ -115,4 +119,4 @@ def test_python_to_dict(name, script_code, raw):
         return_value=(code, version),
     ):
         shell = Python(name, script_code)
-        assert shell.to_dict() == expect
+        assert shell.get_define() == expect
diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_shell.py b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_shell.py
index 56fae1c..e42f6dc 100644
--- a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_shell.py
+++ b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_shell.py
@@ -22,33 +22,40 @@ from unittest.mock import patch
 
 import pytest
 
-from pydolphinscheduler.tasks.shell import Shell, ShellTaskParams
+from pydolphinscheduler.tasks.shell import Shell
 
 
 @pytest.mark.parametrize(
-    "name, value",
+    "attr, expect",
     [
-        ("local_params", "local_params"),
-        ("resource_list", "resource_list"),
-        ("dependence", "dependence"),
-        ("wait_start_timeout", "wait_start_timeout"),
-        ("condition_result", "condition_result"),
+        (
+            {"command": "test script"},
+            {
+                "rawScript": "test script",
+                "localParams": [],
+                "resourceList": [],
+                "dependence": {},
+                "waitStartTimeout": {},
+                "conditionResult": {"successNode": [""], "failedNode": [""]},
+            },
+        )
     ],
 )
-def test_shell_task_params_attr_setter(name, value):
-    """Test shell task parameters."""
-    raw_script = "echo shell task parameter"
-    shell_task_params = ShellTaskParams(raw_script)
-    assert raw_script == shell_task_params.raw_script
-    setattr(shell_task_params, name, value)
-    assert value == getattr(shell_task_params, name)
+@patch(
+    "pydolphinscheduler.core.task.Task.gen_code_and_version",
+    return_value=(123, 1),
+)
+def test_property_task_params(mock_code_version, attr, expect):
+    """Test task shell task property."""
+    task = Shell("test-shell-task-params", **attr)
+    assert expect == task.task_params
 
 
-def test_shell_to_dict():
-    """Test task shell function to_dict."""
+def test_shell_get_define():
+    """Test task shell function get_define."""
     code = 123
     version = 1
-    name = "test_shell_to_dict"
+    name = "test_shell_get_define"
     command = "echo test shell"
     expect = {
         "code": code,
@@ -79,4 +86,4 @@ def test_shell_to_dict():
         return_value=(code, version),
     ):
         shell = Shell(name, command)
-        assert shell.to_dict() == expect
+        assert shell.get_define() == expect
diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_sql.py b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_sql.py
index 499b46b..2590100 100644
--- a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_sql.py
+++ b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_sql.py
@@ -82,12 +82,48 @@ def test_get_sql_type(mock_datasource, mock_code_version, sql, sql_type):
     ), f"Sql {sql} expect sql type is {sql_type} but got {task.sql_type}"
 
 
+@pytest.mark.parametrize(
+    "attr, expect",
+    [
+        (
+            {"datasource_name": "datasource_name", "sql": "select 1"},
+            {
+                "sql": "select 1",
+                "type": "MYSQL",
+                "datasource": 1,
+                "sqlType": SqlType.SELECT,
+                "preStatements": [],
+                "postStatements": [],
+                "displayRows": 10,
+                "localParams": [],
+                "resourceList": [],
+                "dependence": {},
+                "waitStartTimeout": {},
+                "conditionResult": {"successNode": [""], "failedNode": [""]},
+            },
+        )
+    ],
+)
+@patch(
+    "pydolphinscheduler.core.task.Task.gen_code_and_version",
+    return_value=(123, 1),
+)
+@patch(
+    "pydolphinscheduler.tasks.sql.Sql.get_datasource_info",
+    return_value=({"id": 1, "type": "MYSQL"}),
+)
+def test_property_task_params(mock_datasource, mock_code_version, attr, expect):
+    """Test task sql task property."""
+    task = Sql("test-sql-task-params", **attr)
+    assert expect == task.task_params
+
+
 @patch(
     "pydolphinscheduler.tasks.sql.Sql.get_datasource_info",
     return_value=({"id": 1, "type": "MYSQL"}),
 )
-def test_sql_to_dict(mock_datasource):
-    """Test task sql function to_dict."""
+def test_sql_get_define(mock_datasource):
+    """Test task sql function get_define."""
     code = 123
     version = 1
     name = "test_sql_dict"
@@ -128,4 +164,4 @@ def test_sql_to_dict(mock_datasource):
         return_value=(code, version),
     ):
         task = Sql(name, datasource_name, command)
-        assert task.to_dict() == expect
+        assert task.get_define() == expect
diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_sub_process.py b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_sub_process.py
index 4a5388a..7f471a1 100644
--- a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_sub_process.py
+++ b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_sub_process.py
@@ -23,7 +23,7 @@ from unittest.mock import patch
 import pytest
 
 from pydolphinscheduler.core.process_definition import ProcessDefinition
-from pydolphinscheduler.tasks.sub_process import SubProcess, SubProcessTaskParams
+from pydolphinscheduler.tasks.sub_process import SubProcess
 
 TEST_SUB_PROCESS_DEFINITION_NAME = "sub-test-process-definition"
 TEST_SUB_PROCESS_DEFINITION_CODE = "3643589832320"
@@ -31,22 +31,39 @@ TEST_PROCESS_DEFINITION_NAME = "simple-test-process-definition"
 
 
 @pytest.mark.parametrize(
-    "name, value",
+    "attr, expect",
     [
-        ("local_params", "local_params"),
-        ("resource_list", "resource_list"),
-        ("dependence", "dependence"),
-        ("wait_start_timeout", "wait_start_timeout"),
-        ("condition_result", "condition_result"),
+        (
+            {"process_definition_name": TEST_SUB_PROCESS_DEFINITION_NAME},
+            {
+                "processDefinitionCode": TEST_SUB_PROCESS_DEFINITION_CODE,
+                "localParams": [],
+                "resourceList": [],
+                "dependence": {},
+                "waitStartTimeout": {},
+                "conditionResult": {"successNode": [""], "failedNode": [""]},
+            },
+        )
     ],
 )
-def test_sub_process_task_params_attr_setter(name, value):
-    """Test sub_process task parameters."""
-    process_definition_code = "3643589832320"
-    sub_process_task_params = SubProcessTaskParams(process_definition_code)
-    assert process_definition_code == sub_process_task_params.process_definition_code
-    setattr(sub_process_task_params, name, value)
-    assert value == getattr(sub_process_task_params, name)
+@patch(
+    "pydolphinscheduler.tasks.sub_process.SubProcess.get_process_definition_info",
+    return_value=(
+        {
+            "id": 1,
+            "name": TEST_SUB_PROCESS_DEFINITION_NAME,
+            "code": TEST_SUB_PROCESS_DEFINITION_CODE,
+        }
+    ),
+)
+@patch(
+    "pydolphinscheduler.core.task.Task.gen_code_and_version",
+    return_value=(123, 1),
+)
+def test_property_task_params(mock_code_version, mock_pd_info, attr, expect):
+    """Test task sub process property."""
+    task = SubProcess("test-sub-process-task-params", **attr)
+    assert expect == task.task_params
 
 
 @patch(
@@ -59,11 +76,11 @@ def test_sub_process_task_params_attr_setter(name, value):
         }
     ),
 )
-def test_sub_process_to_dict(mock_process_definition):
-    """Test task sub_process function to_dict."""
+def test_sub_process_get_define(mock_process_definition):
+    """Test task sub_process function get_define."""
     code = 123
     version = 1
-    name = "test_sub_process_to_dict"
+    name = "test_sub_process_get_define"
     expect = {
         "code": code,
         "name": name,
@@ -94,4 +111,4 @@ def test_sub_process_to_dict(mock_process_definition):
     ):
         with ProcessDefinition(TEST_PROCESS_DEFINITION_NAME):
             sub_process = SubProcess(name, TEST_SUB_PROCESS_DEFINITION_NAME)
-            assert sub_process.to_dict() == expect
+            assert sub_process.get_define() == expect