You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by GitBox <gi...@apache.org> on 2021/12/15 13:52:00 UTC

[GitHub] [dolphinscheduler] devosend commented on a change in pull request #7405: [python] Add task dependent

devosend commented on a change in pull request #7405:
URL: https://github.com/apache/dolphinscheduler/pull/7405#discussion_r768735419



##########
File path: dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/dependent.py
##########
@@ -0,0 +1,276 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Task dependent."""
+
+from typing import Dict, Optional, Tuple
+
+from pydolphinscheduler.constants import TaskType
+from pydolphinscheduler.core.base import Base
+from pydolphinscheduler.core.task import Task
+from pydolphinscheduler.exceptions import PyDSJavaGatewayException, PyDSParamException
+from pydolphinscheduler.java_gateway import launch_gateway
+
+DEPENDENT_ALL_TASK_IN_WORKFLOW = "0"
+
+
+class DependentDate(str):
+    """Constant of Dependent date value.
+
+    These values set according to Java server side, if you want to add and change it,
+    please change Java server side first.
+    """
+
+    # TODO Maybe we should add parent level to DependentDate for easy to use, such as
+    # DependentDate.MONTH.THIS_MONTH
+
+    # Hour
+    CURRENT_HOUR = "currentHour"
+    LAST_ONE_HOUR = "last1Hour"
+    LAST_TWO_HOURS = "last2Hours"
+    LAST_THREE_HOURS = "last3Hours"
+    LAST_TWENTY_FOUR_HOURS = "last24Hours"
+
+    # Day
+    TODAY = "today"
+    LAST_ONE_DAYS = "last1Days"
+    LAST_TWO_DAYS = "last2Days"
+    LAST_THREE_DAYS = "last3Days"
+    LAST_SEVEN_DAYS = "last7Days"
+
+    # Week
+    THIS_WEEK = "thisWeek"
+    LAST_WEEK = "lastWeek"
+    LAST_MONDAY = "lastMonday"
+    LAST_TUESDAY = "lastTuesday"
+    LAST_WEDNESDAY = "lastWednesday"
+    LAST_THURSDAY = "lastThursday"
+    LAST_FRIDAY = "lastFriday"
+    LAST_SATURDAY = "lastSaturday"
+    LAST_SUNDAY = "lastSunday"
+
+    # Month
+    THIS_MONTH = "thisMonth"
+    LAST_MONTH = "lastMonth"
+    LAST_MONTH_BEGIN = "lastMonthBegin"
+    LAST_MONTH_END = "lastMonthEnd"
+
+
+class DependentItem(Base):
+    """Dependent item object, minimal unit for task dependent.
+
+    It declare which project, process_definition, task are dependent to this task.
+    """
+
+    _DEFINE_ATTR = {
+        "project_code",
+        "definition_code",
+        "dep_task_code",
+        "cycle",
+        "date_value",
+    }
+
+    # TODO maybe we should conside overwrite operator `and` and `or` for DependentItem to
+    #  support more easy way to set relation
+    def __init__(
+        self,
+        project_name: str,
+        process_definition_name: str,
+        dependent_task_name: Optional[str] = DEPENDENT_ALL_TASK_IN_WORKFLOW,
+        dependent_date: Optional[DependentDate] = DependentDate.TODAY,
+    ):
+        obj_name = f"{project_name}.{process_definition_name}.{dependent_task_name}.{dependent_date}"
+        super().__init__(obj_name)
+        self.project_name = project_name
+        self.process_definition_name = process_definition_name
+        self.dependent_task_name = dependent_task_name
+        if dependent_date is None:
+            raise PyDSParamException(
+                "Parameter dependent_date must provider by got None."
+            )
+        else:
+            self.dependent_date = dependent_date
+        self._code = {}
+
+    def __repr__(self) -> str:
+        return "depend_item_list"
+
+    @property
+    def project_code(self) -> str:
+        """Get dependent project code."""
+        return self.get_code_from_gateway().get("projectCode")
+
+    @property
+    def definition_code(self) -> str:
+        """Get dependent definition code."""
+        return self.get_code_from_gateway().get("processDefinitionCode")
+
+    @property
+    def dep_task_code(self) -> str:
+        """Get dependent tasks code list."""
+        if self.is_all_task:
+            return DEPENDENT_ALL_TASK_IN_WORKFLOW
+        else:
+            return self.get_code_from_gateway().get("taskDefinitionCode")
+
+    # TODO Maybe we should get cycle from dependent date class.
+    @property
+    def cycle(self) -> str:
+        """Get dependent cycle."""
+        if "Hour" in self.dependent_date:
+            return "hour"
+        elif self.dependent_date == "today" or "Days" in self.dependent_date:
+            return "day"
+        elif "Month" in self.dependent_date:
+            return "month"
+        else:
+            return "week"
+
+    @property
+    def date_value(self) -> str:
+        """Get dependent date."""
+        return self.dependent_date
+
+    @property
+    def is_all_task(self) -> bool:
+        """Check whether dependent all tasks or not."""
+        return self.dependent_task_name == DEPENDENT_ALL_TASK_IN_WORKFLOW
+
+    @property
+    def code_parameter(self) -> Tuple:
+        """Get name info parameter to query code."""
+        param = (
+            self.project_name,
+            self.process_definition_name,
+            self.dependent_task_name if not self.is_all_task else None,
+        )
+        return param
+
+    def get_code_from_gateway(self) -> Dict:
+        """Get project, definition, task code from given parameter."""
+        if self._code:
+            return self._code
+        else:
+            gateway = launch_gateway()
+            try:
+                return gateway.entry_point.getDependentInfo(*self.code_parameter)

Review comment:
       The return value of `getDependentInfo ` was not assigned to `self.code`. @zhongjiajie 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@dolphinscheduler.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org