Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2020/08/23 16:15:06 UTC

[GitHub] [airflow] potiuk opened a new pull request #10499: Make models/taskinstance.py pylint compatible

potiuk opened a new pull request #10499:
URL: https://github.com/apache/airflow/pull/10499


   
   ---
   **^ Add meaningful description above**
   
   Read the **[Pull Request Guidelines](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#pull-request-guidelines)** for more information.
   In case of fundamental code change, Airflow Improvement Proposal ([AIP](https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+Improvements+Proposals)) is needed.
   In case of a new dependency, check compliance with the [ASF 3rd Party License Policy](https://www.apache.org/legal/resolved.html#category-x).
   In case of backwards incompatible changes please leave a note in [UPDATING.md](https://github.com/apache/airflow/blob/master/UPDATING.md).
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] potiuk commented on pull request #10499: Make models/taskinstance.py pylint compatible

Posted by GitBox <gi...@apache.org>.
potiuk commented on pull request #10499:
URL: https://github.com/apache/airflow/pull/10499#issuecomment-679014855


   This one is still a draft. It turned out that there is another cyclic dependency hidden in this one :).





[GitHub] [airflow] potiuk commented on a change in pull request #10499: Make models/taskinstance.py pylint compatible

Posted by GitBox <gi...@apache.org>.
potiuk commented on a change in pull request #10499:
URL: https://github.com/apache/airflow/pull/10499#discussion_r475916776



##########
File path: airflow/models/taskinstance.py
##########
@@ -1135,23 +1134,16 @@ def _run_raw_task(
             session.merge(self)
         session.commit()
 
-    def _prepare_and_execute_task_with_callbacks(self, context, session, task):
+    def _prepare_and_execute_task_with_callbacks(
+            self,
+            context,
+            task):
         """
         Prepare Task for Execution
         """
-        from airflow.models.renderedtifields import RenderedTaskInstanceFields as RTIF
-        from airflow.sensors.base_sensor_operator import BaseSensorOperator
+        from airflow.models.renderedtifields import RenderedTaskInstanceFields
 
         task_copy = task.prepare_for_execution()
-        # Sensors in `poke` mode can block execution of DAGs when running
-        # with single process executor, thus we change the mode to`reschedule`
-        # to allow parallel task being scheduled and executed
-        if (
-            isinstance(task_copy, BaseSensorOperator) and

Review comment:
       That is an interesting one :)







[GitHub] [airflow] turbaszek commented on a change in pull request #10499: Make models/taskinstance.py pylint compatible

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #10499:
URL: https://github.com/apache/airflow/pull/10499#discussion_r475919005



##########
File path: airflow/sensors/base_sensor_operator.py
##########
@@ -159,6 +160,16 @@ def _get_next_poke_interval(self, started_at, try_number):
         else:
             return self.poke_interval
 
+    def prepare_for_execution(self) -> BaseOperator:

Review comment:
       ```suggestion
       def prepare_for_execution(self) -> "BaseSensorOperator":
   ```
   Lovely! Now that we have this method, it's indeed easier to add some custom "pre_execution" logic.







[GitHub] [airflow] potiuk commented on a change in pull request #10499: Make models/taskinstance.py pylint compatible

Posted by GitBox <gi...@apache.org>.
potiuk commented on a change in pull request #10499:
URL: https://github.com/apache/airflow/pull/10499#discussion_r475858811



##########
File path: airflow/models/taskinstance.py
##########
@@ -1135,23 +1134,16 @@ def _run_raw_task(
             session.merge(self)
         session.commit()
 
-    def _prepare_and_execute_task_with_callbacks(self, context, session, task):
+    def _prepare_and_execute_task_with_callbacks(
+            self,
+            context,
+            task):
         """
         Prepare Task for Execution
         """
-        from airflow.models.renderedtifields import RenderedTaskInstanceFields as RTIF
-        from airflow.sensors.base_sensor_operator import BaseSensorOperator
+        from airflow.models.renderedtifields import RenderedTaskInstanceFields
 
         task_copy = task.prepare_for_execution()
-        # Sensors in `poke` mode can block execution of DAGs when running
-        # with single process executor, thus we change the mode to`reschedule`
-        # to allow parallel task being scheduled and executed
-        if (
-            isinstance(task_copy, BaseSensorOperator) and

Review comment:
       Hey @turbaszek -> I think that was the root cause of the cyclic import, but hopefully it is fixed now (at least the SkipMixin -> TaskInstance -> BaseSensorOperator -> SkipMixin cycle).
   
   The solution was to override the prepare_for_execution method in BaseSensorOperator (which also seems like the right thing to do), but I would love for you to take a look as well.
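
A minimal sketch of the pattern described in this comment, assuming simplified stand-in classes rather than the real Airflow ones. `EXECUTOR_NAME` and `run_task` are hypothetical names used only for illustration; in the diff the executor name comes from `conf.get('core', 'executor')`. The point is that the caller only invokes `prepare_for_execution()`, so it needs neither an `isinstance` check nor an import of the sensor module.

```python
import copy

# Hypothetical stand-in for conf.get('core', 'executor').
EXECUTOR_NAME = "DebugExecutor"


class BaseOperator:
    """Simplified stand-in for Airflow's BaseOperator."""

    def __init__(self, task_id):
        self.task_id = task_id

    def prepare_for_execution(self):
        # Return a shallow copy so the original task object is left untouched.
        return copy.copy(self)


class BaseSensorOperator(BaseOperator):
    """Simplified stand-in for a sensor that has a `mode` attribute."""

    def __init__(self, task_id, mode="poke"):
        super().__init__(task_id)
        self.mode = mode

    def prepare_for_execution(self):
        task = super().prepare_for_execution()
        # The sensor-specific adjustment now lives in the sensor class itself.
        if EXECUTOR_NAME == "DebugExecutor":
            task.mode = "reschedule"
        return task


def run_task(task):
    # Hypothetical caller standing in for TaskInstance: it relies on
    # polymorphism and never refers to BaseSensorOperator by name.
    return task.prepare_for_execution()
```

With this shape, the module that defines the caller has no reason to import the sensor module, which is what removes one edge from the import cycle.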
   
   







[GitHub] [airflow] turbaszek commented on a change in pull request #10499: Make models/taskinstance.py pylint compatible

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #10499:
URL: https://github.com/apache/airflow/pull/10499#discussion_r475456657



##########
File path: airflow/models/taskinstance.py
##########
@@ -1762,19 +1767,18 @@ def filter_for_tis(
         tis: Iterable[Union["TaskInstance", TaskInstanceKey]]
     ) -> Optional[BooleanClauseList]:
         """Returns SQLAlchemy filter to query selected task instances"""
-        TI = TaskInstance
         if not tis:
             return None
         if all(isinstance(t, TaskInstanceKey) for t in tis):
-            filter_for_tis = ([and_(TI.dag_id == tik.dag_id,
-                                    TI.task_id == tik.task_id,
-                                    TI.execution_date == tik.execution_date)
+            filter_for_tis = ([and_(TaskInstance.dag_id == tik.dag_id,
+                                    TaskInstance.task_id == tik.task_id,
+                                    TaskInstance.execution_date == tik.execution_date)
                                for tik in tis])
             return or_(*filter_for_tis)
         if all(isinstance(t, TaskInstance) for t in tis):
-            filter_for_tis = ([and_(TI.dag_id == ti.dag_id,
-                                    TI.task_id == ti.task_id,
-                                    TI.execution_date == ti.execution_date)
+            filter_for_tis = ([and_(TaskInstance.dag_id == ti.dag_id,
+                                    TaskInstance.task_id == ti.task_id,
+                                    TaskInstance.execution_date == ti.execution_date)

Review comment:
       If we decide on long names then we should remove TR from global scope, shouldn't we?







[GitHub] [airflow] potiuk commented on a change in pull request #10499: [WIP] Make models/taskinstance.py pylint compatible

Posted by GitBox <gi...@apache.org>.
potiuk commented on a change in pull request #10499:
URL: https://github.com/apache/airflow/pull/10499#discussion_r475856633



##########
File path: airflow/models/taskinstance.py
##########
@@ -1762,19 +1767,18 @@ def filter_for_tis(
         tis: Iterable[Union["TaskInstance", TaskInstanceKey]]
     ) -> Optional[BooleanClauseList]:
         """Returns SQLAlchemy filter to query selected task instances"""
-        TI = TaskInstance
         if not tis:
             return None
         if all(isinstance(t, TaskInstanceKey) for t in tis):
-            filter_for_tis = ([and_(TI.dag_id == tik.dag_id,
-                                    TI.task_id == tik.task_id,
-                                    TI.execution_date == tik.execution_date)
+            filter_for_tis = ([and_(TaskInstance.dag_id == tik.dag_id,
+                                    TaskInstance.task_id == tik.task_id,
+                                    TaskInstance.execution_date == tik.execution_date)
                                for tik in tis])
             return or_(*filter_for_tis)
         if all(isinstance(t, TaskInstance) for t in tis):
-            filter_for_tis = ([and_(TI.dag_id == ti.dag_id,
-                                    TI.task_id == ti.task_id,
-                                    TI.execution_date == ti.execution_date)
+            filter_for_tis = ([and_(TaskInstance.dag_id == ti.dag_id,
+                                    TaskInstance.task_id == ti.task_id,
+                                    TaskInstance.execution_date == ti.execution_date)

Review comment:
       Ahh. I fixed it in a simple way I think :)







[GitHub] [airflow] turbaszek commented on a change in pull request #10499: Make models/taskinstance.py pylint compatible

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #10499:
URL: https://github.com/apache/airflow/pull/10499#discussion_r475456048



##########
File path: airflow/models/taskinstance.py
##########
@@ -442,16 +442,16 @@ def generate_command(dag_id: str,     # pylint: disable=too-many-arguments
     def log_filepath(self):
         """Filepath for TaskInstance"""
         iso = self.execution_date.isoformat()
-        log = os.path.expanduser(conf.get('logging', 'BASE_LOG_FOLDER'))
+        the_log = os.path.expanduser(conf.get('logging', 'BASE_LOG_FOLDER'))
         return ("{log}/{dag_id}/{task_id}/{iso}.log".format(
-            log=log, dag_id=self.dag_id, task_id=self.task_id, iso=iso))
+            log=the_log, dag_id=self.dag_id, task_id=self.task_id, iso=iso))

Review comment:
       Should we use an f-string?
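
For reference, an f-string version of the same path construction might look like the sketch below. It is written as a standalone function so it runs without Airflow; the parameter names are assumptions, and in the real property the values come from `conf.get('logging', 'BASE_LOG_FOLDER')` and from `self`.

```python
import os


def log_filepath(base_log_folder, dag_id, task_id, execution_date_iso):
    """Build the log file path with an f-string instead of str.format()."""
    # expanduser() resolves a leading "~" and leaves other paths unchanged.
    log = os.path.expanduser(base_log_folder)
    return f"{log}/{dag_id}/{task_id}/{execution_date_iso}.log"
```

Because the f-string refers to local variables directly, the keyword arguments passed to `.format()` in the diff would no longer be needed.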







[GitHub] [airflow] potiuk commented on a change in pull request #10499: Make models/taskinstance.py pylint compatible

Posted by GitBox <gi...@apache.org>.
potiuk commented on a change in pull request #10499:
URL: https://github.com/apache/airflow/pull/10499#discussion_r475924558



##########
File path: airflow/sensors/base_sensor_operator.py
##########
@@ -159,6 +160,16 @@ def _get_next_poke_interval(self, started_at, try_number):
         else:
             return self.poke_interval
 
+    def prepare_for_execution(self) -> BaseOperator:

Review comment:
       Incompatible return value :(
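
One plausible reading of this error, offered as an assumption rather than something stated in the thread: `super().prepare_for_execution()` is annotated as returning `BaseOperator`, so a type checker rejects returning that value from a method declared to return the narrower `"BaseSensorOperator"`. The sketch below uses simplified stand-in classes and shows one way to satisfy a type checker with `typing.cast`; the diff quoted above keeps `-> BaseOperator` instead.

```python
import copy
from typing import cast


class BaseOperator:
    """Simplified stand-in for Airflow's BaseOperator."""

    def prepare_for_execution(self) -> "BaseOperator":
        return copy.copy(self)


class BaseSensorOperator(BaseOperator):
    """Simplified stand-in for the sensor base class."""

    mode = "poke"

    def prepare_for_execution(self) -> "BaseSensorOperator":
        # The parent method is annotated as returning BaseOperator, so the
        # narrower declared return type needs an explicit cast. At runtime
        # cast() simply returns its second argument unchanged.
        task = cast("BaseSensorOperator", super().prepare_for_execution())
        return task
```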







[GitHub] [airflow] turbaszek commented on a change in pull request #10499: Make models/taskinstance.py pylint compatible

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #10499:
URL: https://github.com/apache/airflow/pull/10499#discussion_r475239161



##########
File path: airflow/models/taskinstance.py
##########
@@ -1135,7 +1135,11 @@ def _run_raw_task(
             session.merge(self)
         session.commit()
 
-    def _prepare_and_execute_task_with_callbacks(self, context, session, task):
+    def _prepare_and_execute_task_with_callbacks(  # pylint: disable=unused-argument
+            self,
+            context,
+            session,
+            task):

Review comment:
       Should we remove the argument then? What is the reason to keep it?







[GitHub] [airflow] turbaszek commented on a change in pull request #10499: Make models/taskinstance.py pylint compatible

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #10499:
URL: https://github.com/apache/airflow/pull/10499#discussion_r475919392



##########
File path: airflow/sensors/base_sensor_operator.py
##########
@@ -159,6 +160,16 @@ def _get_next_poke_interval(self, started_at, try_number):
         else:
             return self.poke_interval
 
+    def prepare_for_execution(self) -> BaseOperator:
+        task = super().prepare_for_execution()
+        # Sensors in `poke` mode can block execution of DAGs when running
+        # with single process executor, thus we change the mode to`reschedule`
+        # to allow parallel task being scheduled and executed
+        if conf.get('core', 'executor') == "DebugExecutor":

Review comment:
       ```suggestion
           if conf.get('core', 'executor') == "DebugExecutor":
   ```
   Should we use `DEBUG_EXECUTOR.DebugExecutor`?







[GitHub] [airflow] turbaszek commented on pull request #10499: Make models/taskinstance.py pylint compatible

Posted by GitBox <gi...@apache.org>.
turbaszek commented on pull request #10499:
URL: https://github.com/apache/airflow/pull/10499#issuecomment-679012884


   Is this another attempt to fix cyclic imports? https://github.com/apache/airflow/pull/9674





[GitHub] [airflow] turbaszek commented on a change in pull request #10499: Make models/taskinstance.py pylint compatible

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #10499:
URL: https://github.com/apache/airflow/pull/10499#discussion_r475919392



##########
File path: airflow/sensors/base_sensor_operator.py
##########
@@ -159,6 +160,16 @@ def _get_next_poke_interval(self, started_at, try_number):
         else:
             return self.poke_interval
 
+    def prepare_for_execution(self) -> BaseOperator:
+        task = super().prepare_for_execution()
+        # Sensors in `poke` mode can block execution of DAGs when running
+        # with single process executor, thus we change the mode to`reschedule`
+        # to allow parallel task being scheduled and executed
+        if conf.get('core', 'executor') == "DebugExecutor":

Review comment:
       ```suggestion
           if conf.get('core', 'executor') == "DebugExecutor":
   ```
   Should we use `ExecutorLoader.DEBUG_EXECUTOR`?







[GitHub] [airflow] potiuk commented on a change in pull request #10499: Make models/taskinstance.py pylint compatible

Posted by GitBox <gi...@apache.org>.
potiuk commented on a change in pull request #10499:
URL: https://github.com/apache/airflow/pull/10499#discussion_r475457903



##########
File path: airflow/models/taskinstance.py
##########
@@ -1762,19 +1767,18 @@ def filter_for_tis(
         tis: Iterable[Union["TaskInstance", TaskInstanceKey]]
     ) -> Optional[BooleanClauseList]:
         """Returns SQLAlchemy filter to query selected task instances"""
-        TI = TaskInstance
         if not tis:
             return None
         if all(isinstance(t, TaskInstanceKey) for t in tis):
-            filter_for_tis = ([and_(TI.dag_id == tik.dag_id,
-                                    TI.task_id == tik.task_id,
-                                    TI.execution_date == tik.execution_date)
+            filter_for_tis = ([and_(TaskInstance.dag_id == tik.dag_id,
+                                    TaskInstance.task_id == tik.task_id,
+                                    TaskInstance.execution_date == tik.execution_date)
                                for tik in tis])
             return or_(*filter_for_tis)
         if all(isinstance(t, TaskInstance) for t in tis):
-            filter_for_tis = ([and_(TI.dag_id == ti.dag_id,
-                                    TI.task_id == ti.task_id,
-                                    TI.execution_date == ti.execution_date)
+            filter_for_tis = ([and_(TaskInstance.dag_id == ti.dag_id,
+                                    TaskInstance.task_id == ti.task_id,
+                                    TaskInstance.execution_date == ti.execution_date)

Review comment:
       Yes. 







[GitHub] [airflow] potiuk merged pull request #10499: Make models/taskinstance.py pylint compatible

Posted by GitBox <gi...@apache.org>.
potiuk merged pull request #10499:
URL: https://github.com/apache/airflow/pull/10499


   





[GitHub] [airflow] potiuk commented on a change in pull request #10499: Make models/taskinstance.py pylint compatible

Posted by GitBox <gi...@apache.org>.
potiuk commented on a change in pull request #10499:
URL: https://github.com/apache/airflow/pull/10499#discussion_r475924768



##########
File path: airflow/sensors/base_sensor_operator.py
##########
@@ -159,6 +160,16 @@ def _get_next_poke_interval(self, started_at, try_number):
         else:
             return self.poke_interval
 
+    def prepare_for_execution(self) -> BaseOperator:
+        task = super().prepare_for_execution()
+        # Sensors in `poke` mode can block execution of DAGs when running
+        # with single process executor, thus we change the mode to`reschedule`
+        # to allow parallel task being scheduled and executed
+        if conf.get('core', 'executor') == "DebugExecutor":

Review comment:
       I have a problem importing it as a constant (MyPy complains). I prefer to leave it as is :(







[GitHub] [airflow] potiuk commented on a change in pull request #10499: Make models/taskinstance.py pylint compatible

Posted by GitBox <gi...@apache.org>.
potiuk commented on a change in pull request #10499:
URL: https://github.com/apache/airflow/pull/10499#discussion_r475457567



##########
File path: airflow/models/taskinstance.py
##########
@@ -1762,19 +1767,18 @@ def filter_for_tis(
         tis: Iterable[Union["TaskInstance", TaskInstanceKey]]
     ) -> Optional[BooleanClauseList]:
         """Returns SQLAlchemy filter to query selected task instances"""
-        TI = TaskInstance
         if not tis:
             return None
         if all(isinstance(t, TaskInstanceKey) for t in tis):
-            filter_for_tis = ([and_(TI.dag_id == tik.dag_id,
-                                    TI.task_id == tik.task_id,
-                                    TI.execution_date == tik.execution_date)
+            filter_for_tis = ([and_(TaskInstance.dag_id == tik.dag_id,
+                                    TaskInstance.task_id == tik.task_id,
+                                    TaskInstance.execution_date == tik.execution_date)
                                for tik in tis])
             return or_(*filter_for_tis)
         if all(isinstance(t, TaskInstance) for t in tis):
-            filter_for_tis = ([and_(TI.dag_id == ti.dag_id,
-                                    TI.task_id == ti.task_id,
-                                    TI.execution_date == ti.execution_date)
+            filter_for_tis = ([and_(TaskInstance.dag_id == ti.dag_id,
+                                    TaskInstance.task_id == ti.task_id,
+                                    TaskInstance.execution_date == ti.execution_date)

Review comment:
       > Is this another attempt to fix cyclic imports? #9674
   
   It was not supposed to be, but it turned out to be exactly this :). 







[GitHub] [airflow] potiuk commented on a change in pull request #10499: Make models/taskinstance.py pylint compatible

Posted by GitBox <gi...@apache.org>.
potiuk commented on a change in pull request #10499:
URL: https://github.com/apache/airflow/pull/10499#discussion_r475458366



##########
File path: airflow/models/taskinstance.py
##########
@@ -442,16 +442,16 @@ def generate_command(dag_id: str,     # pylint: disable=too-many-arguments
     def log_filepath(self):
         """Filepath for TaskInstance"""
         iso = self.execution_date.isoformat()
-        log = os.path.expanduser(conf.get('logging', 'BASE_LOG_FOLDER'))
+        the_log = os.path.expanduser(conf.get('logging', 'BASE_LOG_FOLDER'))
         return ("{log}/{dag_id}/{task_id}/{iso}.log".format(
-            log=log, dag_id=self.dag_id, task_id=self.task_id, iso=iso))
+            log=the_log, dag_id=self.dag_id, task_id=self.task_id, iso=iso))

Review comment:
       Why not :)



