Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2020/05/18 16:16:15 UTC

[GitHub] [airflow] albertocalderari opened a new issue #8903: BigQueryHook refactor + deterministic BQ Job ID

albertocalderari opened a new issue #8903:
URL: https://github.com/apache/airflow/issues/8903


   
   **Description**
   
   Looking at the code, it seems like a lot of the logic in the BQ hook is already implemented in the Google API Python library. This includes job polling, a nicer way to use job configs and, of course, the validations that we now do manually.
   It would be nice to make use of these and simplify the code.
   
   My idea is then to refactor the run_<job> methods to take the Google job config and a deterministic job id.
   This would help in case a pod dies for any reason: we'd restart polling for the async job previously started.
   
   See my hacky spike below:
   
   This is the job id definition, for reference:
   `job_id = re.sub(r"[^0-9a-zA-Z_\-]+", "-", f"{self.dag_id}_{self.task_id}_{context['execution_date'].isoformat()}__try_0")`
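   For illustration, here is what that sanitization produces (the `dag_id`, `task_id` and date below are made-up example values, not anything from the hook): every run of characters outside `[0-9a-zA-Z_-]` collapses into a single dash.

```python
import re

# Hypothetical example values; in the operator these come from the task context.
dag_id = "my_dag"
task_id = "load_data"
execution_date_iso = "2020-06-23T00:00:00+00:00"

raw = f"{dag_id}_{task_id}_{execution_date_iso}__try_0"
# Runs of characters invalid in a BQ job id (here ':' and '+') become one dash.
job_id = re.sub(r"[^0-9a-zA-Z_\-]+", "-", raw)
print(job_id)  # my_dag_load_data_2020-06-23T00-00-00-00-00__try_0
```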
   
   
   and here is roughly how run_query would work:

```python
from google.api_core.exceptions import NotFound
from google.cloud.bigquery import QueryJob, QueryJobConfig, TableReference

def run_query(self, job_id: str, job_config: QueryJobConfig, sql: str,
              destination_dataset_table: str = None) -> str:
    def _recurse(job_id: str):
        # Bump the __try_ counter and retry with a fresh, unique job id.
        base, try_num = job_id.split("__try_")
        return _run_query(f"{base}__try_{int(try_num) + 1}")

    def _run_query(job_id: str):
        if not self.project_id:
            raise ValueError("The project_id should be set")

        if destination_dataset_table is not None:
            job_config.destination = TableReference.from_string(
                destination_dataset_table, default_project=self.project_id)

        try:
            job: QueryJob = self.client.get_job(job_id, project=self.project_id)
            if job.state == 'RUNNING':
                if job.query != sql:
                    self.log.info("Job %s found, but sql is different. "
                                  "Cancelling the current job and starting a new one", job_id)
                    job.cancel()
                    return _recurse(job_id)
                self.log.info("Job %s still running, re-starting to poll.", job_id)
                return job.result()
            else:
                self.log.info("Job %s already executed once. Restarting.", job_id)
                return _recurse(job_id)
        except NotFound:
            self.log.info("Job %s not found, starting a new job.", job_id)
        job = self.client.query(sql, job_config=job_config, job_id=job_id,
                                project=self.project_id)
        self.log.info("Running job %s...", job_id)
        return job.result()

    return _run_query(job_id)
```
   
   **The encoded `__try_<try_num>` is not the Airflow try number but a secondary try counter for when the task is cleared, since BQ job IDs are a unique key and can't be re-used.**
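   The try-counter handling can be isolated into a small helper (a sketch; `next_try_job_id` is a made-up name, not part of the hook):

```python
def next_try_job_id(job_id: str) -> str:
    """Increment the trailing __try_<n> counter of a job id.

    BQ job ids are unique per project, so a cleared task needs a fresh id;
    bumping the counter keeps the id deterministic and traceable.
    """
    base, try_num = job_id.split("__try_")
    return f"{base}__try_{int(try_num) + 1}"

print(next_try_job_id("my_dag_load_data_20200623__try_0"))
# my_dag_load_data_20200623__try_1
```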
   
   **Use case / motivation**
   Trying to use the functionality in the Google Cloud library rather than re-implementing it ourselves.
   This would also allow us to pass through a deterministic job ID, useful for picking up jobs that are still running in case a pod dies.
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] albertocalderari commented on issue #8903: BigQueryHook refactor + deterministic BQ Job ID

Posted by GitBox <gi...@apache.org>.
albertocalderari commented on issue #8903:
URL: https://github.com/apache/airflow/issues/8903#issuecomment-630744831


   My idea was to use the methods exposed by google.cloud.bigquery.Client along with google.cloud.bigquery.{JobType}JobConfig rather than using dictionaries.
   Doing this, we can ditch most of the custom validation we have in the hook and make the code a bit slimmer.
   
   I'd build the job configs in the operator itself and mark run_with_configuration as deprecated.
   
   I like the idea of having only one method to execute any kind of job though; we could add the recursive logic in there.
   
   What do you think @turbaszek?





[GitHub] [airflow] albertocalderari commented on issue #8903: BigQueryHook refactor + deterministic BQ Job ID

Posted by GitBox <gi...@apache.org>.
albertocalderari commented on issue #8903:
URL: https://github.com/apache/airflow/issues/8903#issuecomment-634528875


   @turbaszek Hey, sorry, work was hell this past week and I did not find the time or the energy to reply :(
   I have noticed that insert_job does that, and that we'd be building the config from the dictionary that is passed in. Still, going all in with {JobType}JobConfig would make the method signatures a lot friendlier to the eye. I do agree the migration would be harder - that could be done in 2 steps:
   
   1. create a new method used only by the operators and mark the old ones as deprecated
   2. remove the deprecated methods
   
   If I can't sell this to you, I'll go your way and see how it comes out :)





[GitHub] [airflow] turbaszek commented on issue #8903: BigQueryHook refactor + deterministic BQ Job ID

Posted by GitBox <gi...@apache.org>.
turbaszek commented on issue #8903:
URL: https://github.com/apache/airflow/issues/8903#issuecomment-648784763


   > @turbaszek In case of downscale or pod dying you'd want to check if the job is still running, hence the need of having a job id derived from the task name and execution date.
   
   Now I get it. So if we use the execution date in the job_id, like `airflow_task_id_20200623T000000+0000`, then in case of failure, re-running the DAG will make the operator reattach to the existing job. I like it. @edejong WDYT?
   
   





[GitHub] [airflow] turbaszek closed issue #8903: BigQueryHook refactor + deterministic BQ Job ID

Posted by GitBox <gi...@apache.org>.
turbaszek closed issue #8903:
URL: https://github.com/apache/airflow/issues/8903


   





[GitHub] [airflow] turbaszek commented on issue #8903: BigQueryHook refactor + deterministic BQ Job ID

Posted by GitBox <gi...@apache.org>.
turbaszek commented on issue #8903:
URL: https://github.com/apache/airflow/issues/8903#issuecomment-642921263


   @albertocalderari I think I may be missing something. What do you understand by deterministic job_id?





[GitHub] [airflow] edejong edited a comment on issue #8903: BigQueryHook refactor + deterministic BQ Job ID

Posted by GitBox <gi...@apache.org>.
edejong edited a comment on issue #8903:
URL: https://github.com/apache/airflow/issues/8903#issuecomment-641387021


   I should really check GitHub more often, I only saw the notification now.
   
   Let me know if I understand the question correctly: should the BigQueryHook's interface rely on classes from the Google API client library, or should all data be passed in as dictionaries?
   
   I think it's one thing to have the Airflow hooks/operators coupled to the BigQuery REST interface which I guess is what you get passing in the config in a Python dict. This allows you to translate online examples very easily to a DAG.
   
   But it's a much bigger step to rely on the Google client library in the API because that introduces a tight coupling to this specific library. It would only look good if the Airflow code can stay 100% agnostic about what is passed to the library. Can we guarantee that, even for the future? And does that align with other GCP products?
   
   So my personal opinion is stick with the dict :)
   
   As for generating a job id for all job types, I agree that would be a very good move. Without it, you have to wait for a response to even have something to check up on afterwards. That works most of the time, but when it goes wrong it makes things harder to troubleshoot.
   
   I love the suggested job id string. One small thing I would change is to add a prefix such as `airflow_` or even just `af_` to make these even easier to spot in Stackdriver, for example. I would generate a well-defined job id string every time one wasn't provided by the user.
   
   See https://cloud.google.com/bigquery/docs/running-jobs#generate-jobid for recommendations.
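   A sketch of that fallback (the function name and prefix choice are illustrative, not the hook's actual API):

```python
import re
import uuid

def ensure_job_id(job_id: str = None, prefix: str = "af_") -> str:
    """Return the user-supplied job id, or generate a well-defined one.

    A fixed prefix (e.g. 'af_') makes Airflow-issued jobs easy to spot in
    Stackdriver; the generated part uses only characters that are valid in
    a BQ job id (letters, digits, underscores and dashes).
    """
    if job_id:
        return job_id
    return prefix + uuid.uuid4().hex

generated = ensure_job_id()
assert re.fullmatch(r"[0-9a-zA-Z_\-]+", generated)
print(generated.startswith("af_"))  # True
```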





[GitHub] [airflow] turbaszek edited a comment on issue #8903: BigQueryHook refactor + deterministic BQ Job ID

Posted by GitBox <gi...@apache.org>.
turbaszek edited a comment on issue #8903:
URL: https://github.com/apache/airflow/issues/8903#issuecomment-642921263


   @albertocalderari I think I may be missing something. What do you understand by deterministic job_id?





[GitHub] [airflow] turbaszek commented on issue #8903: BigQueryHook refactor + deterministic BQ Job ID

Posted by GitBox <gi...@apache.org>.
turbaszek commented on issue #8903:
URL: https://github.com/apache/airflow/issues/8903#issuecomment-642029283


   @albertocalderari should we consider this issue as resolved? 





[GitHub] [airflow] albertocalderari commented on issue #8903: BigQueryHook refactor + deterministic BQ Job ID

Posted by GitBox <gi...@apache.org>.
albertocalderari commented on issue #8903:
URL: https://github.com/apache/airflow/issues/8903#issuecomment-642918646


   @turbaszek That's only half of the issue; the other half is implementing the deterministic job id.
   So as far as I understand, operators will interact with the hook using a JSON config and then we'll build the job from it.
   As soon as you finish your refactor I'll start implementing the feature :)





[GitHub] [airflow] turbaszek commented on issue #8903: BigQueryHook refactor + deterministic BQ Job ID

Posted by GitBox <gi...@apache.org>.
turbaszek commented on issue #8903:
URL: https://github.com/apache/airflow/issues/8903#issuecomment-634816078


   Summoning @edejong to hear his opinion :)





[GitHub] [airflow] albertocalderari commented on issue #8903: BigQueryHook refactor + deterministic BQ Job ID

Posted by GitBox <gi...@apache.org>.
albertocalderari commented on issue #8903:
URL: https://github.com/apache/airflow/issues/8903#issuecomment-648763105


   @turbaszek In case of downscale or a pod dying, you'd want to check if the job is still running, hence the need for a job id derived from the task name and execution date.
   We had several instances where a job was still running and a new one was started, generating extra costs for no reason.
   
   If you want we can have a call and I can explain better.





[GitHub] [airflow] edejong commented on issue #8903: BigQueryHook refactor + deterministic BQ Job ID

Posted by GitBox <gi...@apache.org>.
edejong commented on issue #8903:
URL: https://github.com/apache/airflow/issues/8903#issuecomment-641387021


   I should really check GitHub more often, I only saw the notification now.
   
   Let me know if I understand the question correctly: should the BigQueryHook's interface rely on classes from the Google API client library, or should all data be passed in as dictionaries?
   
   I think it's one thing to have the Airflow hooks/operators coupled to the BigQuery REST interface which I guess is what you get passing in the config in a Python dict. This allows you to translate online examples very easily to a DAG.
   
   But it's a much bigger step to rely on the Google client library in the API because that introduces a tight coupling to this specific library. It would only look good if the Airflow code can stay 100% agnostic about what is passed to the library. Can we guarantee that, even for the future? And does that align with other GCP products?
   
   So my personal opinion is stick with the dict :)
   
   As for generating a job id for all job types, I agree that would be a very good move. Without it, you have to wait for a response to even have something to check up on afterwards. That works most of the time, but when it goes wrong it makes things harder to troubleshoot.
   
   I love the suggested job id string. One small thing I would change is to add a prefix such as `airflow_` or even just `af_` to make these even easier to spot in Stackdriver, for example. I would generate a well-defined job id string every time one wasn't provided by the user.
   
   See https://cloud.google.com/bigquery/docs/running-jobs#generate-jobid for recommendations.





[GitHub] [airflow] turbaszek commented on issue #8903: BigQueryHook refactor + deterministic BQ Job ID

Posted by GitBox <gi...@apache.org>.
turbaszek commented on issue #8903:
URL: https://github.com/apache/airflow/issues/8903#issuecomment-630443552


   Hi @albertocalderari, I'm currently working on a refactor of the BQ integration. I decided to abandon the custom "run" methods and use the `insert_job` method, which will accept a `job_id`:
   
   https://github.com/PolideaInternal/airflow/blob/71328b19be73ff0e4820be1ee25867eb6a578820/airflow/providers/google/cloud/hooks/bigquery.py#L1432-L1441





[GitHub] [airflow] turbaszek commented on issue #8903: BigQueryHook refactor + deterministic BQ Job ID

Posted by GitBox <gi...@apache.org>.
turbaszek commented on issue #8903:
URL: https://github.com/apache/airflow/issues/8903#issuecomment-630783205


   > My idea was to use the methods exposed by google.cloud.bigquery.Client along with google.cloud.bigquery.{JobType}JobConfig rather than using dictionaries.
   
   I see your point; however, this will make the migration harder. On the bright side, we are ditching the custom validation. If you take a look at the `insert_job` method, you will see that I'm constructing a `XJobConfig`:
   https://github.com/apache/airflow/blob/375d1ca229464617780623c61c6e8a1bf570c87f/airflow/providers/google/cloud/hooks/bigquery.py#L1418-L1434
   





[GitHub] [airflow] turbaszek commented on issue #8903: BigQueryHook refactor + deterministic BQ Job ID

Posted by GitBox <gi...@apache.org>.
turbaszek commented on issue #8903:
URL: https://github.com/apache/airflow/issues/8903#issuecomment-642029064


   > So my personal opinion is stick with the dict :)
   
   Same as mine. And what is more... a dict is JSON-serializable, so it can be used as a template field!
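   A quick illustration of why that matters (the config dict below is a made-up example in the shape of the BQ REST API): a plain dict survives the JSON round trip that templating relies on, whereas a `QueryJobConfig` instance would not.

```python
import json

# Hypothetical job configuration, in the shape of the BQ REST API.
configuration = {
    "query": {
        "query": "SELECT 1",
        "useLegacySql": False,
        "destinationTable": {
            "projectId": "my-project",
            "datasetId": "my_dataset",
            "tableId": "my_table",
        },
    }
}

# A plain dict round-trips through JSON, so it can be a template field.
assert json.loads(json.dumps(configuration)) == configuration
```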
   





[GitHub] [airflow] albertocalderari commented on issue #8903: BigQueryHook refactor + deterministic BQ Job ID

Posted by GitBox <gi...@apache.org>.
albertocalderari commented on issue #8903:
URL: https://github.com/apache/airflow/issues/8903#issuecomment-649000670


   @turbaszek Yeah, sort of; it's not as simple. I'd really rather have a quick call - are you on Airflow's Slack?





[GitHub] [airflow] boring-cyborg[bot] commented on issue #8903: BigQueryHook refactor + deterministic BQ Job ID

Posted by GitBox <gi...@apache.org>.
boring-cyborg[bot] commented on issue #8903:
URL: https://github.com/apache/airflow/issues/8903#issuecomment-630288970


   Thanks for opening your first issue here! Be sure to follow the issue template!
   

