Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2020/08/18 15:11:07 UTC

[GitHub] [airflow] turbaszek opened a new issue #10381: Add on_kill method to DataprocSubmitJobOperator

turbaszek opened a new issue #10381:
URL: https://github.com/apache/airflow/issues/10381


   **Description**
   
   This operator should cancel the running job in `on_kill`. This option should probably be configurable because of the `request_id` parameter in a job definition: https://googleapis.dev/python/dataproc/latest/gapic/v1/api.html#google.cloud.dataproc_v1.JobControllerClient.submit_job
   
   **Use case / motivation**
   
   Remove dangling jobs.
   
   **Related Issues**
   
   https://github.com/apache/airflow/pull/6371
   https://github.com/apache/airflow/pull/6371#issuecomment-590757917
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] turbaszek closed issue #10381: Add on_kill method to DataprocSubmitJobOperator

Posted by GitBox <gi...@apache.org>.
turbaszek closed issue #10381:
URL: https://github.com/apache/airflow/issues/10381


   





[GitHub] [airflow] turbaszek commented on issue #10381: Add on_kill method to DataprocSubmitJobOperator

Posted by GitBox <gi...@apache.org>.
turbaszek commented on issue #10381:
URL: https://github.com/apache/airflow/issues/10381#issuecomment-683633639


   I think we should limit the `on_kill` to canceling the job. What is more, I think it should be configurable to allow attaching to an existing job using `request_id` 





[GitHub] [airflow] jaketf commented on issue #10381: Add on_kill method to DataprocSubmitJobOperator

Posted by GitBox <gi...@apache.org>.
jaketf commented on issue #10381:
URL: https://github.com/apache/airflow/issues/10381#issuecomment-675654161


   Dataproc jobs are kind of a wild wild west and may have significant side effects. From a documentation perspective we should call out that `on_kill` simply kills the job but will not "roll back" changes in external systems (GCS, Hive Metastore, BQ, Pub/Sub, etc.) that may have occurred. Users should be careful to handle any such scenarios in the logic of their pipelines.
   
   A few examples:
   - Even before completing, a Spark driver could make arbitrary calls that mutate data on GCS or in a database (e.g. it could write some sort of lock file that ends up being abandoned).
   - If you snipe a MapReduce job in the middle and any intermediate files were flushed to GCS, those will not get cleaned up.
   - A Hive job can contain multiple query statements (e.g. a CREATE TABLE and an INSERT INTO), so killing it midway may leave a side effect of a new empty table in the Hive Metastore.
   - Sniping a Spark Streaming job subscribing to Pub/Sub may lead to ACKed messages whose corresponding outputs were not committed.





[GitHub] [airflow] turbaszek commented on issue #10381: Add on_kill method to DataprocSubmitJobOperator

Posted by GitBox <gi...@apache.org>.
turbaszek commented on issue #10381:
URL: https://github.com/apache/airflow/issues/10381#issuecomment-688680747


   `DataprocSubmitJobOperator` has no relation with `DataprocJobBaseOperator`. 
   
   `DataprocJobBaseOperator` is used by "old" operators like `DataprocSubmitPigJobOperator`, `DataprocSubmitHiveJobOperator` etc. that are deprecated in favor of the generic operator `DataprocSubmitJobOperator`.
   
   However, you are right that the logic of `on_kill` currently exists in old ops and may be reused in `DataprocSubmitJobOperator.on_kill` 😉 





[GitHub] [airflow] jaketf commented on issue #10381: Add on_kill method to DataprocSubmitJobOperator

Posted by GitBox <gi...@apache.org>.
jaketf commented on issue #10381:
URL: https://github.com/apache/airflow/issues/10381#issuecomment-675650088


   Sounds like a good feature to have.
   My only thought is that we should also add similar `on_kill` method for workflow template operators.





[GitHub] [airflow] turbaszek commented on issue #10381: Add on_kill method to DataprocSubmitJobOperator

Posted by GitBox <gi...@apache.org>.
turbaszek commented on issue #10381:
URL: https://github.com/apache/airflow/issues/10381#issuecomment-675541492


   @edejong @jaketf @potiuk I would love to hear your opinion on this one 👍 






[GitHub] [airflow] jaketf edited a comment on issue #10381: Add on_kill method to DataprocSubmitJobOperator

Posted by GitBox <gi...@apache.org>.
jaketf edited a comment on issue #10381:
URL: https://github.com/apache/airflow/issues/10381#issuecomment-675654161


   Dataproc jobs are kind of a wild wild west and may have significant side effects. From a documentation perspective we should call out that `on_kill` simply kills the job but will not "roll back" changes in external systems (GCS, Hive Metastore, BQ, Pub/Sub, etc.) that may have occurred. Users should be careful to handle any such scenarios in the logic of their pipelines. This may seem obvious to us but may not be clear to users (contrast this with a BigQuery query job, where everything is controlled internally and cancelling a job leaves nothing behind because all results are committed atomically).
   
   A few examples:
   - Even before completing, a Spark driver could make arbitrary calls that mutate data on GCS or in a database (e.g. it could write some sort of lock file that ends up being abandoned).
   - If you snipe a MapReduce job in the middle and any intermediate files were flushed to GCS, those will not get cleaned up.
   - A Hive job can contain multiple query statements (e.g. a CREATE TABLE and an INSERT INTO), so killing it midway may leave a side effect of a new empty table in the Hive Metastore.
   - Sniping a Spark Streaming job subscribing to Pub/Sub may lead to ACKed messages whose corresponding outputs were not committed.
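
To illustrate what "handle any such scenarios in the logic of their pipelines" could look like for the abandoned lock file case, here is a minimal sketch in plain Python. It is not an Airflow or Dataproc feature: `CompensationLog`, `demo`, and the lock path are hypothetical names made up for this example, and a set stands in for a storage bucket.

```python
from typing import Callable, List


class CompensationLog:
    """Collects undo callbacks for side effects a job has made so far (hypothetical helper)."""

    def __init__(self) -> None:
        self._undo_steps: List[Callable[[], None]] = []

    def record(self, undo: Callable[[], None]) -> None:
        """Register a callback that reverses one side effect."""
        self._undo_steps.append(undo)

    def roll_back(self) -> int:
        """Run the undo callbacks newest-first and return how many ran."""
        count = 0
        while self._undo_steps:
            self._undo_steps.pop()()
            count += 1
        return count


def demo() -> set:
    # A set stands in for objects in a bucket; the path is made up.
    bucket = set()
    log = CompensationLog()

    # The job writes a lock file and records how to undo that.
    bucket.add("locks/job.lock")
    log.record(lambda: bucket.discard("locks/job.lock"))

    # ... the job is cancelled here, so the pipeline cleans up itself ...
    log.roll_back()
    return bucket
```

The point of the sketch is only that cleanup has to live in the pipeline: cancelling the job does not run any of these undo steps on its own.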





[GitHub] [airflow] turbaszek edited a comment on issue #10381: Add on_kill method to DataprocSubmitJobOperator

Posted by GitBox <gi...@apache.org>.
turbaszek edited a comment on issue #10381:
URL: https://github.com/apache/airflow/issues/10381#issuecomment-688680747


   `DataprocSubmitJobOperator` has no relation with `DataprocJobBaseOperator`. 
   
   `DataprocJobBaseOperator` is used by "old" operators like `DataprocSubmitPigJobOperator`, `DataprocSubmitHiveJobOperator` etc. that are deprecated in favor of the generic operator `DataprocSubmitJobOperator`.
   
   However, you are right that the logic of `on_kill` currently exists in old ops and may be reused in `DataprocSubmitJobOperator.on_kill` 😉 
   
   > Thus the following code might do the job:
   
   Looks good but let's remove the `request_id` as I'm still not sure how it works
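
A rough sketch of what the suggestion in this comment might look like, with `request_id` dropped and cancellation controlled by a constructor flag. Everything here is an assumption for illustration: `FakeDataprocHook` and `SubmitJobOperatorSketch` are hypothetical stand-ins so the snippet runs without Airflow or GCP, and only the `cancel_job(project_id=..., job_id=..., location=...)` call shape is taken from the `DataprocJobBaseOperator.on_kill` code quoted in this thread.

```python
class FakeDataprocHook:
    """Stand-in for DataprocHook; records cancel requests instead of calling GCP."""

    def __init__(self):
        self.cancelled = []

    def cancel_job(self, project_id, job_id, location):
        self.cancelled.append((project_id, job_id, location))


class SubmitJobOperatorSketch:
    """Hypothetical operator showing a configurable on_kill without request_id."""

    def __init__(self, project_id, region, hook, cancel_on_kill=True):
        self.project_id = project_id
        self.region = region
        self.hook = hook
        self.cancel_on_kill = cancel_on_kill
        self.job_id = None  # set once a job has been submitted

    def execute(self, job_id):
        # A real operator would get this ID from the submit response;
        # here it is passed in directly to keep the sketch self-contained.
        self.job_id = job_id

    def on_kill(self):
        # Cancel only if a job was actually submitted and cancellation is enabled.
        if self.job_id and self.cancel_on_kill:
            self.hook.cancel_job(
                project_id=self.project_id, job_id=self.job_id, location=self.region
            )
```

Putting the flag on the constructor rather than on `on_kill` itself reflects that `on_kill` is a callback invoked without arguments, so any per-task configuration has to be stored on the operator instance.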





[GitHub] [airflow] tszerszen commented on issue #10381: Add on_kill method to DataprocSubmitJobOperator

Posted by GitBox <gi...@apache.org>.
tszerszen commented on issue #10381:
URL: https://github.com/apache/airflow/issues/10381#issuecomment-688672324


   In package `airflow.providers.google.cloud.operators.dataproc` each DataprocSubmitJobOperator inherits from `DataprocJobBaseOperator`. 
   
   `DataprocJobBaseOperator` has the following implementation of `on_kill` method (lines 984-992):
   ``` 
   def on_kill(self):
       """
       Callback called when the operator is killed.
       Cancel any running job.
       """
       if self.dataproc_job_id:
           self.hook.cancel_job(
               project_id=self.project_id, job_id=self.dataproc_job_id, location=self.region
           )
   ```
   
   @turbaszek wrote:
   
   > This operator should implement on_kill method using cancel_job method of DataprocHook so in case of termination we cancel running job. 
   
   Considering the present implementation of the `on_kill` method of `DataprocJobBaseOperator`, isn't it already done?
   The code calls the `cancel_job` method of `DataprocHook` when `on_kill` runs and the `dataproc_job_id` property is set.
   
   If I understand this correctly, the only thing that method lacks is:
   
   > This option probably should be configurable (for example cancel_on_kill) because of request_id parameter in a job definition
   
   Thus the following code might do the job:
   
   ```
   def on_kill(self, cancel_on_kill=True, request_id=None):
       """
       Callback called when the operator is killed.
       Cancel any running job.
   
       Parameters:
           cancel_on_kill (bool): Cancel job if true, defaults to True
           request_id (str): Job ID to cancel, defaults to object property dataproc_job_id
       """
       if not cancel_on_kill:
           return
       # Prefer the explicitly passed ID, otherwise fall back to the stored one.
       job_id = request_id or self.dataproc_job_id
       # Check the resolved ID so a passed request_id is honored even when
       # dataproc_job_id is unset.
       if job_id:
           self.hook.cancel_job(
               project_id=self.project_id, job_id=job_id, location=self.region
           )
   ```
   
   Did I understand this issue correctly? If not, I guess calling the hook's `cancel_job` method is not enough, and I will investigate further.

