Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2020/05/14 13:06:12 UTC

[GitHub] [airflow] turbaszek opened a new pull request #8868: Add BigQueryInsertJobOperator

turbaszek opened a new pull request #8868:
URL: https://github.com/apache/airflow/pull/8868


   Depends on: #8858 
   
   ---
   Make sure to mark the boxes below before creating PR: [x]
   
   - [x] Description above provides context of the change
   - [x] Unit tests coverage for changes (not needed for documentation changes)
   - [x] Target Github ISSUE in description if exists
   - [x] Commits follow "[How to write a good git commit message](http://chris.beams.io/posts/git-commit/)"
   - [ ] Relevant documentation is updated including usage instructions.
   - [x] I will engage committers as explained in [Contribution Workflow Example](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#contribution-workflow-example).
   
   ---
   In case of fundamental code change, Airflow Improvement Proposal ([AIP](https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+Improvements+Proposals)) is needed.
   In case of a new dependency, check compliance with the [ASF 3rd Party License Policy](https://www.apache.org/legal/resolved.html#category-x).
   In case of backwards incompatible changes please leave a note in [UPDATING.md](https://github.com/apache/airflow/blob/master/UPDATING.md).
   Read the [Pull Request Guidelines](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#pull-request-guidelines) for more information.
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] turbaszek commented on a change in pull request #8868: Add BigQueryInsertJobOperator

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #8868:
URL: https://github.com/apache/airflow/pull/8868#discussion_r433071133



##########
File path: tests/providers/google/cloud/operators/test_bigquery.py
##########
@@ -788,3 +789,65 @@ def test_execute(self, mock_hook):
                 project_id=TEST_GCP_PROJECT_ID,
                 table_resource=TEST_TABLE_RESOURCES
             )
+
+
+class TestBigQueryInsertJobOperator:
+    @mock.patch('airflow.providers.google.cloud.operators.bigquery.BigQueryHook')
+    def test_execute(self, mock_hook):
+        job_id = "123456"
+        configuration = {
+            "query": {
+                "query": "SELECT * FROM any",
+                "useLegacySql": False,
+            }
+        }
+        mock_hook.return_value.insert_job.return_value = MagicMock(job_id=job_id)
+
+        op = BigQueryInsertJobOperator(
+            task_id="insert_query_job",
+            configuration=configuration,
+            location=TEST_DATASET_LOCATION,
+            job_id=job_id,
+            project_id=TEST_GCP_PROJECT_ID
+        )
+        result = op.execute({})
+
+        mock_hook.return_value.insert_job.assert_called_once_with(
+            configuration=configuration,
+            location=TEST_DATASET_LOCATION,
+            job_id=job_id,
+            project_id=TEST_GCP_PROJECT_ID,
+        )
+
+        assert result == job_id
+
+    @mock.patch('airflow.providers.google.cloud.operators.bigquery.BigQueryHook')

Review comment:
       Added 







[GitHub] [airflow] potiuk commented on a change in pull request #8868: Add BigQueryInsertJobOperator

Posted by GitBox <gi...@apache.org>.
potiuk commented on a change in pull request #8868:
URL: https://github.com/apache/airflow/pull/8868#discussion_r432937816



##########
File path: tests/providers/google/cloud/operators/test_bigquery.py
##########
@@ -788,3 +789,65 @@ def test_execute(self, mock_hook):
                 project_id=TEST_GCP_PROJECT_ID,
                 table_resource=TEST_TABLE_RESOURCES
             )
+
+
+class TestBigQueryInsertJobOperator:
+    @mock.patch('airflow.providers.google.cloud.operators.bigquery.BigQueryHook')
+    def test_execute(self, mock_hook):
+        job_id = "123456"
+        configuration = {
+            "query": {
+                "query": "SELECT * FROM any",
+                "useLegacySql": False,
+            }
+        }
+        mock_hook.return_value.insert_job.return_value = MagicMock(job_id=job_id)
+
+        op = BigQueryInsertJobOperator(
+            task_id="insert_query_job",
+            configuration=configuration,
+            location=TEST_DATASET_LOCATION,
+            job_id=job_id,
+            project_id=TEST_GCP_PROJECT_ID
+        )
+        result = op.execute({})
+
+        mock_hook.return_value.insert_job.assert_called_once_with(
+            configuration=configuration,
+            location=TEST_DATASET_LOCATION,
+            job_id=job_id,
+            project_id=TEST_GCP_PROJECT_ID,
+        )
+
+        assert result == job_id
+
+    @mock.patch('airflow.providers.google.cloud.operators.bigquery.BigQueryHook')

Review comment:
       It's not super needed - nice to have - but I thought it might be useful to add.







[GitHub] [airflow] turbaszek commented on a change in pull request #8868: Add BigQueryInsertJobOperator

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #8868:
URL: https://github.com/apache/airflow/pull/8868#discussion_r432936866



##########
File path: airflow/providers/google/cloud/operators/bigquery.py
##########
@@ -1570,3 +1576,76 @@ def execute(self, context):
             table_resource=self.table_resource,
             project_id=self.project_id,
         )
+
+
+class BigQueryInsertJobOperator(BaseOperator):
+    """
+    Executes a BigQuery job. Waits for the job to complete and returns job id.
+    See here:
+
+        https://cloud.google.com/bigquery/docs/reference/v2/jobs
+
+    :param configuration: The configuration parameter maps directly to BigQuery's
+        configuration field in the job  object. For more details see
+        https://cloud.google.com/bigquery/docs/reference/v2/jobs
+    :type configuration: Dict[str, Any]
+    :param job_id: The ID of the job. The ID must contain only letters (a-z, A-Z),
+        numbers (0-9), underscores (_), or dashes (-). The maximum length is 1,024
+        characters. If not provided then uuid will be generated.
+    :type job_id: str
+    :param project_id: Google Cloud Project where the job is running
+    :type project_id: str
+    :param location: location the job is running
+    :type location: str
+    :param gcp_conn_id: (Optional) The connection ID used to connect to Google Cloud Platform.
+    :type gcp_conn_id: str
+    """
+
+    template_fields = ("configuration", "job_id")
+    ui_color = BigQueryUIColors.QUERY.value
+
+    def __init__(
+        self,
+        configuration: Dict[str, Any],
+        project_id: Optional[str] = None,
+        location: Optional[str] = None,
+        job_id: Optional[str] = None,
+        gcp_conn_id: str = 'google_cloud_default',
+        delegate_to: Optional[str] = None,
+        *args,
+        **kwargs,
+    ) -> None:
+        super().__init__(*args, **kwargs)
+        self.configuration = configuration
+        self.location = location
+        self.job_id = job_id
+        self.project_id = project_id
+        self.gcp_conn_id = gcp_conn_id
+        self.delegate_to = delegate_to
+
+    def execute(self, context: Any):
+        hook = BigQueryHook(
+            gcp_conn_id=self.gcp_conn_id,
+            delegate_to=self.delegate_to,
+        )
+
+        try:
+            job = hook.insert_job(
+                configuration=self.configuration,
+                project_id=self.project_id,
+                location=self.location,
+                job_id=self.job_id,
+            )
+            # Start the job and wait for it to complete and get the result.
+            job.result()
+        except Conflict:
+            job = hook.get_job(
+                project_id=self.project_id,
+                location=self.location,
+                job_id=self.job_id,
+            )
+            # Get existing job and wait for it to be ready
+            while not job.done():
+                sleep(10)

Review comment:
       I've added exponential backoff, as in Google's `result` method, with a maximum of 2 min. WDYT @potiuk?
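
    For reference, a minimal sketch of what such a capped backoff loop can look like (the helper name, the 1 s start and the 120 s cap are illustrative, not the exact PR code):

    from time import sleep

    def wait_for_job(job, start_interval: float = 1.0, max_interval: float = 120.0) -> None:
        # Poll job.done() with exponential backoff, doubling the wait each
        # round but never exceeding max_interval (2 min here).
        interval = start_interval
        while not job.done():
            sleep(interval)
            interval = min(interval * 2, max_interval)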







[GitHub] [airflow] turbaszek merged pull request #8868: Add BigQueryInsertJobOperator

Posted by GitBox <gi...@apache.org>.
turbaszek merged pull request #8868:
URL: https://github.com/apache/airflow/pull/8868


   





[GitHub] [airflow] turbaszek commented on pull request #8868: Add BigQueryInsertJobOperator

Posted by GitBox <gi...@apache.org>.
turbaszek commented on pull request #8868:
URL: https://github.com/apache/airflow/pull/8868#issuecomment-636329897


   @potiuk @mik-laj @olchas would you mind taking a look? 





[GitHub] [airflow] turbaszek commented on a change in pull request #8868: Add BigQueryInsertJobOperator

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #8868:
URL: https://github.com/apache/airflow/pull/8868#discussion_r432936839



##########
File path: docs/howto/operator/gcp/bigquery.rst
##########
@@ -255,32 +255,18 @@ Let's say you would like to execute the following query.
     :end-before: [END howto_operator_bigquery_query]
 
 To execute the SQL query in a specific BigQuery database you can use
-:class:`~airflow.providers.google.cloud.operators.bigquery.BigQueryExecuteQueryOperator`.
+:class:`~airflow.providers.google.cloud.operators.bigquery.BigQueryInsertJobOperator` with
+proper query job configuration.
 
 .. exampleinclude:: ../../../../airflow/providers/google/cloud/example_dags/example_bigquery_queries.py
     :language: python
     :dedent: 4
-    :start-after: [START howto_operator_bigquery_execute_query]
-    :end-before: [END howto_operator_bigquery_execute_query]
-
-``sql`` argument can receive a str representing a sql statement, a list of str
-(sql statements), or reference to a template file. Template reference are recognized
-by str ending in '.sql'.
+    :start-after: [START howto_operator_bigquery_insert_job]
+    :end-before: [END howto_operator_bigquery_insert_job]

Review comment:
       Sure, added 👌 







[GitHub] [airflow] potiuk commented on pull request #8868: Add BigQueryInsertJobOperator

Posted by GitBox <gi...@apache.org>.
potiuk commented on pull request #8868:
URL: https://github.com/apache/airflow/pull/8868#issuecomment-636651207


   Great job!





[GitHub] [airflow] potiuk commented on a change in pull request #8868: Add BigQueryInsertJobOperator

Posted by GitBox <gi...@apache.org>.
potiuk commented on a change in pull request #8868:
URL: https://github.com/apache/airflow/pull/8868#discussion_r432937764



##########
File path: tests/providers/google/cloud/operators/test_bigquery.py
##########
@@ -788,3 +789,65 @@ def test_execute(self, mock_hook):
                 project_id=TEST_GCP_PROJECT_ID,
                 table_resource=TEST_TABLE_RESOURCES
             )
+
+
+class TestBigQueryInsertJobOperator:
+    @mock.patch('airflow.providers.google.cloud.operators.bigquery.BigQueryHook')
+    def test_execute(self, mock_hook):
+        job_id = "123456"
+        configuration = {
+            "query": {
+                "query": "SELECT * FROM any",
+                "useLegacySql": False,
+            }
+        }
+        mock_hook.return_value.insert_job.return_value = MagicMock(job_id=job_id)
+
+        op = BigQueryInsertJobOperator(
+            task_id="insert_query_job",
+            configuration=configuration,
+            location=TEST_DATASET_LOCATION,
+            job_id=job_id,
+            project_id=TEST_GCP_PROJECT_ID
+        )
+        result = op.execute({})
+
+        mock_hook.return_value.insert_job.assert_called_once_with(
+            configuration=configuration,
+            location=TEST_DATASET_LOCATION,
+            job_id=job_id,
+            project_id=TEST_GCP_PROJECT_ID,
+        )
+
+        assert result == job_id
+
+    @mock.patch('airflow.providers.google.cloud.operators.bigquery.BigQueryHook')

Review comment:
       I mean seeing the retry behavior in practice. Reattaching to an existing job succeeds immediately, but it might be worth (maybe) having an example where you reattach while the job is still running and poll once or twice after attaching before you succeed.
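
    A hedged sketch of such a test, as a method on TestBigQueryInsertJobOperator (the sleep patch target assumes `sleep` is imported into the operators module as in the diff, and the reattach branch is assumed to return the job id as well):

    from unittest import mock

    from google.api_core.exceptions import Conflict

    @mock.patch('airflow.providers.google.cloud.operators.bigquery.sleep')
    @mock.patch('airflow.providers.google.cloud.operators.bigquery.BigQueryHook')
    def test_execute_reattach_to_running_job(self, mock_hook, mock_sleep):
        # insert_job raises Conflict, so the operator reattaches via get_job.
        mock_hook.return_value.insert_job.side_effect = Conflict("job already exists")
        job = mock.MagicMock(job_id="123456")
        # Still running for two polls, done on the third check.
        job.done.side_effect = [False, False, True]
        mock_hook.return_value.get_job.return_value = job

        op = BigQueryInsertJobOperator(
            task_id="insert_query_job",
            configuration={"query": {"query": "SELECT 1", "useLegacySql": False}},
            location=TEST_DATASET_LOCATION,
            job_id="123456",
            project_id=TEST_GCP_PROJECT_ID,
        )
        result = op.execute({})

        assert job.done.call_count == 3
        assert result == "123456"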







[GitHub] [airflow] telac commented on a change in pull request #8868: Add BigQueryInsertJobOperator

Posted by GitBox <gi...@apache.org>.
telac commented on a change in pull request #8868:
URL: https://github.com/apache/airflow/pull/8868#discussion_r627462732



##########
File path: airflow/providers/google/cloud/operators/bigquery.py
##########
@@ -546,6 +548,11 @@ def __init__(self,
                 "the gcp_conn_id parameter.", DeprecationWarning, stacklevel=3)
             gcp_conn_id = bigquery_conn_id
 
+        warnings.warn(

Review comment:
       @judoole @turbaszek This is something I've been wondering about for a long while. It seems that `BigQueryExecuteQueryOperator` makes the simple case of writing a job that queries data from one table and inserts it into another so much simpler than `BigQueryInsertJobOperator`. `BigQueryInsertJobOperator` doesn't provide the destination table, clustering fields, or time_partitioning as task-level parameters, but instead forces you to construct your own config. Overall it just seems way less friendly to use than `BigQueryExecuteQueryOperator`.
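
    For illustration, the query-to-destination-table case described above comes out roughly like this with `BigQueryInsertJobOperator` (project, dataset, table and field names are placeholders; the keys are the raw BigQuery jobs API fields):

    from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator

    insert_job = BigQueryInsertJobOperator(
        task_id="query_to_table",
        configuration={
            "query": {
                "query": "SELECT * FROM `my-project.src_dataset.src_table`",
                "useLegacySql": False,
                # Everything BigQueryExecuteQueryOperator exposed as task-level
                # parameters has to be spelled out in the job config instead:
                "destinationTable": {
                    "projectId": "my-project",
                    "datasetId": "dst_dataset",
                    "tableId": "dst_table",
                },
                "writeDisposition": "WRITE_TRUNCATE",
                "timePartitioning": {"type": "DAY", "field": "created_at"},
                "clustering": {"fields": ["customer_id"]},
            }
        },
    )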







[GitHub] [airflow] judoole commented on a change in pull request #8868: Add BigQueryInsertJobOperator

Posted by GitBox <gi...@apache.org>.
judoole commented on a change in pull request #8868:
URL: https://github.com/apache/airflow/pull/8868#discussion_r627947428



##########
File path: airflow/providers/google/cloud/operators/bigquery.py
##########
@@ -546,6 +548,11 @@ def __init__(self,
                 "the gcp_conn_id parameter.", DeprecationWarning, stacklevel=3)
             gcp_conn_id = bigquery_conn_id
 
+        warnings.warn(

Review comment:
       Hey there @telac. I also asked on the Slack channel, and the deprecation is real: `BigQueryInsertJobOperator` is to be preferred in the future. The gist of it being:
   > BigQueryInsertJobOperator exposes the full capabilities of the API and is easy to maintain.
   > The old operator, BigQueryExecuteQueryOperator, needs to be updated every time a new field is added to the API.
   
   I guess personally I will create an in-house `BigQueryExecuteQueryOperator` equivalent operator that extends `BigQueryInsertJobOperator` and sets the specific parts we are interested in: for the most part destination, sql, time_partitioning and write_disposition.
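
    One possible shape of such a wrapper (the class and parameter names are invented for illustration):

    from typing import Optional

    from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator

    class QueryToTableOperator(BigQueryInsertJobOperator):
        """In-house convenience operator: task-level params translated into a job config."""

        def __init__(
            self,
            *,
            sql: str,
            destination: str,  # "project.dataset.table"
            write_disposition: str = "WRITE_TRUNCATE",
            time_partitioning: Optional[dict] = None,
            **kwargs,
        ) -> None:
            project_id, dataset_id, table_id = destination.split(".")
            configuration = {
                "query": {
                    "query": sql,
                    "useLegacySql": False,
                    "destinationTable": {
                        "projectId": project_id,
                        "datasetId": dataset_id,
                        "tableId": table_id,
                    },
                    "writeDisposition": write_disposition,
                }
            }
            if time_partitioning:
                configuration["query"]["timePartitioning"] = time_partitioning
            super().__init__(configuration=configuration, **kwargs)

    Since `configuration` is in the parent's `template_fields`, the `sql` passed this way stays templatable.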










[GitHub] [airflow] potiuk commented on a change in pull request #8868: Add BigQueryInsertJobOperator

Posted by GitBox <gi...@apache.org>.
potiuk commented on a change in pull request #8868:
URL: https://github.com/apache/airflow/pull/8868#discussion_r432844561



##########
File path: tests/providers/google/cloud/operators/test_bigquery.py
##########
@@ -788,3 +789,65 @@ def test_execute(self, mock_hook):
                 project_id=TEST_GCP_PROJECT_ID,
                 table_resource=TEST_TABLE_RESOURCES
             )
+
+
+class TestBigQueryInsertJobOperator:
+    @mock.patch('airflow.providers.google.cloud.operators.bigquery.BigQueryHook')
+    def test_execute(self, mock_hook):
+        job_id = "123456"
+        configuration = {
+            "query": {
+                "query": "SELECT * FROM any",
+                "useLegacySql": False,
+            }
+        }
+        mock_hook.return_value.insert_job.return_value = MagicMock(job_id=job_id)
+
+        op = BigQueryInsertJobOperator(
+            task_id="insert_query_job",
+            configuration=configuration,
+            location=TEST_DATASET_LOCATION,
+            job_id=job_id,
+            project_id=TEST_GCP_PROJECT_ID
+        )
+        result = op.execute({})
+
+        mock_hook.return_value.insert_job.assert_called_once_with(
+            configuration=configuration,
+            location=TEST_DATASET_LOCATION,
+            job_id=job_id,
+            project_id=TEST_GCP_PROJECT_ID,
+        )
+
+        assert result == job_id
+
+    @mock.patch('airflow.providers.google.cloud.operators.bigquery.BigQueryHook')

Review comment:
       Should we also add a test for retry behaviour?

##########
File path: airflow/providers/google/cloud/operators/bigquery.py
##########
@@ -1570,3 +1576,76 @@ def execute(self, context):
             table_resource=self.table_resource,
             project_id=self.project_id,
         )
+
+
+class BigQueryInsertJobOperator(BaseOperator):
+    """
+    Executes a BigQuery job. Waits for the job to complete and returns job id.
+    See here:
+
+        https://cloud.google.com/bigquery/docs/reference/v2/jobs
+
+    :param configuration: The configuration parameter maps directly to BigQuery's
+        configuration field in the job  object. For more details see
+        https://cloud.google.com/bigquery/docs/reference/v2/jobs
+    :type configuration: Dict[str, Any]
+    :param job_id: The ID of the job. The ID must contain only letters (a-z, A-Z),
+        numbers (0-9), underscores (_), or dashes (-). The maximum length is 1,024
+        characters. If not provided then uuid will be generated.
+    :type job_id: str
+    :param project_id: Google Cloud Project where the job is running
+    :type project_id: str
+    :param location: location the job is running
+    :type location: str
+    :param gcp_conn_id: (Optional) The connection ID used to connect to Google Cloud Platform.
+    :type gcp_conn_id: str
+    """
+
+    template_fields = ("configuration", "job_id")
+    ui_color = BigQueryUIColors.QUERY.value
+
+    def __init__(
+        self,
+        configuration: Dict[str, Any],
+        project_id: Optional[str] = None,
+        location: Optional[str] = None,
+        job_id: Optional[str] = None,
+        gcp_conn_id: str = 'google_cloud_default',
+        delegate_to: Optional[str] = None,
+        *args,
+        **kwargs,
+    ) -> None:
+        super().__init__(*args, **kwargs)
+        self.configuration = configuration
+        self.location = location
+        self.job_id = job_id
+        self.project_id = project_id
+        self.gcp_conn_id = gcp_conn_id
+        self.delegate_to = delegate_to
+
+    def execute(self, context: Any):
+        hook = BigQueryHook(
+            gcp_conn_id=self.gcp_conn_id,
+            delegate_to=self.delegate_to,
+        )
+
+        try:
+            job = hook.insert_job(
+                configuration=self.configuration,
+                project_id=self.project_id,
+                location=self.location,
+                job_id=self.job_id,
+            )
+            # Start the job and wait for it to complete and get the result.
+            job.result()
+        except Conflict:
+            job = hook.get_job(
+                project_id=self.project_id,
+                location=self.location,
+                job_id=self.job_id,
+            )
+            # Get existing job and wait for it to be ready
+            while not job.done():
+                sleep(10)

Review comment:
       Should we make it a constant, or configurable with a default value? I think 10 s might be fairly frequent for some types of jobs that run for hours. I understand it's only used in case the task gets restarted, but I still think it's better to make it configurable.
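
    A sketch of the configurable variant (the `poll_interval` name and the module-level default are invented):

    from time import sleep

    DEFAULT_POLL_INTERVAL: float = 10.0  # hypothetical default, in seconds

    def wait_until_done(job, poll_interval: float = DEFAULT_POLL_INTERVAL) -> None:
        # Reattach case: the job is already running, poll at a caller-chosen rate.
        while not job.done():
            sleep(poll_interval)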

##########
File path: docs/howto/operator/gcp/bigquery.rst
##########
@@ -255,32 +255,18 @@ Let's say you would like to execute the following query.
     :end-before: [END howto_operator_bigquery_query]
 
 To execute the SQL query in a specific BigQuery database you can use
-:class:`~airflow.providers.google.cloud.operators.bigquery.BigQueryExecuteQueryOperator`.
+:class:`~airflow.providers.google.cloud.operators.bigquery.BigQueryInsertJobOperator` with
+proper query job configuration.
 
 .. exampleinclude:: ../../../../airflow/providers/google/cloud/example_dags/example_bigquery_queries.py
     :language: python
     :dedent: 4
-    :start-after: [START howto_operator_bigquery_execute_query]
-    :end-before: [END howto_operator_bigquery_execute_query]
-
-``sql`` argument can receive a str representing a sql statement, a list of str
-(sql statements), or reference to a template file. Template reference are recognized
-by str ending in '.sql'.
+    :start-after: [START howto_operator_bigquery_insert_job]
+    :end-before: [END howto_operator_bigquery_insert_job]

Review comment:
       I think it would be nice to explain the job_id behavior a bit here, and how to make a job an "idempotent one". Just a sentence or two.
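
    For example, the docs could show something like this (the id pattern is illustrative): since `job_id` is a template field, a deterministic per-run id means a restarted task hits a Conflict and reattaches to the already-running job instead of submitting a duplicate.

    insert_job = BigQueryInsertJobOperator(
        task_id="idempotent_query_job",
        configuration={"query": {"query": "SELECT 1", "useLegacySql": False}},
        # Rendered ids contain only letters, digits, underscores and dashes.
        job_id="example_query_{{ ds_nodash }}",
    )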







[GitHub] [airflow] potiuk commented on a change in pull request #8868: Add BigQueryInsertJobOperator

Posted by GitBox <gi...@apache.org>.
potiuk commented on a change in pull request #8868:
URL: https://github.com/apache/airflow/pull/8868#discussion_r432937559



##########
File path: airflow/providers/google/cloud/operators/bigquery.py
##########
@@ -1570,3 +1576,76 @@ def execute(self, context):
             table_resource=self.table_resource,
             project_id=self.project_id,
         )
+
+
+class BigQueryInsertJobOperator(BaseOperator):
+    """
+    Executes a BigQuery job. Waits for the job to complete and returns job id.
+    See here:
+
+        https://cloud.google.com/bigquery/docs/reference/v2/jobs
+
+    :param configuration: The configuration parameter maps directly to BigQuery's
+        configuration field in the job  object. For more details see
+        https://cloud.google.com/bigquery/docs/reference/v2/jobs
+    :type configuration: Dict[str, Any]
+    :param job_id: The ID of the job. The ID must contain only letters (a-z, A-Z),
+        numbers (0-9), underscores (_), or dashes (-). The maximum length is 1,024
+        characters. If not provided then uuid will be generated.
+    :type job_id: str
+    :param project_id: Google Cloud Project where the job is running
+    :type project_id: str
+    :param location: location the job is running
+    :type location: str
+    :param gcp_conn_id: (Optional) The connection ID used to connect to Google Cloud Platform.
+    :type gcp_conn_id: str
+    """
+
+    template_fields = ("configuration", "job_id")
+    ui_color = BigQueryUIColors.QUERY.value
+
+    def __init__(
+        self,
+        configuration: Dict[str, Any],
+        project_id: Optional[str] = None,
+        location: Optional[str] = None,
+        job_id: Optional[str] = None,
+        gcp_conn_id: str = 'google_cloud_default',
+        delegate_to: Optional[str] = None,
+        *args,
+        **kwargs,
+    ) -> None:
+        super().__init__(*args, **kwargs)
+        self.configuration = configuration
+        self.location = location
+        self.job_id = job_id
+        self.project_id = project_id
+        self.gcp_conn_id = gcp_conn_id
+        self.delegate_to = delegate_to
+
+    def execute(self, context: Any):
+        hook = BigQueryHook(
+            gcp_conn_id=self.gcp_conn_id,
+            delegate_to=self.delegate_to,
+        )
+
+        try:
+            job = hook.insert_job(
+                configuration=self.configuration,
+                project_id=self.project_id,
+                location=self.location,
+                job_id=self.job_id,
+            )
+            # Start the job and wait for it to complete and get the result.
+            job.result()
+        except Conflict:
+            job = hook.get_job(
+                project_id=self.project_id,
+                location=self.location,
+                job_id=self.job_id,
+            )
+            # Get existing job and wait for it to be ready
+            while not job.done():
+                sleep(10)

Review comment:
       Fine for me







[GitHub] [airflow] judoole commented on a change in pull request #8868: Add BigQueryInsertJobOperator

Posted by GitBox <gi...@apache.org>.
judoole commented on a change in pull request #8868:
URL: https://github.com/apache/airflow/pull/8868#discussion_r613244024



##########
File path: airflow/providers/google/cloud/operators/bigquery.py
##########
@@ -546,6 +548,11 @@ def __init__(self,
                 "the gcp_conn_id parameter.", DeprecationWarning, stacklevel=3)
             gcp_conn_id = bigquery_conn_id
 
+        warnings.warn(

Review comment:
       @turbaszek is this correct? That `airflow.providers.google.cloud.operators.bigquery.BigQueryExecuteQueryOperator` is replaced by `BigQueryInsertJobOperator`? `BigQueryExecuteQueryOperator` seems so much nicer.







[GitHub] [airflow] turbaszek commented on a change in pull request #8868: Add BigQueryInsertJobOperator

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #8868:
URL: https://github.com/apache/airflow/pull/8868#discussion_r432936461



##########
File path: tests/providers/google/cloud/operators/test_bigquery.py
##########
@@ -788,3 +789,65 @@ def test_execute(self, mock_hook):
                 project_id=TEST_GCP_PROJECT_ID,
                 table_resource=TEST_TABLE_RESOURCES
             )
+
+
+class TestBigQueryInsertJobOperator:
+    @mock.patch('airflow.providers.google.cloud.operators.bigquery.BigQueryHook')
+    def test_execute(self, mock_hook):
+        job_id = "123456"
+        configuration = {
+            "query": {
+                "query": "SELECT * FROM any",
+                "useLegacySql": False,
+            }
+        }
+        mock_hook.return_value.insert_job.return_value = MagicMock(job_id=job_id)
+
+        op = BigQueryInsertJobOperator(
+            task_id="insert_query_job",
+            configuration=configuration,
+            location=TEST_DATASET_LOCATION,
+            job_id=job_id,
+            project_id=TEST_GCP_PROJECT_ID
+        )
+        result = op.execute({})
+
+        mock_hook.return_value.insert_job.assert_called_once_with(
+            configuration=configuration,
+            location=TEST_DATASET_LOCATION,
+            job_id=job_id,
+            project_id=TEST_GCP_PROJECT_ID,
+        )
+
+        assert result == job_id
+
+    @mock.patch('airflow.providers.google.cloud.operators.bigquery.BigQueryHook')

Review comment:
       @potiuk what exactly do you mean? There are two tests: creating a new job and reattaching to an existing one.



