Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/01/22 10:43:50 UTC

[GitHub] [airflow] ayushchauhan0811 opened a new pull request #13832: removing try-catch block to fix timeout exception getting ignored in aws batch operator

ayushchauhan0811 opened a new pull request #13832:
URL: https://github.com/apache/airflow/pull/13832


   Solving this - https://github.com/apache/airflow/issues/13733


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] potiuk commented on pull request #13832: removing try-catch block to fix timeout exception getting ignored in aws batch operator

Posted by GitBox <gi...@apache.org>.
potiuk commented on pull request #13832:
URL: https://github.com/apache/airflow/pull/13832#issuecomment-765603917


   > > How would that solve it? From what I understand it would simply result in a different exception thrown - but the flow will remain the same?
   > 
   > @potiuk If you check the [taskinstance.py](https://github.com/apache/airflow/blob/master/airflow/models/taskinstance.py#L1309) code, you can see that by removing the try-catch block, the exception raised by the timeout class will be handled by the taskinstance instead of the batch_operator class.
   > 
   > ```
   > try:
   >     with timeout(task_copy.execution_timeout.total_seconds()):
   >         result = task_copy.execute(context=context)
   > except AirflowTaskTimeout:
   >     task_copy.on_kill()
   >     raise
   > ```
   
   Yeah, I know that. But I was wondering where the "AirflowTaskTimeout" is thrown. The AWS provider is, well... a bit convoluted.
   
   So I guess (purely because this is the only place where the exception is thrown in the AWS provider) it is the "wait_for_task_execution" in AWSDataSyncHook that throws it.
   
   If so - can you please:
   
   a) add some comments describing what's going on - mention that the method might throw the timeout exception and that it should be passed back to the execute method (this way the next person coming here will not have to dig deeper and will not add the try/catch back).
   
   b) can you please add a unit test covering this case to prevent regressions?
   





[GitHub] [airflow] potiuk commented on a change in pull request #13832: removing try-catch block to fix timeout exception getting ignored in aws batch operator

Posted by GitBox <gi...@apache.org>.
potiuk commented on a change in pull request #13832:
URL: https://github.com/apache/airflow/pull/13832#discussion_r571159300



##########
File path: airflow/providers/amazon/aws/operators/batch.py
##########
@@ -177,29 +177,26 @@ def submit_job(self, context: Dict):  # pylint: disable=unused-argument
             self.job_id = response["jobId"]
 
             self.log.info("AWS Batch job (%s) started: %s", self.job_id, response)
-
         except Exception as e:
             self.log.error("AWS Batch job (%s) failed submission", self.job_id)
             raise AirflowException(e)
 
     def monitor_job(self, context: Dict):  # pylint: disable=unused-argument
         """
         Monitor an AWS Batch job
+        monitor_job can raise an exception or an AirflowTaskTimeout can be raised if execution_timeout
+        is given while creating the task. These exceptions should be handled in taskinstance.py
+        instead of here like it was previously done

Review comment:
       > this is true of every operator -- execution_timeout is something that's part of base operator right?
   
   Correct. This is like that, but THE ONLY built-in operator using it is this AWS one (just look for `AirflowTaskTimeout(` in the Airflow codebase and you will be as surprised as I was :)).
   
   > maybe i miss something. but i don't see anywhere in this operator (or its inherited methods) that task timeout is raised explicitly.
   
   We are following the same path here :). And it proves IMHO that some comment is needed here.
   
   I had the same question initially because I did not see it either. The AWS batch operators are built in a very strange way and while "batch.py" does not look like it can throw the timeout, it actually does here:
   
   https://github.com/apache/airflow/blob/fc67521f31a0c9a74dadda8d5f0ac32c07be218d/airflow/providers/amazon/aws/hooks/datasync.py#L316
   
   Which is several stack frames below (and you can only find it out if you try hard).
   
   And somebody else did not actually know it and added the original try/except Exception (and threw AirflowException()), hiding the fact that there is a timeout exception raised deep in the datasync hook.
   
   So I asked @ayushchauhan0811 to explain it here - otherwise we risk that someone will again unknowingly swallow the timeout exception.
   







[GitHub] [airflow] dstandish commented on a change in pull request #13832: removing try-catch block to fix timeout exception getting ignored in aws batch operator

Posted by GitBox <gi...@apache.org>.
dstandish commented on a change in pull request #13832:
URL: https://github.com/apache/airflow/pull/13832#discussion_r571151494



##########
File path: airflow/providers/amazon/aws/operators/batch.py
##########
@@ -177,29 +177,26 @@ def submit_job(self, context: Dict):  # pylint: disable=unused-argument
             self.job_id = response["jobId"]
 
             self.log.info("AWS Batch job (%s) started: %s", self.job_id, response)
-
         except Exception as e:
             self.log.error("AWS Batch job (%s) failed submission", self.job_id)
             raise AirflowException(e)
 
     def monitor_job(self, context: Dict):  # pylint: disable=unused-argument
         """
         Monitor an AWS Batch job
+        monitor_job can raise an exception or an AirflowTaskTimeout can be raised if execution_timeout
+        is given while creating the task. These exceptions should be handled in taskinstance.py
+        instead of here like it was previously done

Review comment:
       i am a bit confused @potiuk
   
   > an AirflowTaskTimeout can be raised if execution_timeout is given while creating the task
   
   this is true of every operator -- `execution_timeout` is something that's part of the base operator, right?
   
   maybe i'm missing something.  but i don't see anywhere in this operator (or its inherited methods) that a task timeout is raised explicitly.
   







[GitHub] [airflow] ayushchauhan0811 commented on pull request #13832: removing try-catch block to fix timeout exception getting ignored in aws batch operator

Posted by GitBox <gi...@apache.org>.
ayushchauhan0811 commented on pull request #13832:
URL: https://github.com/apache/airflow/pull/13832#issuecomment-882001592


   @potiuk @dstandish Gentle reminder :sweat_smile:





[GitHub] [airflow] potiuk commented on a change in pull request #13832: removing try-catch block to fix timeout exception getting ignored in aws batch operator

Posted by GitBox <gi...@apache.org>.
potiuk commented on a change in pull request #13832:
URL: https://github.com/apache/airflow/pull/13832#discussion_r571089084



##########
File path: airflow/providers/amazon/aws/operators/batch.py
##########
@@ -177,29 +177,26 @@ def submit_job(self, context: Dict):  # pylint: disable=unused-argument
             self.job_id = response["jobId"]
 
             self.log.info("AWS Batch job (%s) started: %s", self.job_id, response)
-
         except Exception as e:
             self.log.error("AWS Batch job (%s) failed submission", self.job_id)
             raise AirflowException(e)
 
     def monitor_job(self, context: Dict):  # pylint: disable=unused-argument
         """
         Monitor an AWS Batch job
+        monitor_job can raise an exception or an AirflowTaskTimeout can be raised if execution_timeout
+        is given while creating the task. These exceptions should be handled in taskinstance.py
+        instead of here like it was previously done

Review comment:
       It's about the only place in the entire codebase where the AirflowTaskTimeout exception is raised, @dstandish. I think it's not at all 'common' knowledge what's going on here. But I do agree that referring to the "previous" state is not needed. So if that can be removed, maybe we have a nice compromise ;).







[GitHub] [airflow] dstandish commented on a change in pull request #13832: removing try-catch block to fix timeout exception getting ignored in aws batch operator

Posted by GitBox <gi...@apache.org>.
dstandish commented on a change in pull request #13832:
URL: https://github.com/apache/airflow/pull/13832#discussion_r571387904



##########
File path: airflow/providers/amazon/aws/operators/batch.py
##########
@@ -177,29 +177,26 @@ def submit_job(self, context: Dict):  # pylint: disable=unused-argument
             self.job_id = response["jobId"]
 
             self.log.info("AWS Batch job (%s) started: %s", self.job_id, response)
-
         except Exception as e:
             self.log.error("AWS Batch job (%s) failed submission", self.job_id)
             raise AirflowException(e)
 
     def monitor_job(self, context: Dict):  # pylint: disable=unused-argument
         """
         Monitor an AWS Batch job
+        monitor_job can raise an exception or an AirflowTaskTimeout can be raised if execution_timeout
+        is given while creating the task. These exceptions should be handled in taskinstance.py
+        instead of here like it was previously done

Review comment:
       ah yes makes sense.  sorry to consume so much of your time.  and thanks for explaining.
   
   when i did "find in files" i saw this datasync reference but assumed datasync would not be involved in the batch hook πŸ€”.
   
   in any case i am glad to see this change.  we should never catch Exception and reraise with AirflowException (i.e. catch and reraise for no reason) and this is a good example why... i see it somewhat frequently... it only discards information and adds indent.
   
   on the related question of the task timeout exception... curious... i wonder if this exception should be reserved only for an `execution_timeout` timeout, and thou shalt not use it otherwise? seems like it to me... i don't think there would be any different outcome for the task if it were `raise Exception('i give up')` -- i.e. there's no special handling behavior like there is with skip and reschedule, is there?
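   
   For illustration, here is a minimal sketch of the catch-and-reraise problem discussed above (toy functions using only `airflow.exceptions`, not the operator's actual code):
   
    ```
    from airflow.exceptions import AirflowException, AirflowTaskTimeout
    
    def monitor_job_swallowing():
        # Anti-pattern: AirflowTaskTimeout subclasses Exception (at the time of
        # this PR), so a timeout raised deep in a hook is caught here ...
        try:
            raise AirflowTaskTimeout("Max iterations exceeded!")
        except Exception as e:
            # ... and re-raised with its type discarded; taskinstance.py can no
            # longer match "except AirflowTaskTimeout" and will not call on_kill().
            raise AirflowException(e)
    
    def monitor_job_propagating():
        # Without the try/except, the AirflowTaskTimeout reaches
        # taskinstance.py intact and gets its special handling.
        raise AirflowTaskTimeout("Max iterations exceeded!")
    ```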
   







[GitHub] [airflow] boring-cyborg[bot] commented on pull request #13832: removing try-catch block to fix timeout exception getting ignored in aws batch operator

Posted by GitBox <gi...@apache.org>.
boring-cyborg[bot] commented on pull request #13832:
URL: https://github.com/apache/airflow/pull/13832#issuecomment-765315675


   Congratulations on your first Pull Request and welcome to the Apache Airflow community! If you have any issues or are unsure about anything, please check our Contribution Guide (https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst)
   Here are some useful points:
   - Pay attention to the quality of your code (flake8, pylint and type annotations). Our [pre-commits](https://github.com/apache/airflow/blob/master/STATIC_CODE_CHECKS.rst#prerequisites-for-pre-commit-hooks) will help you with that.
   - In case of a new feature add useful documentation (in docstrings or in the `docs/` directory). Adding a new operator? Check this short [guide](https://github.com/apache/airflow/blob/master/docs/apache-airflow/howto/custom-operator.rst). Consider adding an example DAG that shows how users should use it.
   - Consider using [Breeze environment](https://github.com/apache/airflow/blob/master/BREEZE.rst) for testing locally; it’s a heavy Docker setup, but it ships with a working Airflow and a lot of integrations.
   - Be patient and persistent. It might take some time to get a review or get the final approval from Committers.
   - Please follow [ASF Code of Conduct](https://www.apache.org/foundation/policies/conduct) for all communication including (but not limited to) comments on Pull Requests, Mailing list and Slack.
   - Be sure to read the [Airflow Coding style](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#coding-style-and-best-practices).
   Apache Airflow is a community-driven project and together we are making it better πŸš€.
   In case of doubts contact the developers at:
   Mailing List: dev@airflow.apache.org
   Slack: https://s.apache.org/airflow-slack
   





[GitHub] [airflow] ayushchauhan0811 commented on pull request #13832: removing try-catch block to fix timeout exception getting ignored in aws batch operator

Posted by GitBox <gi...@apache.org>.
ayushchauhan0811 commented on pull request #13832:
URL: https://github.com/apache/airflow/pull/13832#issuecomment-766296537


   > a) add some comments describing what's going on - mention that the method might throw the timeout exception and that it should be passed back to execute method (this way the next person coming here will not have to dig deeper and will not add the try/catch back.
   
   I have added comments to explain why the try-catch block has been removed. 
   
   > b) can you please add a unit test case with this case to prevent regressions?
   
   @potiuk I tried looking at other operators to see how they have implemented a timeout unit test but didn't find anything. Can you guide me on how to proceed with writing a test for this?





[GitHub] [airflow] potiuk commented on pull request #13832: removing try-catch block to fix timeout exception getting ignored in aws batch operator

Posted by GitBox <gi...@apache.org>.
potiuk commented on pull request #13832:
URL: https://github.com/apache/airflow/pull/13832#issuecomment-882056394


   @ayushchauhan0811 - this PR has been closed by the stale bot (it had unfortunately been long forgotten) and I cannot reopen it after such a long time. Could you please create a new branch and push it again as a new PR? (cc me when it's done)





[GitHub] [airflow] dstandish commented on a change in pull request #13832: removing try-catch block to fix timeout exception getting ignored in aws batch operator

Posted by GitBox <gi...@apache.org>.
dstandish commented on a change in pull request #13832:
URL: https://github.com/apache/airflow/pull/13832#discussion_r570705738



##########
File path: airflow/providers/amazon/aws/operators/batch.py
##########
@@ -177,29 +177,26 @@ def submit_job(self, context: Dict):  # pylint: disable=unused-argument
             self.job_id = response["jobId"]
 
             self.log.info("AWS Batch job (%s) started: %s", self.job_id, response)
-
         except Exception as e:
             self.log.error("AWS Batch job (%s) failed submission", self.job_id)
             raise AirflowException(e)
 
     def monitor_job(self, context: Dict):  # pylint: disable=unused-argument
         """
         Monitor an AWS Batch job
+        monitor_job can raise an exception or an AirflowTaskTimeout can be raised if execution_timeout
+        is given while creating the task. These exceptions should be handled in taskinstance.py
+        instead of here like it was previously done

Review comment:
       what was done previously is generally something that is only of interest in the PR -- not in the code.   in this case i'd just document what it does _now_
   
   but in this case, i would not add any documentation
   
   in general it is assumed that code can throw exceptions.  that a task timeout can be raised is true of all operators.  and you don't need to add a warning when you are not catching an exception







[GitHub] [airflow] ayushchauhan0811 commented on a change in pull request #13832: removing try-catch block to fix timeout exception getting ignored in aws batch operator

Posted by GitBox <gi...@apache.org>.
ayushchauhan0811 commented on a change in pull request #13832:
URL: https://github.com/apache/airflow/pull/13832#discussion_r574487394



##########
File path: airflow/providers/amazon/aws/operators/batch.py
##########
@@ -177,29 +177,26 @@ def submit_job(self, context: Dict):  # pylint: disable=unused-argument
             self.job_id = response["jobId"]
 
             self.log.info("AWS Batch job (%s) started: %s", self.job_id, response)
-
         except Exception as e:
             self.log.error("AWS Batch job (%s) failed submission", self.job_id)
             raise AirflowException(e)
 
     def monitor_job(self, context: Dict):  # pylint: disable=unused-argument
         """
         Monitor an AWS Batch job
+        monitor_job can raise an exception or an AirflowTaskTimeout can be raised if execution_timeout
+        is given while creating the task. These exceptions should be handled in taskinstance.py
+        instead of here like it was previously done

Review comment:
       @potiuk @dstandish so what should I do? Keep the comment or remove it? 
   







[GitHub] [airflow] ayushchauhan0811 commented on a change in pull request #13832: removing try-catch block to fix timeout exception getting ignored in aws batch operator

Posted by GitBox <gi...@apache.org>.
ayushchauhan0811 commented on a change in pull request #13832:
URL: https://github.com/apache/airflow/pull/13832#discussion_r571075475



##########
File path: airflow/providers/amazon/aws/operators/batch.py
##########
@@ -177,29 +177,26 @@ def submit_job(self, context: Dict):  # pylint: disable=unused-argument
             self.job_id = response["jobId"]
 
             self.log.info("AWS Batch job (%s) started: %s", self.job_id, response)
-
         except Exception as e:
             self.log.error("AWS Batch job (%s) failed submission", self.job_id)
             raise AirflowException(e)
 
     def monitor_job(self, context: Dict):  # pylint: disable=unused-argument
         """
         Monitor an AWS Batch job
+        monitor_job can raise an exception or an AirflowTaskTimeout can be raised if execution_timeout
+        is given while creating the task. These exceptions should be handled in taskinstance.py
+        instead of here like it was previously done

Review comment:
       @potiuk wanted me to add a comment describing what is going on. @dstandish I can remove the comment if you and @potiuk both agree.







[GitHub] [airflow] ayushchauhan0811 commented on pull request #13832: removing try-catch block to fix timeout exception getting ignored in aws batch operator

Posted by GitBox <gi...@apache.org>.
ayushchauhan0811 commented on pull request #13832:
URL: https://github.com/apache/airflow/pull/13832#issuecomment-766304976


   I have fixed the failing test and added the try-catch block back in submit_job(), since the Airflow task can only time out during monitor_job() execution.
   
   Reason: the response of [batch.submit_job](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/batch.html#Batch.Client.submit_job) doesn't include a separate failure section listing the failures associated with the call. So we can either change the exception here [test_batch.py#L135](https://github.com/apache/airflow/blob/master/tests/providers/amazon/aws/operators/test_batch.py#L135) to a generic `Exception` or add a try-catch block in submit_job().


[GitHub] [airflow] potiuk commented on a change in pull request #13832: removing try-catch block to fix timeout exception getting ignored in aws batch operator

Posted by GitBox <gi...@apache.org>.
potiuk commented on a change in pull request #13832:
URL: https://github.com/apache/airflow/pull/13832#discussion_r571397568



##########
File path: airflow/providers/amazon/aws/operators/batch.py
##########
@@ -177,29 +177,26 @@ def submit_job(self, context: Dict):  # pylint: disable=unused-argument
             self.job_id = response["jobId"]
 
             self.log.info("AWS Batch job (%s) started: %s", self.job_id, response)
-
         except Exception as e:
             self.log.error("AWS Batch job (%s) failed submission", self.job_id)
             raise AirflowException(e)
 
     def monitor_job(self, context: Dict):  # pylint: disable=unused-argument
         """
         Monitor an AWS Batch job
+        monitor_job can raise an exception or an AirflowTaskTimeout can be raised if execution_timeout
+        is given while creating the task. These exceptions should be handled in taskinstance.py
+        instead of here like it was previously done

Review comment:
       Yeah. I also do not like how it is done in AWS. The "with timeout" context manager seems like a universal solution that should also work for the datasync operator, so I am really not sure if it is needed there at all :).







[GitHub] [airflow] potiuk commented on pull request #13832: removing try-catch block to fix timeout exception getting ignored in aws batch operator

Posted by GitBox <gi...@apache.org>.
potiuk commented on pull request #13832:
URL: https://github.com/apache/airflow/pull/13832#issuecomment-766300443


   Sure.
   
   I think in this case it would be enough to mock the underlying AWS calls to return the errors you expect when you are doing the "polling". I believe those timeouts are thrown when you are polling for jobs x times over some period and, when you do not see the result, a timeout exception is thrown. From what I understand, the timeout is thrown here (datasync.py):
   
   ```
    def wait_for_task_execution(self, task_execution_arn: str, max_iterations: int = 2 * 180) -> bool:
           """
           Wait for Task Execution status to be complete (SUCCESS/ERROR).
           The ``task_execution_arn`` must exist, or a boto3 ClientError will be raised.
   
           :param str task_execution_arn: TaskExecutionArn
           :param int max_iterations: Maximum number of iterations before timing out.
           :return: Result of task execution.
           :rtype: bool
           :raises AirflowTaskTimeout: If maximum iterations is exceeded.
           :raises AirflowBadRequest: If ``task_execution_arn`` is empty.
           """
           if not task_execution_arn:
               raise AirflowBadRequest("task_execution_arn not specified")
   
           status = None
           iterations = max_iterations
           while status is None or status in self.TASK_EXECUTION_INTERMEDIATE_STATES:
               task_execution = self.get_conn().describe_task_execution(TaskExecutionArn=task_execution_arn)
               status = task_execution["Status"]
               self.log.info("status=%s", status)
               iterations -= 1
               if status in self.TASK_EXECUTION_FAILURE_STATES:
                   break
               if status in self.TASK_EXECUTION_SUCCESS_STATES:
                   break
               if iterations <= 0:
                   break
               time.sleep(self.wait_interval_seconds)
   
           if status in self.TASK_EXECUTION_SUCCESS_STATES:
               return True
           if status in self.TASK_EXECUTION_FAILURE_STATES:
               return False
           if iterations <= 0:
               raise AirflowTaskTimeout("Max iterations exceeded!")
           raise AirflowException(f"Unknown status: {status}")  # Should never happen
   ```
   
   So the `describe_task_execution` method needs to be mocked to return a status in the (None, TASK_EXECUTION_INTERMEDIATE_STATES) set with some lower value of max_iterations, and then it should throw `AirflowTaskTimeout`.
   
   
   However, when I looked there while writing this, I found it is in fact already done exactly as I described above, so you do not need to do anything (test_datasync.py):
   
   ```
       def test_wait_for_task_execution_timeout(self, mock_get_conn):
           # ### Configure mock:
           mock_get_conn.return_value = self.client
           # ### Begin tests:
   
           task_execution_arn = self.hook.start_task_execution(self.task_arn)
           with pytest.raises(AirflowTaskTimeout):
               result = self.hook.wait_for_task_execution(task_execution_arn, max_iterations=1)
               assert result is None
   ```
   
   
   





[GitHub] [airflow] github-actions[bot] closed pull request #13832: removing try-catch block to fix timeout exception getting ignored in aws batch operator

Posted by GitBox <gi...@apache.org>.
github-actions[bot] closed pull request #13832:
URL: https://github.com/apache/airflow/pull/13832


   


[GitHub] [airflow] ayushchauhan0811 commented on a change in pull request #13832: removing try-catch block to fix timeout exception getting ignored in aws batch operator

Posted by GitBox <gi...@apache.org>.
ayushchauhan0811 commented on a change in pull request #13832:
URL: https://github.com/apache/airflow/pull/13832#discussion_r602401001



##########
File path: airflow/providers/amazon/aws/operators/batch.py
##########
@@ -177,29 +177,26 @@ def submit_job(self, context: Dict):  # pylint: disable=unused-argument
             self.job_id = response["jobId"]
 
             self.log.info("AWS Batch job (%s) started: %s", self.job_id, response)
-
         except Exception as e:
             self.log.error("AWS Batch job (%s) failed submission", self.job_id)
             raise AirflowException(e)
 
     def monitor_job(self, context: Dict):  # pylint: disable=unused-argument
         """
         Monitor an AWS Batch job
+        monitor_job can raise an exception or an AirflowTaskTimeout can be raised if execution_timeout
+        is given while creating the task. These exceptions should be handled in taskinstance.py
+        instead of here like it was previously done

Review comment:
       @potiuk if you think this looks good, can we merge it? 







[GitHub] [airflow] github-actions[bot] commented on pull request #13832: removing try-catch block to fix timeout exception getting ignored in aws batch operator

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #13832:
URL: https://github.com/apache/airflow/pull/13832#issuecomment-837524537


   This pull request has been automatically marked as stale because it has not had recent activity. It will be closed in 5 days if no further activity occurs. Thank you for your contributions.





[GitHub] [airflow] potiuk commented on pull request #13832: removing try-catch block to fix timeout exception getting ignored in aws batch operator

Posted by GitBox <gi...@apache.org>.
potiuk commented on pull request #13832:
URL: https://github.com/apache/airflow/pull/13832#issuecomment-766300619


   However, you have another test that needs to be fixed: https://github.com/apache/airflow/pull/13832/checks?check_run_id=1756261837#step:6:3024





[GitHub] [airflow] ayushchauhan0811 commented on pull request #13832: removing try-catch block to fix timeout exception getting ignored in aws batch operator

Posted by GitBox <gi...@apache.org>.
ayushchauhan0811 commented on pull request #13832:
URL: https://github.com/apache/airflow/pull/13832#issuecomment-765435979


   > How would that solve it? From what I understand it would simply result in a different exception thrown - but the flow will remain the same?
   
   @potiuk If you check the [taskinstance.py](https://github.com/apache/airflow/blob/master/airflow/models/taskinstance.py#L1309) code, you can see that by removing the try-catch block, the exception raised by the timeout class will be handled by the taskinstance instead of the batch_operator class.
   
   ```
   try:
       with timeout(task_copy.execution_timeout.total_seconds()):
           result = task_copy.execute(context=context)
   except AirflowTaskTimeout:
       task_copy.on_kill()
       raise
   ```





[GitHub] [airflow] ayushchauhan0811 commented on pull request #13832: removing try-catch block to fix timeout exception getting ignored in aws batch operator

Posted by GitBox <gi...@apache.org>.
ayushchauhan0811 commented on pull request #13832:
URL: https://github.com/apache/airflow/pull/13832#issuecomment-766307310


   > Or maybe we should have "test_execute_with_failure" and "test_execute_with_timeout" as two separate methods?
   
   In the current implementation of test_execute_with_failures, we set the return value of the mock client's submit_job to an empty dict, which throws a TypeError in [batch.py#L177](https://github.com/apache/airflow/blob/master/airflow/providers/amazon/aws/operators/batch.py#L177). If we don't use a try-catch block in submit_job() and reraise all exceptions as `AirflowException`, our unit test will not cover all the cases.
   
   





[GitHub] [airflow] dstandish commented on a change in pull request #13832: removing try-catch block to fix timeout exception getting ignored in aws batch operator

Posted by GitBox <gi...@apache.org>.
dstandish commented on a change in pull request #13832:
URL: https://github.com/apache/airflow/pull/13832#discussion_r574490242



##########
File path: airflow/providers/amazon/aws/operators/batch.py
##########
@@ -177,29 +177,26 @@ def submit_job(self, context: Dict):  # pylint: disable=unused-argument
             self.job_id = response["jobId"]
 
             self.log.info("AWS Batch job (%s) started: %s", self.job_id, response)
-
         except Exception as e:
             self.log.error("AWS Batch job (%s) failed submission", self.job_id)
             raise AirflowException(e)
 
     def monitor_job(self, context: Dict):  # pylint: disable=unused-argument
         """
         Monitor an AWS Batch job
+        monitor_job can raise an exception or an AirflowTaskTimeout can be raised if execution_timeout
+        is given while creating the task. These exceptions should be handled in taskinstance.py
+        instead of here like it was previously done

Review comment:
       keep it! jarek is the boss :) 







[GitHub] [airflow] ayush-san commented on a change in pull request #13832: removing try-catch block to fix timeout exception getting ignored in aws batch operator

Posted by GitBox <gi...@apache.org>.
ayush-san commented on a change in pull request #13832:
URL: https://github.com/apache/airflow/pull/13832#discussion_r571091289



##########
File path: airflow/providers/amazon/aws/operators/batch.py
##########
@@ -177,29 +177,26 @@ def submit_job(self, context: Dict):  # pylint: disable=unused-argument
             self.job_id = response["jobId"]
 
             self.log.info("AWS Batch job (%s) started: %s", self.job_id, response)
-
         except Exception as e:
             self.log.error("AWS Batch job (%s) failed submission", self.job_id)
             raise AirflowException(e)
 
     def monitor_job(self, context: Dict):  # pylint: disable=unused-argument
         """
         Monitor an AWS Batch job
+        monitor_job can raise an exception or an AirflowTaskTimeout can be raised if execution_timeout
+        is given while creating the task. These exceptions should be handled in taskinstance.py
+        instead of here like it was previously done

Review comment:
       AirflowTaskTimeout is thrown in [taskinstance.py](https://github.com/apache/airflow/blob/master/airflow/models/taskinstance.py#L1309) for all operators:
   ```
   try:
       with timeout(task_copy.execution_timeout.total_seconds()):
           result = task_copy.execute(context=context)
   except AirflowTaskTimeout:
       task_copy.on_kill()
       raise
   ```
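   
   For context, Airflow's `timeout` helper used there is, roughly speaking, a SIGALRM-based context manager. A minimal sketch of the mechanism (illustrative, assuming Unix signals; not Airflow's exact implementation):
   
    ```
    import signal
    
    from airflow.exceptions import AirflowTaskTimeout
    
    class timeout:
        """Minimal sketch of a SIGALRM-based timeout context manager."""
    
        def __init__(self, seconds: float):
            self.seconds = seconds
    
        def handle_timeout(self, signum, frame):
            # The alarm fires inside whatever code is currently running, so the
            # exception surfaces from task_copy.execute(...) above.
            raise AirflowTaskTimeout("Task timed out")
    
        def __enter__(self):
            signal.signal(signal.SIGALRM, self.handle_timeout)
            signal.setitimer(signal.ITIMER_REAL, self.seconds)
            return self
    
        def __exit__(self, exc_type, exc_value, traceback):
            signal.setitimer(signal.ITIMER_REAL, 0)  # cancel the pending alarm
    ```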




[GitHub] [airflow] potiuk commented on pull request #13832: removing try-catch block to fix timeout exception getting ignored in aws batch operator

Posted by GitBox <gi...@apache.org>.
potiuk commented on pull request #13832:
URL: https://github.com/apache/airflow/pull/13832#issuecomment-766306701


   > I have fixed the failing test and added the try-catch block back in submit_job(), since the Airflow task can only time out during monitor_job() execution.
   > 
   > Reason: the response of [batch.submit_job](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/batch.html#Batch.Client.submit_job) doesn't include a separate failure section listing the failures associated with the call. So we can either change the exception here [test_batch.py#L135](https://github.com/apache/airflow/blob/master/tests/providers/amazon/aws/operators/test_batch.py#L135) to a generic `Exception` or add a try-catch block in submit_job().
   
   Or maybe we should have "test_execute_with_failure" and "test_execute_with_timeout" as two separate methods?
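   
   For illustration, a rough sketch of the suggested split, using a toy stand-in operator and a mocked client (hypothetical names; the real tests live in tests/providers/amazon/aws/operators/test_batch.py and exercise the actual operator):
   
    ```
    import pytest
    from unittest import mock
    
    from airflow.exceptions import AirflowException, AirflowTaskTimeout
    
    class ToyBatchOperator:
        """Toy stand-in for the real operator, just to show the test shape."""
    
        def __init__(self, client):
            self.client = client
    
        def execute(self, context):
            try:
                self.client.submit_job()   # submission keeps its try/except
            except Exception as e:
                raise AirflowException(e)
            self.client.monitor_job()      # monitoring propagates freely
    
    def test_execute_with_failure():
        client = mock.Mock()
        client.submit_job.side_effect = RuntimeError("submission failed")
        with pytest.raises(AirflowException):
            ToyBatchOperator(client).execute(context={})
    
    def test_execute_with_timeout():
        client = mock.Mock()
        client.monitor_job.side_effect = AirflowTaskTimeout("Max iterations exceeded!")
        with pytest.raises(AirflowTaskTimeout):
            ToyBatchOperator(client).execute(context={})
    ```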





[GitHub] [airflow] ayushchauhan0811 commented on a change in pull request #13832: removing try-catch block to fix timeout exception getting ignored in aws batch operator

Posted by GitBox <gi...@apache.org>.
ayushchauhan0811 commented on a change in pull request #13832:
URL: https://github.com/apache/airflow/pull/13832#discussion_r571154673



##########
File path: airflow/providers/amazon/aws/operators/batch.py
##########
@@ -177,29 +177,26 @@ def submit_job(self, context: Dict):  # pylint: disable=unused-argument
             self.job_id = response["jobId"]
 
             self.log.info("AWS Batch job (%s) started: %s", self.job_id, response)
-
         except Exception as e:
             self.log.error("AWS Batch job (%s) failed submission", self.job_id)
             raise AirflowException(e)
 
     def monitor_job(self, context: Dict):  # pylint: disable=unused-argument
         """
         Monitor an AWS Batch job
+        monitor_job can raise an exception or an AirflowTaskTimeout can be raised if execution_timeout
+        is given while creating the task. These exceptions should be handled in taskinstance.py
+        instead of here like it was previously done

Review comment:
       @dstandish It's part of the base operator, but some hooks/operators have also used this exception. The issue I am trying to solve in this PR is the timeout exception raised when the task runs for longer than the defined interval, which is handled by taskinstance.



