Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2020/04/25 02:38:22 UTC

[GitHub] [airflow] mik-laj opened a new pull request #8553: Add DataflowStartSQLQuery operator

mik-laj opened a new pull request #8553:
URL: https://github.com/apache/airflow/pull/8553


   ---
   Make sure to mark the boxes below before creating PR: [x]
   
   - [X] Description above provides context of the change
   - [X] Unit tests coverage for changes (not needed for documentation changes)
   - [X] Target Github ISSUE in description if exists
   - [X] Commits follow "[How to write a good git commit message](http://chris.beams.io/posts/git-commit/)"
   - [X] Relevant documentation is updated including usage instructions.
   - [X] I will engage committers as explained in [Contribution Workflow Example](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#contribution-workflow-example).
   
   ---
   In case of fundamental code change, Airflow Improvement Proposal ([AIP](https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+Improvements+Proposals)) is needed.
   In case of a new dependency, check compliance with the [ASF 3rd Party License Policy](https://www.apache.org/legal/resolved.html#category-x).
   In case of backwards incompatible changes please leave a note in [UPDATING.md](https://github.com/apache/airflow/blob/master/UPDATING.md).
   Read the [Pull Request Guidelines](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#pull-request-guidelines) for more information.
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] mik-laj commented on a change in pull request #8553: Add DataflowStartSQLQuery operator

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #8553:
URL: https://github.com/apache/airflow/pull/8553#discussion_r416157606



##########
File path: airflow/providers/google/cloud/example_dags/example_dataflow_sql.py
##########
@@ -0,0 +1,60 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Example Airflow DAG for Google Cloud Dataflow service
+"""
+import os
+
+from airflow import models
+from airflow.providers.google.cloud.operators.dataflow import DataflowStartSqlJobOperator
+from airflow.utils.dates import days_ago
+
+GCP_PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "example-project")
+
+BQ_SQL_DATASET = os.environ.get('DATAFLOW_BQ_SQL_DATASET', 'airflow_dataflow_samples')
+DATAFLOW_SQL_JOB_NAME = os.environ.get('DATAFLOW_SQL_JOB_NAME', "dataflow-sql")
+DATAFLOW_SQL_LOCATION = os.environ.get('DATAFLOW_SQL_LOCATION', 'us-west1')
+
+with models.DAG(
+    dag_id="example_gcp_dataflow_sql",
+    default_args={
+        "start_date": days_ago(1),
+    },
+    schedule_interval=None,  # Override to match your needs
+) as dag_sql:
+    start_sql = DataflowStartSqlJobOperator(
+        task_id="start_sql_query",
+        job_name=DATAFLOW_SQL_JOB_NAME,
+        query=f"""
+            SELECT
+                sales_region as sales_region,
+                count(state_id) as count_state
+            FROM
+                bigquery.table.`{GCP_PROJECT_ID}`.beam_samples.beam_table

Review comment:
   We strive to maintain a high level of reliability for the GCP integrations. All integrations developed by my team must have system tests; they are a required criterion for internal review.







[GitHub] [airflow] mik-laj commented on a change in pull request #8553: Add DataflowStartSQLQuery operator

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #8553:
URL: https://github.com/apache/airflow/pull/8553#discussion_r416169086



##########
File path: airflow/providers/google/cloud/hooks/dataflow.py
##########
@@ -93,15 +94,30 @@ def inner_wrapper(self: "DataflowHook", *args, **kwargs) -> RT:
 class DataflowJobStatus:
     """
     Helper class with Dataflow job statuses.
+    Reference: https://cloud.google.com/dataflow/docs/reference/rest/v1b3/projects.jobs#Job.JobState
     """
-    JOB_STATE_DONE = "JOB_STATE_DONE"
+    JOB_STATE_UNKNOWN = "JOB_STATE_UNKNOWN"
+    JOB_STATE_STOPPED = "JOB_STATE_STOPPED"
     JOB_STATE_RUNNING = "JOB_STATE_RUNNING"
+    JOB_STATE_DONE = "JOB_STATE_DONE"
     JOB_STATE_FAILED = "JOB_STATE_FAILED"
     JOB_STATE_CANCELLED = "JOB_STATE_CANCELLED"
+    JOB_STATE_UPDATED = "JOB_STATE_UPDATED"
+    JOB_STATE_DRAINING = "JOB_STATE_DRAINING"
+    JOB_STATE_DRAINED = "JOB_STATE_DRAINED"
     JOB_STATE_PENDING = "JOB_STATE_PENDING"
-    FAILED_END_STATES = {JOB_STATE_FAILED, JOB_STATE_CANCELLED}
-    SUCCEEDED_END_STATES = {JOB_STATE_DONE}
-    END_STATES = SUCCEEDED_END_STATES | FAILED_END_STATES
+    JOB_STATE_CANCELLING = "JOB_STATE_CANCELLING"
+    JOB_STATE_QUEUED = "JOB_STATE_QUEUED"
+    FAILED_END_STATES = {JOB_STATE_FAILED, JOB_STATE_CANCELLED, JOB_STATE_STOPPED}
+    SUCCEEDED_END_STATES = {JOB_STATE_DONE, JOB_STATE_UPDATED, JOB_STATE_DRAINED}
+    TERMINAL_STATES = SUCCEEDED_END_STATES | FAILED_END_STATES

Review comment:
   Airflow cannot display full information about the status of a job in an external system. We only have two states - SUCCESS/FAILED. What are you proposing then? Can the user specify the expected end states?
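
   For illustration, a hypothetical sketch (not code from this PR) of what letting the user specify the expected end state could look like when the job status is polled:

```python
# Hypothetical sketch: the operator accepts the terminal state the user
# considers a success; any other terminal state fails the task.
TERMINAL_STATES = {
    "JOB_STATE_DONE", "JOB_STATE_FAILED", "JOB_STATE_CANCELLED",
    "JOB_STATE_UPDATED", "JOB_STATE_DRAINED", "JOB_STATE_STOPPED",
}

def is_job_successful(current_state: str,
                      expected_terminal_state: str = "JOB_STATE_DONE") -> bool:
    """Return True on the expected terminal state, raise on any other terminal state."""
    if current_state not in TERMINAL_STATES:
        return False  # still running, keep polling
    if current_state == expected_terminal_state:
        return True
    raise RuntimeError(f"Job reached {current_state}, expected {expected_terminal_state}")
```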







[GitHub] [airflow] github-actions[bot] commented on pull request #8553: Add DataflowStartSQLQuery operator

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #8553:
URL: https://github.com/apache/airflow/pull/8553#issuecomment-714468862


   [The Workflow run](https://github.com/apache/airflow/actions/runs/322041578) is cancelling this PR. It has some failed jobs matching ^Pylint$,^Static checks$,^Build docs$,^Spell check docs$,^Backport packages$,^Checks: Helm tests$,^Test OpenAPI*.





[GitHub] [airflow] mik-laj commented on a change in pull request #8553: Add DataflowStartSQLQuery operator

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #8553:
URL: https://github.com/apache/airflow/pull/8553#discussion_r416142108



##########
File path: airflow/providers/google/cloud/example_dags/example_dataflow_sql.py
##########
@@ -0,0 +1,60 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Example Airflow DAG for Google Cloud Dataflow service
+"""
+import os
+
+from airflow import models
+from airflow.providers.google.cloud.operators.dataflow import DataflowStartSqlJobOperator
+from airflow.utils.dates import days_ago
+
+GCP_PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "example-project")
+
+BQ_SQL_DATASET = os.environ.get('DATAFLOW_BQ_SQL_DATASET', 'airflow_dataflow_samples')
+DATAFLOW_SQL_JOB_NAME = os.environ.get('DATAFLOW_SQL_JOB_NAME', "dataflow-sql")
+DATAFLOW_SQL_LOCATION = os.environ.get('DATAFLOW_SQL_LOCATION', 'us-west1')
+
+with models.DAG(
+    dag_id="example_gcp_dataflow_sql",
+    default_args={
+        "start_date": days_ago(1),
+    },
+    schedule_interval=None,  # Override to match your needs
+) as dag_sql:
+    start_sql = DataflowStartSqlJobOperator(
+        task_id="start_sql_query",
+        job_name=DATAFLOW_SQL_JOB_NAME,
+        query=f"""
+            SELECT
+                sales_region as sales_region,
+                count(state_id) as count_state
+            FROM
+                bigquery.table.`{GCP_PROJECT_ID}`.beam_samples.beam_table
+            GROUP BY sales_region;
+        """,
+        options={
+            "bigquery-project": GCP_PROJECT_ID,
+            "bigquery-dataset": BQ_SQL_DATASET,
+            "bigquery-table": "beam_output",
+            'bigquery-write-disposition': "write-truncate",
+        },
+        location=DATAFLOW_SQL_LOCATION,
+        do_xcom_push=True,

Review comment:
   I have already asked @jaketf to review. Before this change is merged, it will also be reviewed by at least one Apache Airflow committer.







[GitHub] [airflow] TobKed commented on a change in pull request #8553: Add DataflowStartSQLQuery operator

Posted by GitBox <gi...@apache.org>.
TobKed commented on a change in pull request #8553:
URL: https://github.com/apache/airflow/pull/8553#discussion_r508222123



##########
File path: airflow/providers/google/cloud/operators/dataflow.py
##########
@@ -406,6 +406,88 @@ def on_kill(self) -> None:
             self.hook.cancel_job(job_id=self.job_id, project_id=self.project_id)
 
 
+class DataflowStartSqlJobOperator(BaseOperator):
+    """
+    Starts Dataflow SQL query.
+
+    :param job_name: The unique name to assign to the Cloud Dataflow job.
+    :type job_name: str
+    :param query: The SQL query to execute.
+    :type query: str
+    :param options: Job parameters to be executed. It can be a dictionary with the following keys.
+
+        For more information, look at:
+        `https://cloud.google.com/sdk/gcloud/reference/beta/dataflow/sql/query
+        <gcloud beta dataflow sql query>`__
+        command reference
+
+    :param options: dict
+    :param location: The location of the Dataflow job (for example europe-west1)
+    :type location: str
+    :param project_id: The ID of the GCP project that owns the job.
+        If set to ``None`` or missing, the default project_id from the GCP connection is used.
+    :type project_id: Optional[str]
+    :param gcp_conn_id: The connection ID to use connecting to Google Cloud
+        Platform.
+    :type gcp_conn_id: str
+    :param delegate_to: The account to impersonate, if any.
+        For this to work, the service account making the request must have
+        domain-wide delegation enabled.
+    :type delegate_to: str
+    """
+
+    template_fields = ["job_name", 'query', 'options', 'location', 'project_id', 'gcp_conn_id']
+
+    @apply_defaults
+    def __init__(
+        self,
+        job_name: str,
+        query: str,
+        options: Dict[str, Any],
+        location: str = DEFAULT_DATAFLOW_LOCATION,
+        project_id: Optional[str] = None,
+        gcp_conn_id: str = 'google_cloud_default',
+        delegate_to: Optional[str] = None,
+        *args,
+        **kwargs
+    ) -> None:
+        super().__init__(*args, **kwargs)
+        self.job_name = job_name
+        self.query = query
+        self.options = options
+        self.location = location
+        self.project_id = project_id
+        self.gcp_conn_id = gcp_conn_id
+        self.delegate_to = delegate_to
+        self.job_id = None
+        self.hook: Optional[DataflowHook] = None
+
+    def execute(self, context):
+        self.hook = DataflowHook(
+            gcp_conn_id=self.gcp_conn_id,
+            delegate_to=self.delegate_to,
+        )
+
+        def set_current_job_id(job_id):
+            self.job_id = job_id
+
+        job = self.hook.start_sql_job(
+            job_name=self.job_name,
+            query=self.query,
+            options=self.options,
+            location=self.location,
+            project_id=self.project_id,
+            on_new_job_id_callback=set_current_job_id,
+        )
+
+        return job
+
+    def on_kill(self) -> None:
+        self.log.info("On kill.")
+        if self.job_id:
+            self.hook.cancel_job(job_id=self.job_id, project_id=self.project_id)

Review comment:
       I fixed it in hook.







[GitHub] [airflow] aaltay commented on a change in pull request #8553: Add DataflowStartSQLQuery operator

Posted by GitBox <gi...@apache.org>.
aaltay commented on a change in pull request #8553:
URL: https://github.com/apache/airflow/pull/8553#discussion_r416125310



##########
File path: airflow/providers/google/cloud/hooks/dataflow.py
##########
@@ -93,15 +94,30 @@ def inner_wrapper(self: "DataflowHook", *args, **kwargs) -> RT:
 class DataflowJobStatus:
     """
     Helper class with Dataflow job statuses.
+    Reference: https://cloud.google.com/dataflow/docs/reference/rest/v1b3/projects.jobs#Job.JobState
     """
-    JOB_STATE_DONE = "JOB_STATE_DONE"
+    JOB_STATE_UNKNOWN = "JOB_STATE_UNKNOWN"
+    JOB_STATE_STOPPED = "JOB_STATE_STOPPED"
     JOB_STATE_RUNNING = "JOB_STATE_RUNNING"
+    JOB_STATE_DONE = "JOB_STATE_DONE"
     JOB_STATE_FAILED = "JOB_STATE_FAILED"
     JOB_STATE_CANCELLED = "JOB_STATE_CANCELLED"
+    JOB_STATE_UPDATED = "JOB_STATE_UPDATED"
+    JOB_STATE_DRAINING = "JOB_STATE_DRAINING"
+    JOB_STATE_DRAINED = "JOB_STATE_DRAINED"
     JOB_STATE_PENDING = "JOB_STATE_PENDING"
-    FAILED_END_STATES = {JOB_STATE_FAILED, JOB_STATE_CANCELLED}
-    SUCCEEDED_END_STATES = {JOB_STATE_DONE}
-    END_STATES = SUCCEEDED_END_STATES | FAILED_END_STATES
+    JOB_STATE_CANCELLING = "JOB_STATE_CANCELLING"
+    JOB_STATE_QUEUED = "JOB_STATE_QUEUED"
+    FAILED_END_STATES = {JOB_STATE_FAILED, JOB_STATE_CANCELLED, JOB_STATE_STOPPED}

Review comment:
       JOB_STATE_STOPPED is not a failed state.  (See: https://cloud.google.com/dataflow/docs/reference/rest/v1b3/projects.jobs#jobstate)

##########
File path: airflow/providers/google/cloud/hooks/dataflow.py
##########
@@ -496,17 +517,15 @@ def start_java_dataflow(
         variables['jobName'] = name
         variables['region'] = location
 
-        def label_formatter(labels_dict):
-            return ['--labels={}'.format(
-                json.dumps(labels_dict).replace(' ', ''))]
+        if 'labels' in variables:
+            variables['labels'] = json.dumps(variables['labels']).replace(' ', '')

Review comment:
   This is modifying user-provided input. Is this your intention?
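
   To make the concern concrete, a small illustrative sketch (hypothetical helper name, not the PR's code) of how reassigning into the passed-in dict is visible to the caller:

```python
import json

def build_labels_arg(variables):
    # Reassigning into the caller's dict replaces the labels mapping with a
    # string; the caller sees this change after the call returns.
    variables["labels"] = json.dumps(variables["labels"]).replace(" ", "")
    return [f"--labels={variables['labels']}"]

variables = {"labels": {"team": "etl", "env": "dev"}}
print(build_labels_arg(variables))  # ['--labels={"team":"etl","env":"dev"}']
print(type(variables["labels"]))    # <class 'str'> -- no longer a dict

# Formatting into a local variable (or working on a copy) avoids the side effect:
# labels = json.dumps(variables["labels"], separators=(",", ":"))
```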

##########
File path: airflow/providers/google/cloud/hooks/dataflow.py
##########
@@ -640,9 +659,8 @@ def start_python_dataflow(  # pylint: disable=too-many-arguments
         variables['job_name'] = name
         variables['region'] = location
 
-        def label_formatter(labels_dict):
-            return ['--labels={}={}'.format(key, value)
-                    for key, value in labels_dict.items()]
+        if 'labels' in variables:
+            variables['labels'] = ['{}={}'.format(key, value) for key, value in variables['labels'].items()]

Review comment:
   Why is this formatter different from the one at L521?
   
   Could we move all label formatting to the place where the Dataflow job is triggered?

##########
File path: airflow/providers/google/cloud/hooks/dataflow.py
##########
@@ -783,6 +794,77 @@ def cancel_job(
             name=job_name,
             job_id=job_id,
             location=location,
-            poll_sleep=self.poll_sleep
+            poll_sleep=self.poll_sleep,
+            num_retries=self.num_retries,
         )
         jobs_controller.cancel()
+
+    @GoogleBaseHook.fallback_to_default_project_id
+    def start_sql_job(
+        self,
+        job_name: str,
+        query: str,
+        options: Dict[str, Any],
+        project_id: str,
+        location: str = DEFAULT_DATAFLOW_LOCATION,
+        on_new_job_id_callback: Optional[Callable[[str], None]] = None
+    ):
+        """
+        Starts Dataflow SQL query.
+
+        :param job_name: The unique name to assign to the Cloud Dataflow job.
+        :type job_name: str
+        :param query: The SQL query to execute.
+        :type query: str
+        :param options: Job parameters to be executed.
+            For more information, look at:
+            `https://cloud.google.com/sdk/gcloud/reference/beta/dataflow/sql/query
+            <gcloud beta dataflow sql query>`__
+            command reference
+        :param location: The location of the Dataflow job (for example europe-west1)
+        :type location: str
+        :param project_id: The ID of the GCP project that owns the job.
+            If set to ``None`` or missing, the default project_id from the GCP connection is used.
+        :type project_id: Optional[str]
+        :param on_new_job_id_callback: Callback called when the job ID is known.
+        :type on_new_job_id_callback: callable
+        :return: the new job object
+        """
+        cmd = [
+            'gcloud',
+            'beta',

Review comment:
       Is the beta still required, do you know?

##########
File path: airflow/providers/google/cloud/example_dags/example_dataflow_sql.py
##########
@@ -0,0 +1,60 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Example Airflow DAG for Google Cloud Dataflow service
+"""
+import os
+
+from airflow import models
+from airflow.providers.google.cloud.operators.dataflow import DataflowStartSqlJobOperator
+from airflow.utils.dates import days_ago
+
+GCP_PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "example-project")
+
+BQ_SQL_DATASET = os.environ.get('DATAFLOW_BQ_SQL_DATASET', 'airflow_dataflow_samples')
+DATAFLOW_SQL_JOB_NAME = os.environ.get('DATAFLOW_SQL_JOB_NAME', "dataflow-sql")
+DATAFLOW_SQL_LOCATION = os.environ.get('DATAFLOW_SQL_LOCATION', 'us-west1')
+
+with models.DAG(
+    dag_id="example_gcp_dataflow_sql",
+    default_args={
+        "start_date": days_ago(1),
+    },
+    schedule_interval=None,  # Override to match your needs
+) as dag_sql:
+    start_sql = DataflowStartSqlJobOperator(
+        task_id="start_sql_query",
+        job_name=DATAFLOW_SQL_JOB_NAME,
+        query=f"""
+            SELECT
+                sales_region as sales_region,
+                count(state_id) as count_state
+            FROM
+                bigquery.table.`{GCP_PROJECT_ID}`.beam_samples.beam_table

Review comment:
   Who is going to create this dataset? Can we use a public dataset so that the example works for anyone?

##########
File path: airflow/providers/google/cloud/hooks/dataflow.py
##########
@@ -783,6 +794,77 @@ def cancel_job(
             name=job_name,
             job_id=job_id,
             location=location,
-            poll_sleep=self.poll_sleep
+            poll_sleep=self.poll_sleep,
+            num_retries=self.num_retries,
         )
         jobs_controller.cancel()
+
+    @GoogleBaseHook.fallback_to_default_project_id
+    def start_sql_job(
+        self,
+        job_name: str,
+        query: str,
+        options: Dict[str, Any],
+        project_id: str,
+        location: str = DEFAULT_DATAFLOW_LOCATION,
+        on_new_job_id_callback: Optional[Callable[[str], None]] = None
+    ):
+        """
+        Starts Dataflow SQL query.
+
+        :param job_name: The unique name to assign to the Cloud Dataflow job.
+        :type job_name: str
+        :param query: The SQL query to execute.
+        :type query: str
+        :param options: Job parameters to be executed.
+            For more information, look at:
+            `https://cloud.google.com/sdk/gcloud/reference/beta/dataflow/sql/query
+            <gcloud beta dataflow sql query>`__
+            command reference
+        :param location: The location of the Dataflow job (for example europe-west1)
+        :type location: str
+        :param project_id: The ID of the GCP project that owns the job.
+            If set to ``None`` or missing, the default project_id from the GCP connection is used.
+        :type project_id: Optional[str]
+        :param on_new_job_id_callback: Callback called when the job ID is known.
+        :type on_new_job_id_callback: callable
+        :return: the new job object
+        """
+        cmd = [
+            'gcloud',
+            'beta',
+            'dataflow',
+            'sql',
+            'query',
+            query,
+            f'--project={project_id}',
+            '--format=value(job.id)',
+            f'--job-name={job_name}',
+            f'--region={location}',
+            *(self._options_to_args(options))
+        ]
+        self.log.info("Executing command: %s", " ".join([shlex.quote(c) for c in cmd]))
+        with self.provide_authorized_gcloud():
+            proc = subprocess.run(  # pylint: disable=subprocess-run-check
+                cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+        self.log.info("Output: %s", proc.stdout.decode())
+        self.log.info("Stderr: %s", proc.stderr.decode())

Review comment:
       log.error for stderr maybe?
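
   One possible shape of that suggestion (a sketch only; the actual fix may differ) is to escalate stderr to error level when the command fails:

```python
import logging
import subprocess

log = logging.getLogger(__name__)

def run_and_log(cmd: list) -> subprocess.CompletedProcess:
    """Run the command, logging stdout at INFO and stderr at ERROR only on failure."""
    proc = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    log.info("Output: %s", proc.stdout.decode())
    if proc.returncode != 0:
        log.error("Stderr: %s", proc.stderr.decode())
    else:
        log.info("Stderr: %s", proc.stderr.decode())
    return proc
```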

##########
File path: airflow/providers/google/cloud/hooks/dataflow.py
##########
@@ -783,6 +794,77 @@ def cancel_job(
             name=job_name,
             job_id=job_id,
             location=location,
-            poll_sleep=self.poll_sleep
+            poll_sleep=self.poll_sleep,
+            num_retries=self.num_retries,
         )
         jobs_controller.cancel()
+
+    @GoogleBaseHook.fallback_to_default_project_id
+    def start_sql_job(
+        self,
+        job_name: str,
+        query: str,
+        options: Dict[str, Any],
+        project_id: str,
+        location: str = DEFAULT_DATAFLOW_LOCATION,
+        on_new_job_id_callback: Optional[Callable[[str], None]] = None
+    ):
+        """
+        Starts Dataflow SQL query.
+
+        :param job_name: The unique name to assign to the Cloud Dataflow job.
+        :type job_name: str
+        :param query: The SQL query to execute.
+        :type query: str
+        :param options: Job parameters to be executed.
+            For more information, look at:
+            `https://cloud.google.com/sdk/gcloud/reference/beta/dataflow/sql/query
+            <gcloud beta dataflow sql query>`__
+            command reference
+        :param location: The location of the Dataflow job (for example europe-west1)
+        :type location: str
+        :param project_id: The ID of the GCP project that owns the job.
+            If set to ``None`` or missing, the default project_id from the GCP connection is used.
+        :type project_id: Optional[str]
+        :param on_new_job_id_callback: Callback called when the job ID is known.
+        :type on_new_job_id_callback: callable
+        :return: the new job object
+        """
+        cmd = [
+            'gcloud',
+            'beta',
+            'dataflow',
+            'sql',
+            'query',
+            query,
+            f'--project={project_id}',
+            '--format=value(job.id)',
+            f'--job-name={job_name}',
+            f'--region={location}',
+            *(self._options_to_args(options))
+        ]
+        self.log.info("Executing command: %s", " ".join([shlex.quote(c) for c in cmd]))
+        with self.provide_authorized_gcloud():

Review comment:
       What does shlex.quote() do?

##########
File path: airflow/providers/google/cloud/operators/dataflow.py
##########
@@ -406,6 +406,88 @@ def on_kill(self) -> None:
             self.hook.cancel_job(job_id=self.job_id, project_id=self.project_id)
 
 
+class DataflowStartSqlJobOperator(BaseOperator):
+    """
+    Starts Dataflow SQL query.
+
+    :param job_name: The unique name to assign to the Cloud Dataflow job.
+    :type job_name: str
+    :param query: The SQL query to execute.
+    :type query: str
+    :param options: Job parameters to be executed. It can be a dictionary with the following keys.
+
+        For more information, look at:
+        `https://cloud.google.com/sdk/gcloud/reference/beta/dataflow/sql/query
+        <gcloud beta dataflow sql query>`__
+        command reference
+
+    :param options: dict
+    :param location: The location of the Dataflow job (for example europe-west1)
+    :type location: str
+    :param project_id: The ID of the GCP project that owns the job.
+        If set to ``None`` or missing, the default project_id from the GCP connection is used.
+    :type project_id: Optional[str]
+    :param gcp_conn_id: The connection ID to use connecting to Google Cloud
+        Platform.
+    :type gcp_conn_id: str

Review comment:
       What is gcp_conn_id?

##########
File path: airflow/providers/google/cloud/example_dags/example_dataflow_sql.py
##########
@@ -0,0 +1,60 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Example Airflow DAG for Google Cloud Dataflow service
+"""
+import os
+
+from airflow import models
+from airflow.providers.google.cloud.operators.dataflow import DataflowStartSqlJobOperator
+from airflow.utils.dates import days_ago
+
+GCP_PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "example-project")
+
+BQ_SQL_DATASET = os.environ.get('DATAFLOW_BQ_SQL_DATASET', 'airflow_dataflow_samples')
+DATAFLOW_SQL_JOB_NAME = os.environ.get('DATAFLOW_SQL_JOB_NAME', "dataflow-sql")
+DATAFLOW_SQL_LOCATION = os.environ.get('DATAFLOW_SQL_LOCATION', 'us-west1')
+
+with models.DAG(
+    dag_id="example_gcp_dataflow_sql",
+    default_args={
+        "start_date": days_ago(1),
+    },
+    schedule_interval=None,  # Override to match your needs
+) as dag_sql:
+    start_sql = DataflowStartSqlJobOperator(
+        task_id="start_sql_query",
+        job_name=DATAFLOW_SQL_JOB_NAME,
+        query=f"""
+            SELECT
+                sales_region as sales_region,
+                count(state_id) as count_state
+            FROM
+                bigquery.table.`{GCP_PROJECT_ID}`.beam_samples.beam_table
+            GROUP BY sales_region;
+        """,
+        options={
+            "bigquery-project": GCP_PROJECT_ID,
+            "bigquery-dataset": BQ_SQL_DATASET,
+            "bigquery-table": "beam_output",
+            'bigquery-write-disposition': "write-truncate",
+        },
+        location=DATAFLOW_SQL_LOCATION,
+        do_xcom_push=True,

Review comment:
   Just a note: I can review the Dataflow aspects, but I am not very familiar with Airflow. For example, I am not sure what do_xcom_push is. It would be good to get an Airflow review as well.
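
   For context, `do_xcom_push` is a generic Airflow mechanism rather than something specific to this operator: when enabled, the value returned by `execute()` (here, the job object) is stored as an XCom that downstream tasks can read. A minimal illustrative sketch, assuming Airflow 2 style imports:

```python
from airflow.operators.python import PythonOperator

def print_dataflow_job(**context):
    # Pull the job object that "start_sql_query" pushed to XCom.
    job = context["ti"].xcom_pull(task_ids="start_sql_query")
    print(job)

print_job = PythonOperator(
    task_id="print_dataflow_job",
    python_callable=print_dataflow_job,
    dag=dag_sql,
)
start_sql >> print_job
```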

##########
File path: airflow/providers/google/cloud/operators/dataflow.py
##########
@@ -406,6 +406,88 @@ def on_kill(self) -> None:
             self.hook.cancel_job(job_id=self.job_id, project_id=self.project_id)
 
 
+class DataflowStartSqlJobOperator(BaseOperator):
+    """
+    Starts Dataflow SQL query.
+
+    :param job_name: The unique name to assign to the Cloud Dataflow job.
+    :type job_name: str
+    :param query: The SQL query to execute.
+    :type query: str
+    :param options: Job parameters to be executed. It can be a dictionary with the following keys.
+
+        For more information, look at:
+        `https://cloud.google.com/sdk/gcloud/reference/beta/dataflow/sql/query
+        <gcloud beta dataflow sql query>`__
+        command reference
+
+    :param options: dict
+    :param location: The location of the Dataflow job (for example europe-west1)
+    :type location: str
+    :param project_id: The ID of the GCP project that owns the job.
+        If set to ``None`` or missing, the default project_id from the GCP connection is used.
+    :type project_id: Optional[str]
+    :param gcp_conn_id: The connection ID to use connecting to Google Cloud
+        Platform.
+    :type gcp_conn_id: str
+    :param delegate_to: The account to impersonate, if any.
+        For this to work, the service account making the request must have
+        domain-wide delegation enabled.
+    :type delegate_to: str
+    """
+
+    template_fields = ["job_name", 'query', 'options', 'location', 'project_id', 'gcp_conn_id']
+
+    @apply_defaults
+    def __init__(
+        self,
+        job_name: str,
+        query: str,
+        options: Dict[str, Any],
+        location: str = DEFAULT_DATAFLOW_LOCATION,
+        project_id: Optional[str] = None,
+        gcp_conn_id: str = 'google_cloud_default',
+        delegate_to: Optional[str] = None,
+        *args,
+        **kwargs
+    ) -> None:
+        super().__init__(*args, **kwargs)
+        self.job_name = job_name
+        self.query = query
+        self.options = options
+        self.location = location
+        self.project_id = project_id
+        self.gcp_conn_id = gcp_conn_id
+        self.delegate_to = delegate_to
+        self.job_id = None
+        self.hook: Optional[DataflowHook] = None
+
+    def execute(self, context):
+        self.hook = DataflowHook(
+            gcp_conn_id=self.gcp_conn_id,
+            delegate_to=self.delegate_to,
+        )
+
+        def set_current_job_id(job_id):
+            self.job_id = job_id
+
+        job = self.hook.start_sql_job(
+            job_name=self.job_name,
+            query=self.query,
+            options=self.options,
+            location=self.location,
+            project_id=self.project_id,
+            on_new_job_id_callback=set_current_job_id,
+        )
+
+        return job
+
+    def on_kill(self) -> None:
+        self.log.info("On kill.")
+        if self.job_id:
+            self.hook.cancel_job(job_id=self.job_id, project_id=self.project_id)

Review comment:
   Do you want to call this even if the job is cancelled/stopped/finished?







[GitHub] [airflow] mik-laj commented on a change in pull request #8553: Add DataflowStartSQLQuery operator

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #8553:
URL: https://github.com/apache/airflow/pull/8553#discussion_r416187351



##########
File path: airflow/providers/google/cloud/hooks/dataflow.py
##########
@@ -783,6 +794,77 @@ def cancel_job(
             name=job_name,
             job_id=job_id,
             location=location,
-            poll_sleep=self.poll_sleep
+            poll_sleep=self.poll_sleep,
+            num_retries=self.num_retries,
         )
         jobs_controller.cancel()
+
+    @GoogleBaseHook.fallback_to_default_project_id
+    def start_sql_job(
+        self,
+        job_name: str,
+        query: str,
+        options: Dict[str, Any],
+        project_id: str,
+        location: str = DEFAULT_DATAFLOW_LOCATION,
+        on_new_job_id_callback: Optional[Callable[[str], None]] = None
+    ):
+        """
+        Starts Dataflow SQL query.
+
+        :param job_name: The unique name to assign to the Cloud Dataflow job.
+        :type job_name: str
+        :param query: The SQL query to execute.
+        :type query: str
+        :param options: Job parameters to be executed.
+            For more information, look at:
+            `https://cloud.google.com/sdk/gcloud/reference/beta/dataflow/sql/query
+            <gcloud beta dataflow sql query>`__
+            command reference
+        :param location: The location of the Dataflow job (for example europe-west1)
+        :type location: str
+        :param project_id: The ID of the GCP project that owns the job.
+            If set to ``None`` or missing, the default project_id from the GCP connection is used.
+        :type project_id: Optional[str]
+        :param on_new_job_id_callback: Callback called when the job ID is known.
+        :type on_new_job_id_callback: callable
+        :return: the new job object
+        """
+        cmd = [
+            'gcloud',
+            'beta',
+            'dataflow',
+            'sql',
+            'query',
+            query,
+            f'--project={project_id}',
+            '--format=value(job.id)',
+            f'--job-name={job_name}',
+            f'--region={location}',
+            *(self._options_to_args(options))
+        ]
+        self.log.info("Executing command: %s", " ".join([shlex.quote(c) for c in cmd]))
+        with self.provide_authorized_gcloud():

Review comment:
   This is only for the logs. I used it to test this operator. A normal user will not copy it, but it may be helpful for debugging.
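
   In other words, `shlex.quote()` (Python standard library) escapes each argument so the logged command line is safe to copy-paste into a shell; arguments containing spaces or shell metacharacters are wrapped in single quotes:

```python
import shlex

cmd = [
    "gcloud", "dataflow", "sql", "query",
    "SELECT sales_region, count(state_id) FROM t GROUP BY sales_region;",
    "--format=value(job.id)",
]
print(" ".join(shlex.quote(c) for c in cmd))
# gcloud dataflow sql query 'SELECT sales_region, count(state_id) FROM t GROUP BY sales_region;' '--format=value(job.id)'
```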







[GitHub] [airflow] TobKed commented on a change in pull request #8553: Add DataflowStartSQLQuery operator

Posted by GitBox <gi...@apache.org>.
TobKed commented on a change in pull request #8553:
URL: https://github.com/apache/airflow/pull/8553#discussion_r501799152



##########
File path: airflow/providers/google/cloud/hooks/dataflow.py
##########
@@ -783,6 +794,77 @@ def cancel_job(
             name=job_name,
             job_id=job_id,
             location=location,
-            poll_sleep=self.poll_sleep
+            poll_sleep=self.poll_sleep,
+            num_retries=self.num_retries,
         )
         jobs_controller.cancel()
+
+    @GoogleBaseHook.fallback_to_default_project_id
+    def start_sql_job(
+        self,
+        job_name: str,
+        query: str,
+        options: Dict[str, Any],
+        project_id: str,
+        location: str = DEFAULT_DATAFLOW_LOCATION,
+        on_new_job_id_callback: Optional[Callable[[str], None]] = None
+    ):
+        """
+        Starts Dataflow SQL query.
+
+        :param job_name: The unique name to assign to the Cloud Dataflow job.
+        :type job_name: str
+        :param query: The SQL query to execute.
+        :type query: str
+        :param options: Job parameters to be executed.
+            For more information, look at:
+            `https://cloud.google.com/sdk/gcloud/reference/beta/dataflow/sql/query
+            <gcloud beta dataflow sql query>`__
+            command reference
+        :param location: The location of the Dataflow job (for example europe-west1)
+        :type location: str
+        :param project_id: The ID of the GCP project that owns the job.
+            If set to ``None`` or missing, the default project_id from the GCP connection is used.
+        :type project_id: Optional[str]
+        :param on_new_job_id_callback: Callback called when the job ID is known.
+        :type on_new_job_id_callback: callable
+        :return: the new job object
+        """
+        cmd = [
+            'gcloud',
+            'beta',
+            'dataflow',
+            'sql',
+            'query',
+            query,
+            f'--project={project_id}',
+            '--format=value(job.id)',
+            f'--job-name={job_name}',
+            f'--region={location}',
+            *(self._options_to_args(options))
+        ]
+        self.log.info("Executing command: %s", " ".join([shlex.quote(c) for c in cmd]))
+        with self.provide_authorized_gcloud():
+            proc = subprocess.run(  # pylint: disable=subprocess-run-check
+                cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+        self.log.info("Output: %s", proc.stdout.decode())
+        self.log.info("Stderr: %s", proc.stderr.decode())

Review comment:
       I changed it :)







[GitHub] [airflow] ibzib commented on a change in pull request #8553: Add DataflowStartSQLQuery operator

Posted by GitBox <gi...@apache.org>.
ibzib commented on a change in pull request #8553:
URL: https://github.com/apache/airflow/pull/8553#discussion_r508631169



##########
File path: airflow/providers/google/cloud/hooks/dataflow.py
##########
@@ -783,6 +794,77 @@ def cancel_job(
             name=job_name,
             job_id=job_id,
             location=location,
-            poll_sleep=self.poll_sleep
+            poll_sleep=self.poll_sleep,
+            num_retries=self.num_retries,
         )
         jobs_controller.cancel()
+
+    @GoogleBaseHook.fallback_to_default_project_id
+    def start_sql_job(
+        self,
+        job_name: str,
+        query: str,
+        options: Dict[str, Any],
+        project_id: str,
+        location: str = DEFAULT_DATAFLOW_LOCATION,
+        on_new_job_id_callback: Optional[Callable[[str], None]] = None
+    ):
+        """
+        Starts Dataflow SQL query.
+
+        :param job_name: The unique name to assign to the Cloud Dataflow job.
+        :type job_name: str
+        :param query: The SQL query to execute.
+        :type query: str
+        :param options: Job parameters to be executed.

Review comment:
       Sounds good.







[GitHub] [airflow] mik-laj merged pull request #8553: Add DataflowStartSQLQuery operator

Posted by GitBox <gi...@apache.org>.
mik-laj merged pull request #8553:
URL: https://github.com/apache/airflow/pull/8553


   





[GitHub] [airflow] TobKed commented on a change in pull request #8553: Add DataflowStartSQLQuery operator

Posted by GitBox <gi...@apache.org>.
TobKed commented on a change in pull request #8553:
URL: https://github.com/apache/airflow/pull/8553#discussion_r497550417



##########
File path: airflow/providers/google/cloud/hooks/dataflow.py
##########
@@ -93,15 +94,30 @@ def inner_wrapper(self: "DataflowHook", *args, **kwargs) -> RT:
 class DataflowJobStatus:
     """
     Helper class with Dataflow job statuses.
+    Reference: https://cloud.google.com/dataflow/docs/reference/rest/v1b3/projects.jobs#Job.JobState
     """
-    JOB_STATE_DONE = "JOB_STATE_DONE"
+    JOB_STATE_UNKNOWN = "JOB_STATE_UNKNOWN"

Review comment:
   Created a separate issue for the stutter: https://github.com/apache/airflow/issues/11205
   







[GitHub] [airflow] aaltay commented on pull request #8553: Add DataflowStartSQLQuery operator

Posted by GitBox <gi...@apache.org>.
aaltay commented on pull request #8553:
URL: https://github.com/apache/airflow/pull/8553#issuecomment-620222175


   /cc @kennknowles





[GitHub] [airflow] TobKed commented on a change in pull request #8553: Add DataflowStartSQLQuery operator

Posted by GitBox <gi...@apache.org>.
TobKed commented on a change in pull request #8553:
URL: https://github.com/apache/airflow/pull/8553#discussion_r510106178



##########
File path: airflow/providers/google/cloud/hooks/dataflow.py
##########
@@ -496,17 +517,15 @@ def start_java_dataflow(
         variables['jobName'] = name
         variables['region'] = location
 
-        def label_formatter(labels_dict):
-            return ['--labels={}'.format(
-                json.dumps(labels_dict).replace(' ', ''))]
+        if 'labels' in variables:
+            variables['labels'] = json.dumps(variables['labels']).replace(' ', '')

Review comment:
   > This is modifying user-provided input. Is this your intention?
   
   @aaltay the reason for this replace was to avoid spaces in the JSON; changing the user input was a side effect.
   I changed it to `variables['labels'] = json.dumps(variables['labels'], separators=(',', ':'))` so the JSON is compact as well and the labels provided by the user are not touched.
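
   For illustration (made-up label values), the compact separators produce the space-free form without any post-processing of the user's dict:

```python
import json

labels = {"airflow-version": "v2-0-0", "team": "data-eng"}

json.dumps(labels)
# '{"airflow-version": "v2-0-0", "team": "data-eng"}'  <- spaces after ':' and ','

json.dumps(labels, separators=(',', ':'))
# '{"airflow-version":"v2-0-0","team":"data-eng"}'     <- compact; the labels dict is untouched
```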







[GitHub] [airflow] mik-laj commented on a change in pull request #8553: Add DataflowStartSQLQuery operator

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #8553:
URL: https://github.com/apache/airflow/pull/8553#discussion_r504576902



##########
File path: airflow/providers/google/cloud/hooks/dataflow.py
##########
@@ -431,13 +448,17 @@ def _start_dataflow(
         variables: Dict,
         name: str,
         command_prefix: List[str],
-        label_formatter: Callable[[Dict], List[str]],
         project_id: str,
         multiple_jobs: bool = False,
         on_new_job_id_callback: Optional[Callable[[str], None]] = None,
         location: str = DEFAULT_DATAFLOW_LOCATION
     ) -> None:
-        cmd = command_prefix + self._build_cmd(variables, label_formatter, project_id)
+        cmd = command_prefix + [
+            "--runner=DataflowRunner",

Review comment:
   In this quarter, we want to start working on operators for Apache Beam.







[GitHub] [airflow] github-actions[bot] commented on pull request #8553: Add DataflowStartSQLQuery operator

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #8553:
URL: https://github.com/apache/airflow/pull/8553#issuecomment-705675535


   [The Workflow run](https://github.com/apache/airflow/actions/runs/295811955) is cancelling this PR. It has some failed jobs matching ^Pylint$,^Static checks$,^Build docs$,^Spell check docs$,^Backport packages$,^Checks: Helm tests$,^Test OpenAPI*.





[GitHub] [airflow] ibzib commented on a change in pull request #8553: Add DataflowStartSQLQuery operator

Posted by GitBox <gi...@apache.org>.
ibzib commented on a change in pull request #8553:
URL: https://github.com/apache/airflow/pull/8553#discussion_r508064298



##########
File path: airflow/providers/google/cloud/hooks/dataflow.py
##########
@@ -783,6 +794,77 @@ def cancel_job(
             name=job_name,
             job_id=job_id,
             location=location,
-            poll_sleep=self.poll_sleep
+            poll_sleep=self.poll_sleep,
+            num_retries=self.num_retries,
         )
         jobs_controller.cancel()
+
+    @GoogleBaseHook.fallback_to_default_project_id
+    def start_sql_job(
+        self,
+        job_name: str,
+        query: str,
+        options: Dict[str, Any],
+        project_id: str,
+        location: str = DEFAULT_DATAFLOW_LOCATION,
+        on_new_job_id_callback: Optional[Callable[[str], None]] = None
+    ):
+        """
+        Starts Dataflow SQL query.
+
+        :param job_name: The unique name to assign to the Cloud Dataflow job.
+        :type job_name: str
+        :param query: The SQL query to execute.
+        :type query: str
+        :param options: Job parameters to be executed.
+            For more information, look at:
+            `https://cloud.google.com/sdk/gcloud/reference/beta/dataflow/sql/query
+            <gcloud beta dataflow sql query>`__
+            command reference
+        :param location: The location of the Dataflow job (for example europe-west1)

Review comment:
   > In Airflow, you can define default arguments that will be applied to all operators, but the parameter name must be consistent across all operators.
   
   Makes sense, thanks for the explanation.







[GitHub] [airflow] mik-laj commented on pull request #8553: Add DataflowStartSQLQuery operator

Posted by GitBox <gi...@apache.org>.
mik-laj commented on pull request #8553:
URL: https://github.com/apache/airflow/pull/8553#issuecomment-619478132


   @jaketf Can I ask for a review? I know that you are also interested in the Dataflow integration.





[GitHub] [airflow] TobKed commented on a change in pull request #8553: Add DataflowStartSQLQuery operator

Posted by GitBox <gi...@apache.org>.
TobKed commented on a change in pull request #8553:
URL: https://github.com/apache/airflow/pull/8553#discussion_r496513620



##########
File path: airflow/providers/google/cloud/hooks/dataflow.py
##########
@@ -93,15 +94,30 @@ def inner_wrapper(self: "DataflowHook", *args, **kwargs) -> RT:
 class DataflowJobStatus:
     """
     Helper class with Dataflow job statuses.
+    Reference: https://cloud.google.com/dataflow/docs/reference/rest/v1b3/projects.jobs#Job.JobState
     """
-    JOB_STATE_DONE = "JOB_STATE_DONE"
+    JOB_STATE_UNKNOWN = "JOB_STATE_UNKNOWN"
+    JOB_STATE_STOPPED = "JOB_STATE_STOPPED"
     JOB_STATE_RUNNING = "JOB_STATE_RUNNING"
+    JOB_STATE_DONE = "JOB_STATE_DONE"
     JOB_STATE_FAILED = "JOB_STATE_FAILED"
     JOB_STATE_CANCELLED = "JOB_STATE_CANCELLED"
+    JOB_STATE_UPDATED = "JOB_STATE_UPDATED"
+    JOB_STATE_DRAINING = "JOB_STATE_DRAINING"
+    JOB_STATE_DRAINED = "JOB_STATE_DRAINED"
     JOB_STATE_PENDING = "JOB_STATE_PENDING"
-    FAILED_END_STATES = {JOB_STATE_FAILED, JOB_STATE_CANCELLED}
-    SUCCEEDED_END_STATES = {JOB_STATE_DONE}
-    END_STATES = SUCCEEDED_END_STATES | FAILED_END_STATES
+    JOB_STATE_CANCELLING = "JOB_STATE_CANCELLING"
+    JOB_STATE_QUEUED = "JOB_STATE_QUEUED"
+    FAILED_END_STATES = {JOB_STATE_FAILED, JOB_STATE_CANCELLED, JOB_STATE_STOPPED}

Review comment:
   I agree with @aaltay. It looks like the proper place for this status is in `AWAITING_STATES`.
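
   A sketch of the regrouping being discussed (illustrative only; see the merged code for the authoritative sets): `JOB_STATE_STOPPED` moves out of the failed set and into a set of states the hook keeps waiting on:

```python
# Illustrative grouping only, based on this review thread.
FAILED_END_STATES = {"JOB_STATE_FAILED", "JOB_STATE_CANCELLED"}
SUCCEEDED_END_STATES = {"JOB_STATE_DONE", "JOB_STATE_UPDATED", "JOB_STATE_DRAINED"}
TERMINAL_STATES = SUCCEEDED_END_STATES | FAILED_END_STATES
AWAITING_STATES = {
    "JOB_STATE_RUNNING", "JOB_STATE_PENDING", "JOB_STATE_QUEUED",
    "JOB_STATE_CANCELLING", "JOB_STATE_DRAINING", "JOB_STATE_STOPPED",
}
```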







[GitHub] [airflow] kennknowles commented on a change in pull request #8553: Add DataflowStartSQLQuery operator

Posted by GitBox <gi...@apache.org>.
kennknowles commented on a change in pull request #8553:
URL: https://github.com/apache/airflow/pull/8553#discussion_r416136954



##########
File path: airflow/providers/google/cloud/hooks/dataflow.py
##########
@@ -93,15 +94,30 @@ def inner_wrapper(self: "DataflowHook", *args, **kwargs) -> RT:
 class DataflowJobStatus:
     """
     Helper class with Dataflow job statuses.
+    Reference: https://cloud.google.com/dataflow/docs/reference/rest/v1b3/projects.jobs#Job.JobState
     """
-    JOB_STATE_DONE = "JOB_STATE_DONE"
+    JOB_STATE_UNKNOWN = "JOB_STATE_UNKNOWN"
+    JOB_STATE_STOPPED = "JOB_STATE_STOPPED"
     JOB_STATE_RUNNING = "JOB_STATE_RUNNING"
+    JOB_STATE_DONE = "JOB_STATE_DONE"
     JOB_STATE_FAILED = "JOB_STATE_FAILED"
     JOB_STATE_CANCELLED = "JOB_STATE_CANCELLED"
+    JOB_STATE_UPDATED = "JOB_STATE_UPDATED"
+    JOB_STATE_DRAINING = "JOB_STATE_DRAINING"
+    JOB_STATE_DRAINED = "JOB_STATE_DRAINED"
     JOB_STATE_PENDING = "JOB_STATE_PENDING"
-    FAILED_END_STATES = {JOB_STATE_FAILED, JOB_STATE_CANCELLED}
-    SUCCEEDED_END_STATES = {JOB_STATE_DONE}
-    END_STATES = SUCCEEDED_END_STATES | FAILED_END_STATES
+    JOB_STATE_CANCELLING = "JOB_STATE_CANCELLING"
+    JOB_STATE_QUEUED = "JOB_STATE_QUEUED"
+    FAILED_END_STATES = {JOB_STATE_FAILED, JOB_STATE_CANCELLED, JOB_STATE_STOPPED}
+    SUCCEEDED_END_STATES = {JOB_STATE_DONE, JOB_STATE_UPDATED, JOB_STATE_DRAINED}
+    TERMINAL_STATES = SUCCEEDED_END_STATES | FAILED_END_STATES

Review comment:
   Are these lists used anywhere else? I think the success/fail distinction is artificial. You cannot really say whether CANCELLED is a failure or not. Probably the same with DRAINED and UPDATED. Whatever is looking at the job status probably wants the full details.

##########
File path: airflow/providers/google/cloud/operators/dataflow.py
##########
@@ -406,6 +406,88 @@ def on_kill(self) -> None:
             self.hook.cancel_job(job_id=self.job_id, project_id=self.project_id)
 
 
+class DataflowStartSqlJobOperator(BaseOperator):
+    """
+    Starts Dataflow SQL query.

Review comment:
       @ibzib would be a good reviewer here

##########
File path: airflow/providers/google/cloud/hooks/dataflow.py
##########
@@ -783,6 +794,77 @@ def cancel_job(
             name=job_name,
             job_id=job_id,
             location=location,
-            poll_sleep=self.poll_sleep
+            poll_sleep=self.poll_sleep,
+            num_retries=self.num_retries,
         )
         jobs_controller.cancel()
+
+    @GoogleBaseHook.fallback_to_default_project_id
+    def start_sql_job(
+        self,
+        job_name: str,
+        query: str,
+        options: Dict[str, Any],
+        project_id: str,
+        location: str = DEFAULT_DATAFLOW_LOCATION,
+        on_new_job_id_callback: Optional[Callable[[str], None]] = None
+    ):
+        """
+        Starts Dataflow SQL query.
+
+        :param job_name: The unique name to assign to the Cloud Dataflow job.
+        :type job_name: str
+        :param query: The SQL query to execute.
+        :type query: str
+        :param options: Job parameters to be executed.
+            For more information, look at:
+            `https://cloud.google.com/sdk/gcloud/reference/beta/dataflow/sql/query
+            <gcloud beta dataflow sql query>`__
+            command reference
+        :param location: The location of the Dataflow job (for example europe-west1)
+        :type location: str
+        :param project_id: The ID of the GCP project that owns the job.
+            If set to ``None`` or missing, the default project_id from the GCP connection is used.
+        :type project_id: Optional[str]
+        :param on_new_job_id_callback: Callback called when the job ID is known.
+        :type on_new_job_id_callback: callable
+        :return: the new job object
+        """
+        cmd = [
+            'gcloud',
+            'beta',

Review comment:
       It is not required.







[GitHub] [airflow] TobKed commented on a change in pull request #8553: Add DataflowStartSQLQuery operator

Posted by GitBox <gi...@apache.org>.
TobKed commented on a change in pull request #8553:
URL: https://github.com/apache/airflow/pull/8553#discussion_r501046939



##########
File path: airflow/providers/google/cloud/hooks/dataflow.py
##########
@@ -783,6 +794,77 @@ def cancel_job(
             name=job_name,
             job_id=job_id,
             location=location,
-            poll_sleep=self.poll_sleep
+            poll_sleep=self.poll_sleep,
+            num_retries=self.num_retries,
         )
         jobs_controller.cancel()
+
+    @GoogleBaseHook.fallback_to_default_project_id
+    def start_sql_job(
+        self,
+        job_name: str,
+        query: str,
+        options: Dict[str, Any],
+        project_id: str,
+        location: str = DEFAULT_DATAFLOW_LOCATION,
+        on_new_job_id_callback: Optional[Callable[[str], None]] = None
+    ):
+        """
+        Starts Dataflow SQL query.
+
+        :param job_name: The unique name to assign to the Cloud Dataflow job.
+        :type job_name: str
+        :param query: The SQL query to execute.
+        :type query: str
+        :param options: Job parameters to be executed.
+            For more information, look at:
+            `https://cloud.google.com/sdk/gcloud/reference/beta/dataflow/sql/query
+            <gcloud beta dataflow sql query>`__
+            command reference
+        :param location: The location of the Dataflow job (for example europe-west1)
+        :type location: str
+        :param project_id: The ID of the GCP project that owns the job.
+            If set to ``None`` or missing, the default project_id from the GCP connection is used.
+        :type project_id: Optional[str]
+        :param on_new_job_id_callback: Callback called when the job ID is known.
+        :type on_new_job_id_callback: callable
+        :return: the new job object
+        """
+        cmd = [
+            'gcloud',
+            'beta',
+            'dataflow',
+            'sql',
+            'query',
+            query,
+            f'--project={project_id}',
+            '--format=value(job.id)',
+            f'--job-name={job_name}',
+            f'--region={location}',
+            *(self._options_to_args(options))
+        ]
+        self.log.info("Executing command: %s", " ".join([shlex.quote(c) for c in cmd]))
+        with self.provide_authorized_gcloud():

Review comment:
       IMHO it is not particularly required but nice to have it :)




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] github-actions[bot] commented on pull request #8553: Add DataflowStartSQLQuery operator

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #8553:
URL: https://github.com/apache/airflow/pull/8553#issuecomment-712428727


   [The Workflow run](https://github.com/apache/airflow/actions/runs/316249228) is cancelling this PR. It has some failed jobs matching ^Pylint$,^Static checks$,^Build docs$,^Spell check docs$,^Backport packages$,^Checks: Helm tests$,^Test OpenAPI*.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] TobKed commented on a change in pull request #8553: Add DataflowStartSQLQuery operator

Posted by GitBox <gi...@apache.org>.
TobKed commented on a change in pull request #8553:
URL: https://github.com/apache/airflow/pull/8553#discussion_r504573298



##########
File path: airflow/providers/google/cloud/hooks/dataflow.py
##########
@@ -431,13 +448,17 @@ def _start_dataflow(
         variables: Dict,
         name: str,
         command_prefix: List[str],
-        label_formatter: Callable[[Dict], List[str]],
         project_id: str,
         multiple_jobs: bool = False,
         on_new_job_id_callback: Optional[Callable[[str], None]] = None,
         location: str = DEFAULT_DATAFLOW_LOCATION
     ) -> None:
-        cmd = command_prefix + self._build_cmd(variables, label_formatter, project_id)
+        cmd = command_prefix + [
+            "--runner=DataflowRunner",

Review comment:
       It will be quite a huge task to write Apache Beam Hooks and Operators, but it is worth keeping in mind for the future.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] aaltay commented on a change in pull request #8553: Add DataflowStartSQLQuery operator

Posted by GitBox <gi...@apache.org>.
aaltay commented on a change in pull request #8553:
URL: https://github.com/apache/airflow/pull/8553#discussion_r416176659



##########
File path: airflow/providers/google/cloud/example_dags/example_dataflow_sql.py
##########
@@ -0,0 +1,60 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Example Airflow DAG for Google Cloud Dataflow service
+"""
+import os
+
+from airflow import models
+from airflow.providers.google.cloud.operators.dataflow import DataflowStartSqlJobOperator
+from airflow.utils.dates import days_ago
+
+GCP_PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "example-project")
+
+BQ_SQL_DATASET = os.environ.get('DATAFLOW_BQ_SQL_DATASET', 'airflow_dataflow_samples')
+DATAFLOW_SQL_JOB_NAME = os.environ.get('DATAFLOW_SQL_JOB_NAME', "dataflow-sql")
+DATAFLOW_SQL_LOCATION = os.environ.get('DATAFLOW_SQL_LOCATION', 'us-west1')
+
+with models.DAG(
+    dag_id="example_gcp_dataflow_sql",
+    default_args={
+        "start_date": days_ago(1),
+    },
+    schedule_interval=None,  # Override to match your needs
+) as dag_sql:
+    start_sql = DataflowStartSqlJobOperator(
+        task_id="start_sql_query",
+        job_name=DATAFLOW_SQL_JOB_NAME,
+        query=f"""
+            SELECT
+                sales_region as sales_region,
+                count(state_id) as count_state
+            FROM
+                bigquery.table.`{GCP_PROJECT_ID}`.beam_samples.beam_table

Review comment:
       @ibzib - is there a public dataset that could be used?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] mik-laj commented on a change in pull request #8553: Add DataflowStartSQLQuery operator

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #8553:
URL: https://github.com/apache/airflow/pull/8553#discussion_r416140501



##########
File path: airflow/providers/google/cloud/example_dags/example_dataflow_sql.py
##########
@@ -0,0 +1,60 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Example Airflow DAG for Google Cloud Dataflow service
+"""
+import os
+
+from airflow import models
+from airflow.providers.google.cloud.operators.dataflow import DataflowStartSqlJobOperator
+from airflow.utils.dates import days_ago
+
+GCP_PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "example-project")
+
+BQ_SQL_DATASET = os.environ.get('DATAFLOW_BQ_SQL_DATASET', 'airflow_dataflow_samples')
+DATAFLOW_SQL_JOB_NAME = os.environ.get('DATAFLOW_SQL_JOB_NAME', "dataflow-sql")
+DATAFLOW_SQL_LOCATION = os.environ.get('DATAFLOW_SQL_LOCATION', 'us-west1')
+
+with models.DAG(
+    dag_id="example_gcp_dataflow_sql",
+    default_args={
+        "start_date": days_ago(1),
+    },
+    schedule_interval=None,  # Override to match your needs
+) as dag_sql:
+    start_sql = DataflowStartSqlJobOperator(
+        task_id="start_sql_query",
+        job_name=DATAFLOW_SQL_JOB_NAME,
+        query=f"""
+            SELECT
+                sales_region as sales_region,
+                count(state_id) as count_state
+            FROM
+                bigquery.table.`{GCP_PROJECT_ID}`.beam_samples.beam_table

Review comment:
       This bucket is created in system tests. https://github.com/apache/airflow/pull/8553/files#
   Unfortunately, Dataflow SQL is not compatible with the public datasets I know.
   
   I got the following error when I referred to the public dataset.
   
   > Caused by: java.lang.UnsupportedOperationException: Field type 'NUMERIC' is not supported (field 'value')
   
   It's the error message from the BigQuery console:
   ![Screenshot 2020-04-27 at 22 52 46](https://user-images.githubusercontent.com/12058428/80419684-de796580-88d9-11ea-85bd-1d5473c13901.png)
   
   
   




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] TobKed commented on a change in pull request #8553: Add DataflowStartSQLQuery operator

Posted by GitBox <gi...@apache.org>.
TobKed commented on a change in pull request #8553:
URL: https://github.com/apache/airflow/pull/8553#discussion_r509343344



##########
File path: airflow/providers/google/cloud/hooks/dataflow.py
##########
@@ -93,15 +94,30 @@ def inner_wrapper(self: "DataflowHook", *args, **kwargs) -> RT:
 class DataflowJobStatus:
     """
     Helper class with Dataflow job statuses.
+    Reference: https://cloud.google.com/dataflow/docs/reference/rest/v1b3/projects.jobs#Job.JobState
     """
-    JOB_STATE_DONE = "JOB_STATE_DONE"
+    JOB_STATE_UNKNOWN = "JOB_STATE_UNKNOWN"
+    JOB_STATE_STOPPED = "JOB_STATE_STOPPED"
     JOB_STATE_RUNNING = "JOB_STATE_RUNNING"
+    JOB_STATE_DONE = "JOB_STATE_DONE"
     JOB_STATE_FAILED = "JOB_STATE_FAILED"
     JOB_STATE_CANCELLED = "JOB_STATE_CANCELLED"
+    JOB_STATE_UPDATED = "JOB_STATE_UPDATED"
+    JOB_STATE_DRAINING = "JOB_STATE_DRAINING"
+    JOB_STATE_DRAINED = "JOB_STATE_DRAINED"
     JOB_STATE_PENDING = "JOB_STATE_PENDING"
-    FAILED_END_STATES = {JOB_STATE_FAILED, JOB_STATE_CANCELLED}
-    SUCCEEDED_END_STATES = {JOB_STATE_DONE}
-    END_STATES = SUCCEEDED_END_STATES | FAILED_END_STATES
+    JOB_STATE_CANCELLING = "JOB_STATE_CANCELLING"
+    JOB_STATE_QUEUED = "JOB_STATE_QUEUED"
+    FAILED_END_STATES = {JOB_STATE_FAILED, JOB_STATE_CANCELLED, JOB_STATE_STOPPED}
+    SUCCEEDED_END_STATES = {JOB_STATE_DONE, JOB_STATE_UPDATED, JOB_STATE_DRAINED}
+    TERMINAL_STATES = SUCCEEDED_END_STATES | FAILED_END_STATES

Review comment:
       I agree with you. I created an issue for it: https://github.com/apache/airflow/issues/11721 and I will work on it in a separate PR.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] github-actions[bot] commented on pull request #8553: Add DataflowStartSQLQuery operator

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #8553:
URL: https://github.com/apache/airflow/pull/8553#issuecomment-706173153


   [The Workflow run](https://github.com/apache/airflow/actions/runs/297525936) is cancelling this PR. It has some failed jobs matching ^Pylint$,^Static checks$,^Build docs$,^Spell check docs$,^Backport packages$,^Checks: Helm tests$,^Test OpenAPI*.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] github-actions[bot] commented on pull request #8553: Add DataflowStartSQLQuery operator

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #8553:
URL: https://github.com/apache/airflow/pull/8553#issuecomment-714374029


   [The Workflow run](https://github.com/apache/airflow/actions/runs/321741899) is cancelling this PR. It has some failed jobs matching ^Pylint$,^Static checks$,^Build docs$,^Spell check docs$,^Backport packages$,^Checks: Helm tests$,^Test OpenAPI*.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] TobKed commented on pull request #8553: Add DataflowStartSQLQuery operator

Posted by GitBox <gi...@apache.org>.
TobKed commented on pull request #8553:
URL: https://github.com/apache/airflow/pull/8553#issuecomment-721231951


   Rebased on the latest master


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] mik-laj commented on a change in pull request #8553: Add DataflowStartSQLQuery operator

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #8553:
URL: https://github.com/apache/airflow/pull/8553#discussion_r416152856



##########
File path: airflow/providers/google/cloud/hooks/dataflow.py
##########
@@ -783,6 +794,77 @@ def cancel_job(
             name=job_name,
             job_id=job_id,
             location=location,
-            poll_sleep=self.poll_sleep
+            poll_sleep=self.poll_sleep,
+            num_retries=self.num_retries,
         )
         jobs_controller.cancel()
+
+    @GoogleBaseHook.fallback_to_default_project_id
+    def start_sql_job(
+        self,
+        job_name: str,
+        query: str,
+        options: Dict[str, Any],
+        project_id: str,
+        location: str = DEFAULT_DATAFLOW_LOCATION,
+        on_new_job_id_callback: Optional[Callable[[str], None]] = None
+    ):
+        """
+        Starts Dataflow SQL query.
+
+        :param job_name: The unique name to assign to the Cloud Dataflow job.
+        :type job_name: str
+        :param query: The SQL query to execute.
+        :type query: str
+        :param options: Job parameters to be executed.
+            For more information, look at:
+            `https://cloud.google.com/sdk/gcloud/reference/beta/dataflow/sql/query
+            <gcloud beta dataflow sql query>`__
+            command reference
+        :param location: The location of the Dataflow job (for example europe-west1)
+        :type location: str
+        :param project_id: The ID of the GCP project that owns the job.
+            If set to ``None`` or missing, the default project_id from the GCP connection is used.
+        :type project_id: Optional[str]
+        :param on_new_job_id_callback: Callback called when the job ID is known.
+        :type on_new_job_id_callback: callable
+        :return: the new job object
+        """
+        cmd = [
+            'gcloud',
+            'beta',
+            'dataflow',
+            'sql',
+            'query',
+            query,
+            f'--project={project_id}',
+            '--format=value(job.id)',
+            f'--job-name={job_name}',
+            f'--region={location}',
+            *(self._options_to_args(options))
+        ]
+        self.log.info("Executing command: %s", " ".join([shlex.quote(c) for c in cmd]))
+        with self.provide_authorized_gcloud():
+            proc = subprocess.run(  # pylint: disable=subprocess-run-check
+                cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+        self.log.info("Output: %s", proc.stdout.decode())
+        self.log.info("Stderr: %s", proc.stderr.decode())

Review comment:
       `stderr` often contains developer information, not only errors. I will change it to `log.warning`.
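   
   A minimal, self-contained sketch of that change (illustrative command only; not the exact PR code):
   ```python
   import logging
   import shlex
   import subprocess
   
   log = logging.getLogger(__name__)
   
   cmd = ["gcloud", "dataflow", "sql", "query", "SELECT 1 AS x"]  # illustrative command only
   log.info("Executing command: %s", " ".join(shlex.quote(c) for c in cmd))
   proc = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, check=False)
   log.info("Output: %s", proc.stdout.decode())
   if proc.stderr:
       # stderr often carries diagnostics rather than hard errors, so log it at WARNING
       log.warning("Stderr: %s", proc.stderr.decode())
   ```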




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] github-actions[bot] commented on pull request #8553: Add DataflowStartSQLQuery operator

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #8553:
URL: https://github.com/apache/airflow/pull/8553#issuecomment-713772255


   [The Workflow run](https://github.com/apache/airflow/actions/runs/320403262) is cancelling this PR. It has some failed jobs matching ^Pylint$,^Static checks$,^Build docs$,^Spell check docs$,^Backport packages$,^Checks: Helm tests$,^Test OpenAPI*.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] aaltay commented on a change in pull request #8553: Add DataflowStartSQLQuery operator

Posted by GitBox <gi...@apache.org>.
aaltay commented on a change in pull request #8553:
URL: https://github.com/apache/airflow/pull/8553#discussion_r416176973



##########
File path: airflow/providers/google/cloud/hooks/dataflow.py
##########
@@ -640,9 +659,8 @@ def start_python_dataflow(  # pylint: disable=too-many-arguments
         variables['job_name'] = name
         variables['region'] = location
 
-        def label_formatter(labels_dict):
-            return ['--labels={}={}'.format(key, value)
-                    for key, value in labels_dict.items()]
+        if 'labels' in variables:
+            variables['labels'] = ['{}={}'.format(key, value) for key, value in variables['labels'].items()]

Review comment:
       OK. Thank you for the explanation.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] TobKed commented on a change in pull request #8553: Add DataflowStartSQLQuery operator

Posted by GitBox <gi...@apache.org>.
TobKed commented on a change in pull request #8553:
URL: https://github.com/apache/airflow/pull/8553#discussion_r508222533



##########
File path: airflow/providers/google/cloud/hooks/dataflow.py
##########
@@ -694,29 +710,23 @@ def _build_dataflow_job_name(job_name: str, append_job_name: bool = True) -> str
         return safe_job_name
 
     @staticmethod
-    def _build_cmd(variables: Dict, label_formatter: Callable, project_id: str) -> List[str]:
-        command = [
-            "--runner=DataflowRunner",
-            "--project={}".format(project_id),
-        ]
-        if variables is None:
-            return command
-
+    def _options_to_args(variables: Dict):
+        if not variables:
+            return []
         # The logic of this method should be compatible with Apache Beam:
         # https://github.com/apache/beam/blob/b56740f0e8cd80c2873412847d0b336837429fb9/sdks/python/
         # apache_beam/options/pipeline_options.py#L230-L251
+        args: List[str] = []
         for attr, value in variables.items():
-            if attr == 'labels':
-                command += label_formatter(value)
-            elif value is None:
-                command.append(f"--{attr}")
+            if value is None:

Review comment:
       Done :)




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] aaltay commented on a change in pull request #8553: Add DataflowStartSQLQuery operator

Posted by GitBox <gi...@apache.org>.
aaltay commented on a change in pull request #8553:
URL: https://github.com/apache/airflow/pull/8553#discussion_r416284747



##########
File path: airflow/providers/google/cloud/hooks/dataflow.py
##########
@@ -783,6 +794,77 @@ def cancel_job(
             name=job_name,
             job_id=job_id,
             location=location,
-            poll_sleep=self.poll_sleep
+            poll_sleep=self.poll_sleep,
+            num_retries=self.num_retries,
         )
         jobs_controller.cancel()
+
+    @GoogleBaseHook.fallback_to_default_project_id
+    def start_sql_job(
+        self,
+        job_name: str,
+        query: str,
+        options: Dict[str, Any],
+        project_id: str,
+        location: str = DEFAULT_DATAFLOW_LOCATION,
+        on_new_job_id_callback: Optional[Callable[[str], None]] = None
+    ):
+        """
+        Starts Dataflow SQL query.
+
+        :param job_name: The unique name to assign to the Cloud Dataflow job.
+        :type job_name: str
+        :param query: The SQL query to execute.
+        :type query: str
+        :param options: Job parameters to be executed.
+            For more information, look at:
+            `https://cloud.google.com/sdk/gcloud/reference/beta/dataflow/sql/query
+            <gcloud beta dataflow sql query>`__
+            command reference
+        :param location: The location of the Dataflow job (for example europe-west1)
+        :type location: str
+        :param project_id: The ID of the GCP project that owns the job.
+            If set to ``None`` or missing, the default project_id from the GCP connection is used.
+        :type project_id: Optional[str]
+        :param on_new_job_id_callback: Callback called when the job ID is known.
+        :type on_new_job_id_callback: callable
+        :return: the new job object
+        """
+        cmd = [
+            'gcloud',
+            'beta',
+            'dataflow',
+            'sql',
+            'query',
+            query,
+            f'--project={project_id}',
+            '--format=value(job.id)',
+            f'--job-name={job_name}',
+            f'--region={location}',
+            *(self._options_to_args(options))
+        ]
+        self.log.info("Executing command: %s", " ".join([shlex.quote(c) for c in cmd]))
+        with self.provide_authorized_gcloud():

Review comment:
       Is this really required if it is only for logs? subprocess.run does not need to escape them anyway.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] github-actions[bot] commented on pull request #8553: Add DataflowStartSQLQuery operator

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #8553:
URL: https://github.com/apache/airflow/pull/8553#issuecomment-704872745


   [The Build Workflow run](https://github.com/apache/airflow/actions/runs/293259356) is cancelling this PR. It has some failed jobs matching ^Pylint$,^Static checks$,^Build docs$,^Spell check docs$,^Backport packages$,^Checks: Helm tests$,^Test OpenAPI*.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] github-actions[bot] commented on pull request #8553: Add DataflowStartSQLQuery operator

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #8553:
URL: https://github.com/apache/airflow/pull/8553#issuecomment-712429002






----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] mik-laj commented on a change in pull request #8553: Add DataflowStartSQLQuery operator

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #8553:
URL: https://github.com/apache/airflow/pull/8553#discussion_r416157606



##########
File path: airflow/providers/google/cloud/example_dags/example_dataflow_sql.py
##########
@@ -0,0 +1,60 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Example Airflow DAG for Google Cloud Dataflow service
+"""
+import os
+
+from airflow import models
+from airflow.providers.google.cloud.operators.dataflow import DataflowStartSqlJobOperator
+from airflow.utils.dates import days_ago
+
+GCP_PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "example-project")
+
+BQ_SQL_DATASET = os.environ.get('DATAFLOW_BQ_SQL_DATASET', 'airflow_dataflow_samples')
+DATAFLOW_SQL_JOB_NAME = os.environ.get('DATAFLOW_SQL_JOB_NAME', "dataflow-sql")
+DATAFLOW_SQL_LOCATION = os.environ.get('DATAFLOW_SQL_LOCATION', 'us-west1')
+
+with models.DAG(
+    dag_id="example_gcp_dataflow_sql",
+    default_args={
+        "start_date": days_ago(1),
+    },
+    schedule_interval=None,  # Override to match your needs
+) as dag_sql:
+    start_sql = DataflowStartSqlJobOperator(
+        task_id="start_sql_query",
+        job_name=DATAFLOW_SQL_JOB_NAME,
+        query=f"""
+            SELECT
+                sales_region as sales_region,
+                count(state_id) as count_state
+            FROM
+                bigquery.table.`{GCP_PROJECT_ID}`.beam_samples.beam_table

Review comment:
       We strive to maintain a high level of reliability for the GCP integrations. All integrations developed by my team must have system tests; they are a required criterion for internal review. 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] mik-laj commented on a change in pull request #8553: Add DataflowStartSQLQuery operator

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #8553:
URL: https://github.com/apache/airflow/pull/8553#discussion_r416154847



##########
File path: airflow/providers/google/cloud/operators/dataflow.py
##########
@@ -406,6 +406,88 @@ def on_kill(self) -> None:
             self.hook.cancel_job(job_id=self.job_id, project_id=self.project_id)
 
 
+class DataflowStartSqlJobOperator(BaseOperator):
+    """
+    Starts Dataflow SQL query.
+
+    :param job_name: The unique name to assign to the Cloud Dataflow job.
+    :type job_name: str
+    :param query: The SQL query to execute.
+    :type query: str
+    :param options: Job parameters to be executed. It can be a dictionary with the following keys.
+
+        For more information, look at:
+        `https://cloud.google.com/sdk/gcloud/reference/beta/dataflow/sql/query
+        <gcloud beta dataflow sql query>`__
+        command reference
+
+    :type options: dict
+    :param location: The location of the Dataflow job (for example europe-west1)
+    :type location: str
+    :param project_id: The ID of the GCP project that owns the job.
+        If set to ``None`` or missing, the default project_id from the GCP connection is used.
+    :type project_id: Optional[str]
+    :param gcp_conn_id: The connection ID to use connecting to Google Cloud
+        Platform.
+    :type gcp_conn_id: str
+    :param delegate_to: The account to impersonate, if any.
+        For this to work, the service account making the request must have
+        domain-wide delegation enabled.
+    :type delegate_to: str
+    """
+
+    template_fields = ["job_name", 'query', 'options', 'location', 'project_id', 'gcp_conn_id']
+
+    @apply_defaults
+    def __init__(
+        self,
+        job_name: str,
+        query: str,
+        options: Dict[str, Any],
+        location: str = DEFAULT_DATAFLOW_LOCATION,
+        project_id: Optional[str] = None,
+        gcp_conn_id: str = 'google_cloud_default',
+        delegate_to: Optional[str] = None,
+        *args,
+        **kwargs
+    ) -> None:
+        super().__init__(*args, **kwargs)
+        self.job_name = job_name
+        self.query = query
+        self.options = options
+        self.location = location
+        self.project_id = project_id
+        self.gcp_conn_id = gcp_conn_id
+        self.delegate_to = delegate_to
+        self.job_id = None
+        self.hook: Optional[DataflowHook] = None
+
+    def execute(self, context):
+        self.hook = DataflowHook(
+            gcp_conn_id=self.gcp_conn_id,
+            delegate_to=self.delegate_to,
+        )
+
+        def set_current_job_id(job_id):
+            self.job_id = job_id
+
+        job = self.hook.start_sql_job(
+            job_name=self.job_name,
+            query=self.query,
+            options=self.options,
+            location=self.location,
+            project_id=self.project_id,
+            on_new_job_id_callback=set_current_job_id,
+        )
+
+        return job
+
+    def on_kill(self) -> None:
+        self.log.info("On kill.")
+        if self.job_id:
+            self.hook.cancel_job(job_id=self.job_id, project_id=self.project_id)

Review comment:
       Good point. I will skip jobs in the terminal state.
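   
   A rough sketch of that idea; the `is_job_done` helper below is hypothetical, not an existing hook method:
   ```python
   def on_kill(self) -> None:
       self.log.info("On kill.")
       # Only cancel when the job exists and has not already reached a terminal state.
       if self.job_id and self.hook and not self.hook.is_job_done(
           job_id=self.job_id, project_id=self.project_id, location=self.location
       ):
           self.hook.cancel_job(job_id=self.job_id, project_id=self.project_id)
   ```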




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] TheNeuralBit commented on a change in pull request #8553: Add DataflowStartSQLQuery operator

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on a change in pull request #8553:
URL: https://github.com/apache/airflow/pull/8553#discussion_r420363436



##########
File path: airflow/providers/google/cloud/example_dags/example_dataflow_sql.py
##########
@@ -0,0 +1,60 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Example Airflow DAG for Google Cloud Dataflow service
+"""
+import os
+
+from airflow import models
+from airflow.providers.google.cloud.operators.dataflow import DataflowStartSqlJobOperator
+from airflow.utils.dates import days_ago
+
+GCP_PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "example-project")
+
+BQ_SQL_DATASET = os.environ.get('DATAFLOW_BQ_SQL_DATASET', 'airflow_dataflow_samples')
+DATAFLOW_SQL_JOB_NAME = os.environ.get('DATAFLOW_SQL_JOB_NAME', "dataflow-sql")
+DATAFLOW_SQL_LOCATION = os.environ.get('DATAFLOW_SQL_LOCATION', 'us-west1')
+
+with models.DAG(
+    dag_id="example_gcp_dataflow_sql",
+    default_args={
+        "start_date": days_ago(1),
+    },
+    schedule_interval=None,  # Override to match your needs
+) as dag_sql:
+    start_sql = DataflowStartSqlJobOperator(
+        task_id="start_sql_query",
+        job_name=DATAFLOW_SQL_JOB_NAME,
+        query=f"""
+            SELECT
+                sales_region as sales_region,
+                count(state_id) as count_state
+            FROM
+                bigquery.table.`{GCP_PROJECT_ID}`.beam_samples.beam_table

Review comment:
       Yeah Dataflow SQL doesn't support GEOGRAPHY or NUMERIC, but I'm sure there are many public datasets that don't use those types. `chicago_taxi_trips.taxi_trips` looks like it will work.
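   
   If that table is indeed compatible, the example DAG could point at the public dataset instead of a project-local table; an untested sketch, with column names taken from the public `taxi_trips` schema:
   ```python
   # Untested sketch: query the public Chicago taxi trips table, using the same
   # Dataflow SQL table syntax as the original example.
   query = """
       SELECT
           payment_type,
           COUNT(unique_key) AS trip_count
       FROM
           bigquery.table.`bigquery-public-data`.chicago_taxi_trips.taxi_trips
       GROUP BY payment_type
   """
   ```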




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ibzib commented on a change in pull request #8553: Add DataflowStartSQLQuery operator

Posted by GitBox <gi...@apache.org>.
ibzib commented on a change in pull request #8553:
URL: https://github.com/apache/airflow/pull/8553#discussion_r420349337



##########
File path: airflow/providers/google/cloud/operators/dataflow.py
##########
@@ -406,6 +406,88 @@ def on_kill(self) -> None:
             self.hook.cancel_job(job_id=self.job_id, project_id=self.project_id)
 
 
+class DataflowStartSqlJobOperator(BaseOperator):
+    """
+    Starts Dataflow SQL query.
+
+    :param job_name: The unique name to assign to the Cloud Dataflow job.
+    :type job_name: str
+    :param query: The SQL query to execute.
+    :type query: str
+    :param options: Job parameters to be executed. It can be a dictionary with the following keys.
+
+        For more information, look at:
+        `https://cloud.google.com/sdk/gcloud/reference/beta/dataflow/sql/query
+        <gcloud beta dataflow sql query>`__
+        command reference
+
+    :type options: dict
+    :param location: The location of the Dataflow job (for example europe-west1)
+    :type location: str
+    :param project_id: The ID of the GCP project that owns the job.
+        If set to ``None`` or missing, the default project_id from the GCP connection is used.
+    :type project_id: Optional[str]
+    :param gcp_conn_id: The connection ID to use connecting to Google Cloud
+        Platform.
+    :type gcp_conn_id: str
+    :param delegate_to: The account to impersonate, if any.
+        For this to work, the service account making the request must have
+        domain-wide delegation enabled.
+    :type delegate_to: str
+    """
+
+    template_fields = ["job_name", 'query', 'options', 'location', 'project_id', 'gcp_conn_id']
+
+    @apply_defaults
+    def __init__(
+        self,
+        job_name: str,
+        query: str,
+        options: Dict[str, Any],
+        location: str = DEFAULT_DATAFLOW_LOCATION,

Review comment:
       Dataflow has deliberately been trying to move away from using a default location, because many users may not realize that their job is running in us-central1 even if that is not intended.
   

##########
File path: airflow/providers/google/cloud/hooks/dataflow.py
##########
@@ -783,6 +794,77 @@ def cancel_job(
             name=job_name,
             job_id=job_id,
             location=location,
-            poll_sleep=self.poll_sleep
+            poll_sleep=self.poll_sleep,
+            num_retries=self.num_retries,
         )
         jobs_controller.cancel()
+
+    @GoogleBaseHook.fallback_to_default_project_id
+    def start_sql_job(
+        self,
+        job_name: str,
+        query: str,
+        options: Dict[str, Any],
+        project_id: str,
+        location: str = DEFAULT_DATAFLOW_LOCATION,
+        on_new_job_id_callback: Optional[Callable[[str], None]] = None
+    ):
+        """
+        Starts Dataflow SQL query.
+
+        :param job_name: The unique name to assign to the Cloud Dataflow job.
+        :type job_name: str
+        :param query: The SQL query to execute.
+        :type query: str
+        :param options: Job parameters to be executed.
+            For more information, look at:
+            `https://cloud.google.com/sdk/gcloud/reference/beta/dataflow/sql/query
+            <gcloud beta dataflow sql query>`__
+            command reference
+        :param location: The location of the Dataflow job (for example europe-west1)

Review comment:
       Unless you have a good reason to rename this `location`, I would use `region` because it is more specific and consistent with Beam/Dataflow usage.
   

##########
File path: airflow/providers/google/cloud/example_dags/example_dataflow_sql.py
##########
@@ -0,0 +1,60 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Example Airflow DAG for Google Cloud Dataflow service
+"""
+import os
+
+from airflow import models
+from airflow.providers.google.cloud.operators.dataflow import DataflowStartSqlJobOperator
+from airflow.utils.dates import days_ago
+
+GCP_PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "example-project")
+
+BQ_SQL_DATASET = os.environ.get('DATAFLOW_BQ_SQL_DATASET', 'airflow_dataflow_samples')
+DATAFLOW_SQL_JOB_NAME = os.environ.get('DATAFLOW_SQL_JOB_NAME', "dataflow-sql")
+DATAFLOW_SQL_LOCATION = os.environ.get('DATAFLOW_SQL_LOCATION', 'us-west1')
+
+with models.DAG(
+    dag_id="example_gcp_dataflow_sql",
+    default_args={
+        "start_date": days_ago(1),
+    },
+    schedule_interval=None,  # Override to match your needs
+) as dag_sql:
+    start_sql = DataflowStartSqlJobOperator(
+        task_id="start_sql_query",
+        job_name=DATAFLOW_SQL_JOB_NAME,
+        query=f"""
+            SELECT
+                sales_region as sales_region,
+                count(state_id) as count_state
+            FROM
+                bigquery.table.`{GCP_PROJECT_ID}`.beam_samples.beam_table

Review comment:
       > Unfortunately, Dataflow SQL is not compatible with the public datasets I know.
   
   Yeah, the table will have to have a schema that is compatible with DF SQL. A canonical public BQ dataset seems like something we should definitely have in the docs, but I couldn't find one.

##########
File path: airflow/providers/google/cloud/hooks/dataflow.py
##########
@@ -783,6 +794,77 @@ def cancel_job(
             name=job_name,
             job_id=job_id,
             location=location,
-            poll_sleep=self.poll_sleep
+            poll_sleep=self.poll_sleep,
+            num_retries=self.num_retries,
         )
         jobs_controller.cancel()
+
+    @GoogleBaseHook.fallback_to_default_project_id
+    def start_sql_job(
+        self,
+        job_name: str,
+        query: str,
+        options: Dict[str, Any],
+        project_id: str,
+        location: str = DEFAULT_DATAFLOW_LOCATION,
+        on_new_job_id_callback: Optional[Callable[[str], None]] = None
+    ):
+        """
+        Starts Dataflow SQL query.
+
+        :param job_name: The unique name to assign to the Cloud Dataflow job.
+        :type job_name: str
+        :param query: The SQL query to execute.
+        :type query: str
+        :param options: Job parameters to be executed.

Review comment:
       Nit: `parameters` itself is one of the arguments that can be passed here (see https://cloud.google.com/dataflow/docs/guides/sql/parameterized-queries). Maybe use "arguments" instead.
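   
   For illustration, the `options` dict maps directly onto `gcloud dataflow sql query` flags, so a query parameter (in the gcloud sense) would just be one more entry next to the other job arguments. The flag names below are assumptions and should be double-checked against the gcloud reference:
   ```python
   options = {
       "bigquery-project": "example-project",   # assumed output flags, see the gcloud reference
       "bigquery-dataset": "beam_samples",
       "bigquery-table": "beam_output",
       "parameter": "min_count:INT64:10",       # a query parameter, see the parameterized-queries guide
   }
   ```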




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] jaketf commented on a change in pull request #8553: Add DataflowStartSQLQuery operator

Posted by GitBox <gi...@apache.org>.
jaketf commented on a change in pull request #8553:
URL: https://github.com/apache/airflow/pull/8553#discussion_r418995083



##########
File path: airflow/providers/google/cloud/hooks/dataflow.py
##########
@@ -93,15 +94,30 @@ def inner_wrapper(self: "DataflowHook", *args, **kwargs) -> RT:
 class DataflowJobStatus:
     """
     Helper class with Dataflow job statuses.
+    Reference: https://cloud.google.com/dataflow/docs/reference/rest/v1b3/projects.jobs#Job.JobState
     """
-    JOB_STATE_DONE = "JOB_STATE_DONE"
+    JOB_STATE_UNKNOWN = "JOB_STATE_UNKNOWN"

Review comment:
       nit: for the var names, is the `JOB_STATE_` prefix really necessary on all of these?
   IMO it is slightly more readable to drop it, since the prefix causes "stutter": `DataflowJobStatus.JOB_STATE_xxx`.
   
   A forward-looking thought (though not backwards compatible, so not an immediate suggestion): in Python 3.8+ this set could be more concise with the walrus operator, e.g.
   ```python
   FAILED_END_STATES = {
       (FAILED := "JOB_STATE_FAILED"),
       (CANCELLED := "JOB_STATE_CANCELLED"),
       (STOPPED := "JOB_STATE_STOPPED")
   }
   ```

##########
File path: airflow/providers/google/cloud/hooks/dataflow.py
##########
@@ -93,15 +94,30 @@ def inner_wrapper(self: "DataflowHook", *args, **kwargs) -> RT:
 class DataflowJobStatus:
     """
     Helper class with Dataflow job statuses.
+    Reference: https://cloud.google.com/dataflow/docs/reference/rest/v1b3/projects.jobs#Job.JobState
     """
-    JOB_STATE_DONE = "JOB_STATE_DONE"
+    JOB_STATE_UNKNOWN = "JOB_STATE_UNKNOWN"
+    JOB_STATE_STOPPED = "JOB_STATE_STOPPED"
     JOB_STATE_RUNNING = "JOB_STATE_RUNNING"
+    JOB_STATE_DONE = "JOB_STATE_DONE"
     JOB_STATE_FAILED = "JOB_STATE_FAILED"
     JOB_STATE_CANCELLED = "JOB_STATE_CANCELLED"
+    JOB_STATE_UPDATED = "JOB_STATE_UPDATED"
+    JOB_STATE_DRAINING = "JOB_STATE_DRAINING"
+    JOB_STATE_DRAINED = "JOB_STATE_DRAINED"
     JOB_STATE_PENDING = "JOB_STATE_PENDING"
-    FAILED_END_STATES = {JOB_STATE_FAILED, JOB_STATE_CANCELLED}
-    SUCCEEDED_END_STATES = {JOB_STATE_DONE}
-    END_STATES = SUCCEEDED_END_STATES | FAILED_END_STATES
+    JOB_STATE_CANCELLING = "JOB_STATE_CANCELLING"
+    JOB_STATE_QUEUED = "JOB_STATE_QUEUED"
+    FAILED_END_STATES = {JOB_STATE_FAILED, JOB_STATE_CANCELLED, JOB_STATE_STOPPED}
+    SUCCEEDED_END_STATES = {JOB_STATE_DONE, JOB_STATE_UPDATED, JOB_STATE_DRAINED}
+    TERMINAL_STATES = SUCCEEDED_END_STATES | FAILED_END_STATES

Review comment:
       I think these are reasonable defaults, but I agree it would be nice to let users set them as parameters.
   I could even see DRAINING as a failed state (if Airflow never expects a human to make a manual intervention),
   and I could see wanting to fail earlier on CANCELLING (rather than waiting until CANCELLED).
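   
   A sketch of what a configurable variant could look like (made-up names, not the PR's implementation):
   ```python
   class DataflowJobStateChecker:
       # Lets callers decide which terminal states count as success or failure.
       def __init__(self, success_states=None, failure_states=None):
           self.success_states = success_states or {"JOB_STATE_DONE", "JOB_STATE_DRAINED", "JOB_STATE_UPDATED"}
           self.failure_states = failure_states or {"JOB_STATE_FAILED", "JOB_STATE_CANCELLED", "JOB_STATE_STOPPED"}
   
       def is_finished(self, current_state: str) -> bool:
           # Raise on a failure state, return True on success, False while still running.
           if current_state in self.failure_states:
               raise RuntimeError(f"Dataflow job ended in state {current_state}")
           return current_state in self.success_states
   ```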

##########
File path: tests/providers/google/cloud/hooks/test_dataflow.py
##########
@@ -692,10 +710,84 @@ def test_cancel_job(self, mock_get_conn, jobs_controller):
             location=TEST_LOCATION,
             name=TEST_JOB_NAME,
             poll_sleep=10,
-            project_number=TEST_PROJECT
+            project_number=TEST_PROJECT,
+            num_retries=5,
         )
         jobs_controller.cancel()
 
+    @mock.patch(DATAFLOW_STRING.format('_DataflowJobsController'))
+    @mock.patch(DATAFLOW_STRING.format('DataflowHook.provide_authorized_gcloud'))
+    @mock.patch(DATAFLOW_STRING.format('DataflowHook.get_conn'))
+    @mock.patch(DATAFLOW_STRING.format('subprocess.run'))
+    def test_start_sql_job_failed_to_run(
+        self, mock_run, mock_get_conn, mock_provide_authorized_gcloud, mock_controller
+    ):
+        test_job = {'id': "TEST_JOB_ID"}
+        mock_controller.return_value.get_jobs.return_value = [test_job]
+        mock_run.return_value = mock.MagicMock(
+            stdout=f"{TEST_JOB_ID}\n".encode(),
+            stderr=f"{TEST_JOB_ID}\n".encode(),
+            returncode=0
+        )
+        on_new_job_id_callback = mock.MagicMock()
+        result = self.dataflow_hook.start_sql_job(
+            job_name=TEST_SQL_JOB_NAME,
+            query=TEST_SQL_QUERY,
+            options=TEST_SQL_OPTIONS,
+            location=TEST_LOCATION,
+            project_id=TEST_PROJECT,
+            on_new_job_id_callback=on_new_job_id_callback
+        )
+        mock_run.assert_called_once_with(
+            [
+                'gcloud',
+                'beta',

Review comment:
       beta not necessary

##########
File path: airflow/providers/google/cloud/hooks/dataflow.py
##########
@@ -431,13 +448,17 @@ def _start_dataflow(
         variables: Dict,
         name: str,
         command_prefix: List[str],
-        label_formatter: Callable[[Dict], List[str]],
         project_id: str,
         multiple_jobs: bool = False,
         on_new_job_id_callback: Optional[Callable[[str], None]] = None,
         location: str = DEFAULT_DATAFLOW_LOCATION
     ) -> None:
-        cmd = command_prefix + self._build_cmd(variables, label_formatter, project_id)
+        cmd = command_prefix + [
+            "--runner=DataflowRunner",

Review comment:
       This makes me think (a larger scope than just the SQL operator): should we have Beam operators that support other runners?
   
   For example, for some users Dataflow does not make sense for smaller/shorter batch jobs (because of the overhead of waiting for workers to come up); for a job under 30 minutes, worker spin-up time can be a 10% performance hit. They may still want to use Apache Beam (on, say, the Spark runner), submitting to a non-ephemeral cluster (Dataproc, EMR, Spark on k8s, on-prem infra, etc.).
   
   Would this be easy enough to achieve with the Dataproc / EMR / Spark operators?
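   
   Purely as an illustrative sketch (not something this PR adds), a Beam pipeline can already target another runner by passing the standard pipeline options, e.g. from a wrapper script or a bash task:
   ```python
   import subprocess
   
   # `my_beam_pipeline.py` is a hypothetical pipeline module; --runner is a standard
   # Beam pipeline option, while the Spark cluster wiring would still need the usual
   # portable-runner configuration.
   subprocess.run(["python", "my_beam_pipeline.py", "--runner=SparkRunner"], check=True)
   ```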

##########
File path: airflow/providers/google/cloud/hooks/dataflow.py
##########
@@ -496,17 +517,15 @@ def start_java_dataflow(
         variables['jobName'] = name
         variables['region'] = location
 
-        def label_formatter(labels_dict):
-            return ['--labels={}'.format(
-                json.dumps(labels_dict).replace(' ', ''))]
+        if 'labels' in variables:
+            variables['labels'] = json.dumps(variables['labels']).replace(' ', '')

Review comment:
       It might be better to check that labels adhere to these regexes (from the API [docs](https://cloud.google.com/dataflow/docs/reference/rest/v1b3/projects.jobs#Job)):
   ```
   Keys must conform to regexp: [\p{Ll}\p{Lo}][\p{Ll}\p{Lo}\p{N}_-]{0,62}
   Values must conform to regexp: [\p{Ll}\p{Lo}\p{N}_-]{0,63}
   ```
   and raise an exception if they do not.
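   
   A possible validation sketch; note that the `\p{...}` classes in those patterns need the third-party `regex` module, since the stdlib `re` does not support them:
   ```python
   import regex  # third-party; stdlib `re` does not understand \p{...} classes
   
   LABEL_KEY_RE = regex.compile(r"[\p{Ll}\p{Lo}][\p{Ll}\p{Lo}\p{N}_-]{0,62}")
   LABEL_VALUE_RE = regex.compile(r"[\p{Ll}\p{Lo}\p{N}_-]{0,63}")
   
   def validate_labels(labels: dict) -> None:
       # Fail fast instead of letting the Dataflow API reject the job later.
       for key, value in labels.items():
           if not LABEL_KEY_RE.fullmatch(key) or not LABEL_VALUE_RE.fullmatch(str(value)):
               raise ValueError(f"Invalid Dataflow label: {key}={value}")
   ```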

##########
File path: airflow/providers/google/cloud/hooks/dataflow.py
##########
@@ -694,29 +710,23 @@ def _build_dataflow_job_name(job_name: str, append_job_name: bool = True) -> str
         return safe_job_name
 
     @staticmethod
-    def _build_cmd(variables: Dict, label_formatter: Callable, project_id: str) -> List[str]:
-        command = [
-            "--runner=DataflowRunner",
-            "--project={}".format(project_id),
-        ]
-        if variables is None:
-            return command
-
+    def _options_to_args(variables: Dict):
+        if not variables:
+            return []
         # The logic of this method should be compatible with Apache Beam:
         # https://github.com/apache/beam/blob/b56740f0e8cd80c2873412847d0b336837429fb9/sdks/python/
         # apache_beam/options/pipeline_options.py#L230-L251
+        args: List[str] = []
         for attr, value in variables.items():
-            if attr == 'labels':
-                command += label_formatter(value)
-            elif value is None:
-                command.append(f"--{attr}")
+            if value is None:

Review comment:
       nit: this if and the next elif take the same action and could be combined
   ```suggestion
               if value is None or (isinstance(value, bool) and value):
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] TobKed commented on a change in pull request #8553: Add DataflowStartSQLQuery operator

Posted by GitBox <gi...@apache.org>.
TobKed commented on a change in pull request #8553:
URL: https://github.com/apache/airflow/pull/8553#discussion_r508282889



##########
File path: airflow/providers/google/cloud/operators/dataflow.py
##########
@@ -406,6 +406,88 @@ def on_kill(self) -> None:
             self.hook.cancel_job(job_id=self.job_id, project_id=self.project_id)
 
 
+class DataflowStartSqlJobOperator(BaseOperator):
+    """
+    Starts Dataflow SQL query.
+
+    :param job_name: The unique name to assign to the Cloud Dataflow job.
+    :type job_name: str
+    :param query: The SQL query to execute.
+    :type query: str
+    :param options: Job parameters to be executed. It can be a dictionary with the following keys.
+
+        For more information, look at:
+        `https://cloud.google.com/sdk/gcloud/reference/beta/dataflow/sql/query
+        <gcloud beta dataflow sql query>`__
+        command reference
+
+    :type options: dict
+    :param location: The location of the Dataflow job (for example europe-west1)
+    :type location: str
+    :param project_id: The ID of the GCP project that owns the job.
+        If set to ``None`` or missing, the default project_id from the GCP connection is used.
+    :type project_id: Optional[str]
+    :param gcp_conn_id: The connection ID to use connecting to Google Cloud
+        Platform.
+    :type gcp_conn_id: str
+    :param delegate_to: The account to impersonate, if any.
+        For this to work, the service account making the request must have
+        domain-wide delegation enabled.
+    :type delegate_to: str
+    """
+
+    template_fields = ["job_name", 'query', 'options', 'location', 'project_id', 'gcp_conn_id']
+
+    @apply_defaults
+    def __init__(
+        self,
+        job_name: str,
+        query: str,
+        options: Dict[str, Any],
+        location: str = DEFAULT_DATAFLOW_LOCATION,

Review comment:
       @ibzib I think we will have to change this for all operators in the future. To keep consistency across all Dataflow operators, I would like to keep it for now.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] github-actions[bot] commented on pull request #8553: Add DataflowStartSQLQuery operator

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #8553:
URL: https://github.com/apache/airflow/pull/8553#issuecomment-714373615


   [The Workflow run](https://github.com/apache/airflow/actions/runs/321741025) is cancelling this PR. It has some failed jobs matching ^Pylint$,^Static checks$,^Build docs$,^Spell check docs$,^Backport packages$,^Checks: Helm tests$,^Test OpenAPI*.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] aaltay commented on a change in pull request #8553: Add DataflowStartSQLQuery operator

Posted by GitBox <gi...@apache.org>.
aaltay commented on a change in pull request #8553:
URL: https://github.com/apache/airflow/pull/8553#discussion_r416177435



##########
File path: airflow/providers/google/cloud/hooks/dataflow.py
##########
@@ -783,6 +794,77 @@ def cancel_job(
             name=job_name,
             job_id=job_id,
             location=location,
-            poll_sleep=self.poll_sleep
+            poll_sleep=self.poll_sleep,
+            num_retries=self.num_retries,
         )
         jobs_controller.cancel()
+
+    @GoogleBaseHook.fallback_to_default_project_id
+    def start_sql_job(
+        self,
+        job_name: str,
+        query: str,
+        options: Dict[str, Any],
+        project_id: str,
+        location: str = DEFAULT_DATAFLOW_LOCATION,
+        on_new_job_id_callback: Optional[Callable[[str], None]] = None
+    ):
+        """
+        Starts Dataflow SQL query.
+
+        :param job_name: The unique name to assign to the Cloud Dataflow job.
+        :type job_name: str
+        :param query: The SQL query to execute.
+        :type query: str
+        :param options: Job parameters to be executed.
+            For more information, look at:
+            `https://cloud.google.com/sdk/gcloud/reference/beta/dataflow/sql/query
+            <gcloud beta dataflow sql query>`__
+            command reference
+        :param location: The location of the Dataflow job (for example europe-west1)
+        :type location: str
+        :param project_id: The ID of the GCP project that owns the job.
+            If set to ``None`` or missing, the default project_id from the GCP connection is used.
+        :type project_id: Optional[str]
+        :param on_new_job_id_callback: Callback called when the job ID is known.
+        :type on_new_job_id_callback: callable
+        :return: the new job object
+        """
+        cmd = [
+            'gcloud',
+            'beta',
+            'dataflow',
+            'sql',
+            'query',
+            query,
+            f'--project={project_id}',
+            '--format=value(job.id)',
+            f'--job-name={job_name}',
+            f'--region={location}',
+            *(self._options_to_args(options))
+        ]
+        self.log.info("Executing command: %s", " ".join([shlex.quote(c) for c in cmd]))
+        with self.provide_authorized_gcloud():

Review comment:
       but is this only for logging? Do users normally copy-paste these commands out of the logs?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] github-actions[bot] commented on pull request #8553: Add DataflowStartSQLQuery operator

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #8553:
URL: https://github.com/apache/airflow/pull/8553#issuecomment-712717056


   [The Workflow run](https://github.com/apache/airflow/actions/runs/317318792) is cancelling this PR. It has some failed jobs matching ^Pylint$,^Static checks$,^Build docs$,^Spell check docs$,^Backport packages$,^Checks: Helm tests$,^Test OpenAPI*.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] github-actions[bot] commented on pull request #8553: Add DataflowStartSQLQuery operator

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #8553:
URL: https://github.com/apache/airflow/pull/8553#issuecomment-721642272


   [The Workflow run](https://github.com/apache/airflow/actions/runs/345317932) is cancelling this PR. It has some failed jobs matching ^Pylint$,^Static checks,^Build docs$,^Spell check docs$,^Backport packages$,^Provider packages,^Checks: Helm tests$,^Test OpenAPI*.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ibzib commented on a change in pull request #8553: Add DataflowStartSQLQuery operator

Posted by GitBox <gi...@apache.org>.
ibzib commented on a change in pull request #8553:
URL: https://github.com/apache/airflow/pull/8553#discussion_r508030912



##########
File path: airflow/providers/google/cloud/hooks/dataflow.py
##########
@@ -783,6 +794,77 @@ def cancel_job(
             name=job_name,
             job_id=job_id,
             location=location,
-            poll_sleep=self.poll_sleep
+            poll_sleep=self.poll_sleep,
+            num_retries=self.num_retries,
         )
         jobs_controller.cancel()
+
+    @GoogleBaseHook.fallback_to_default_project_id
+    def start_sql_job(
+        self,
+        job_name: str,
+        query: str,
+        options: Dict[str, Any],
+        project_id: str,
+        location: str = DEFAULT_DATAFLOW_LOCATION,
+        on_new_job_id_callback: Optional[Callable[[str], None]] = None
+    ):
+        """
+        Starts Dataflow SQL query.
+
+        :param job_name: The unique name to assign to the Cloud Dataflow job.
+        :type job_name: str
+        :param query: The SQL query to execute.
+        :type query: str
+        :param options: Job parameters to be executed.
+            For more information, look at:
+            `https://cloud.google.com/sdk/gcloud/reference/beta/dataflow/sql/query
+            <gcloud beta dataflow sql query>`__
+            command reference
+        :param location: The location of the Dataflow job (for example europe-west1)

Review comment:
       I'm not sure which other GCP products you are referring to, but in Dataflow it's usually `--region`.
   https://cloud.google.com/dataflow/docs/concepts/regional-endpoints




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] TobKed commented on pull request #8553: Add DataflowStartSQLQuery operator

Posted by GitBox <gi...@apache.org>.
TobKed commented on pull request #8553:
URL: https://github.com/apache/airflow/pull/8553#issuecomment-716479966


   I replied to all comments, made the requested changes, rebased on master, and made CI happy. IMHO the PR is ready for final review and, hopefully, to be merged :)
   @aaltay @jaketf @ibzib @TheNeuralBit @kennknowles PTAL.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] TheNeuralBit commented on a change in pull request #8553: Add DataflowStartSQLQuery operator

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on a change in pull request #8553:
URL: https://github.com/apache/airflow/pull/8553#discussion_r420363436



##########
File path: airflow/providers/google/cloud/example_dags/example_dataflow_sql.py
##########
@@ -0,0 +1,60 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Example Airflow DAG for Google Cloud Dataflow service
+"""
+import os
+
+from airflow import models
+from airflow.providers.google.cloud.operators.dataflow import DataflowStartSqlJobOperator
+from airflow.utils.dates import days_ago
+
+GCP_PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "example-project")
+
+BQ_SQL_DATASET = os.environ.get('DATAFLOW_BQ_SQL_DATASET', 'airflow_dataflow_samples')
+DATAFLOW_SQL_JOB_NAME = os.environ.get('DATAFLOW_SQL_JOB_NAME', "dataflow-sql")
+DATAFLOW_SQL_LOCATION = os.environ.get('DATAFLOW_SQL_LOCATION', 'us-west1')
+
+with models.DAG(
+    dag_id="example_gcp_dataflow_sql",
+    default_args={
+        "start_date": days_ago(1),
+    },
+    schedule_interval=None,  # Override to match your needs
+) as dag_sql:
+    start_sql = DataflowStartSqlJobOperator(
+        task_id="start_sql_query",
+        job_name=DATAFLOW_SQL_JOB_NAME,
+        query=f"""
+            SELECT
+                sales_region as sales_region,
+                count(state_id) as count_state
+            FROM
+                bigquery.table.`{GCP_PROJECT_ID}`.beam_samples.beam_table

Review comment:
       Yeah, Dataflow SQL doesn't support GEOGRAPHY or NUMERIC, but I'm sure there are many public datasets that don't use those types. `chicago_taxi_trips.taxi_trips` looks like it will work.
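
   As an illustration only (the column name below is an assumption about that public dataset's schema, not something verified in this PR), a Dataflow SQL query against it could look like:

    ```python
    # Hypothetical query against the public dataset mentioned above; `payment_type`
    # is an assumed column name, and the table reference follows the
    # bigquery.table.`project`.dataset.table syntax used in the example DAG.
    DATAFLOW_SQL_QUERY = """
        SELECT payment_type, COUNT(*) AS trip_count
        FROM bigquery.table.`bigquery-public-data`.chicago_taxi_trips.taxi_trips
        GROUP BY payment_type
    """
    ```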




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] mik-laj commented on a change in pull request #8553: Add DataflowStartSQLQuery operator

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #8553:
URL: https://github.com/apache/airflow/pull/8553#discussion_r416146984



##########
File path: airflow/providers/google/cloud/hooks/dataflow.py
##########
@@ -640,9 +659,8 @@ def start_python_dataflow(  # pylint: disable=too-many-arguments
         variables['job_name'] = name
         variables['region'] = location
 
-        def label_formatter(labels_dict):
-            return ['--labels={}={}'.format(key, value)
-                    for key, value in labels_dict.items()]
+        if 'labels' in variables:
+            variables['labels'] = ['{}={}'.format(key, value) for key, value in variables['labels'].items()]

Review comment:
       It depends on the SDK that is used. These two SDKs require different argument formats.
   
   We have three related methods:
   * start_python_dataflow
   * start_java_dataflow
   * _start_dataflow
   
   The first two methods are public and SDK-dependent; they are responsible for the SDK-specific preparation, e.g. setting up the environment. 
   _start_dataflow is an internal method that starts the system process and supervises its execution. I have the impression that this separation is helpful.
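
   A minimal standalone sketch of the two formats (the label values below are just examples), mirroring the snippets in this diff, may make the difference clearer:

    ```python
    # Sketch only: how the same labels dict ends up formatted for the two SDKs
    # (not the hook's exact code).
    import json

    labels = {"airflow-version": "v2-0-0", "team": "data"}

    # Python SDK: a list of key=value entries, later passed as repeated --labels arguments
    python_sdk_labels = ["{}={}".format(key, value) for key, value in labels.items()]
    # -> ['airflow-version=v2-0-0', 'team=data']

    # Java SDK: a single compact JSON object passed as one --labels argument
    java_sdk_labels = json.dumps(labels).replace(" ", "")
    # -> '{"airflow-version":"v2-0-0","team":"data"}'
    ```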




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] github-actions[bot] commented on pull request #8553: Add DataflowStartSQLQuery operator

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #8553:
URL: https://github.com/apache/airflow/pull/8553#issuecomment-712428852


   [The Workflow run](https://github.com/apache/airflow/actions/runs/316250139) is cancelling this PR. It has some failed jobs matching ^Pylint$,^Static checks$,^Build docs$,^Spell check docs$,^Backport packages$,^Checks: Helm tests$,^Test OpenAPI*.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] github-actions[bot] commented on pull request #8553: Add DataflowStartSQLQuery operator

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #8553:
URL: https://github.com/apache/airflow/pull/8553#issuecomment-706175676


   [The Workflow run](https://github.com/apache/airflow/actions/runs/297535079) is cancelling this PR. It has some failed jobs matching ^Pylint$,^Static checks$,^Build docs$,^Spell check docs$,^Backport packages$,^Checks: Helm tests$,^Test OpenAPI*.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] TobKed commented on a change in pull request #8553: Add DataflowStartSQLQuery operator

Posted by GitBox <gi...@apache.org>.
TobKed commented on a change in pull request #8553:
URL: https://github.com/apache/airflow/pull/8553#discussion_r508221739



##########
File path: airflow/providers/google/cloud/hooks/dataflow.py
##########
@@ -496,17 +517,15 @@ def start_java_dataflow(
         variables['jobName'] = name
         variables['region'] = location
 
-        def label_formatter(labels_dict):
-            return ['--labels={}'.format(
-                json.dumps(labels_dict).replace(' ', ''))]
+        if 'labels' in variables:
+            variables['labels'] = json.dumps(variables['labels']).replace(' ', '')

Review comment:
       @jaketf do you know whether these regexes are used for labels by all Google APIs?
   I see that Google provides comprehensive information when labels do not comply with these regexes (the regex is included in the error message); still, we could validate labels across the whole Google provider and warn the user before the job fails at runtime.
   WDYT @jaketf @mik-laj 
   




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] TobKed commented on a change in pull request #8553: Add DataflowStartSQLQuery operator

Posted by GitBox <gi...@apache.org>.
TobKed commented on a change in pull request #8553:
URL: https://github.com/apache/airflow/pull/8553#discussion_r508460182



##########
File path: airflow/providers/google/cloud/hooks/dataflow.py
##########
@@ -496,17 +517,15 @@ def start_java_dataflow(
         variables['jobName'] = name
         variables['region'] = location
 
-        def label_formatter(labels_dict):
-            return ['--labels={}'.format(
-                json.dumps(labels_dict).replace(' ', ''))]
+        if 'labels' in variables:
+            variables['labels'] = json.dumps(variables['labels']).replace(' ', '')

Review comment:
       On the other hand, I don't think it is a good idea to add unnecessary complexity here and limit the user. Google logs already provide extensive information about label problems when they occur.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] mik-laj commented on a change in pull request #8553: Add DataflowStartSQLQuery operator

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #8553:
URL: https://github.com/apache/airflow/pull/8553#discussion_r416153736



##########
File path: airflow/providers/google/cloud/operators/dataflow.py
##########
@@ -406,6 +406,88 @@ def on_kill(self) -> None:
             self.hook.cancel_job(job_id=self.job_id, project_id=self.project_id)
 
 
+class DataflowStartSqlJobOperator(BaseOperator):
+    """
+    Starts Dataflow SQL query.
+
+    :param job_name: The unique name to assign to the Cloud Dataflow job.
+    :type job_name: str
+    :param query: The SQL query to execute.
+    :type query: str
+    :param options: Job parameters to be executed. It can be a dictionary with the following keys.
+
+        For more information, look at:
+        `https://cloud.google.com/sdk/gcloud/reference/beta/dataflow/sql/query
+        <gcloud beta dataflow sql query>`__
+        command reference
+
+    :param options: dict
+    :param location: The location of the Dataflow job (for example europe-west1)
+    :type location: str
+    :param project_id: The ID of the GCP project that owns the job.
+        If set to ``None`` or missing, the default project_id from the GCP connection is used.
+    :type project_id: Optional[str]
+    :param gcp_conn_id: The connection ID to use connecting to Google Cloud
+        Platform.
+    :type gcp_conn_id: str

Review comment:
       Airflow saves all credentials (MySQL, GCP, AWS, and others) in a single database table called `connection`. This parameter is the ID of the entry in that table.
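
   For illustration, a minimal sketch (the connection ID, job name, and query below are placeholders, not values from this PR) of how the operator would point at such an entry:

    ```python
    # Hypothetical fragment meant to live inside a `with models.DAG(...)` block.
    from airflow.providers.google.cloud.operators.dataflow import DataflowStartSqlJobOperator

    start_sql = DataflowStartSqlJobOperator(
        task_id="start_sql_query",
        job_name="dataflow-sql",
        query="SELECT sales_region, COUNT(*) AS c FROM my_input GROUP BY sales_region",  # placeholder
        options={},
        location="europe-west1",
        gcp_conn_id="google_cloud_default",  # ID of the entry in the `connection` table
    )
    ```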




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] mik-laj commented on a change in pull request #8553: Add DataflowStartSQLQuery operator

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #8553:
URL: https://github.com/apache/airflow/pull/8553#discussion_r416150987



##########
File path: airflow/providers/google/cloud/hooks/dataflow.py
##########
@@ -783,6 +794,77 @@ def cancel_job(
             name=job_name,
             job_id=job_id,
             location=location,
-            poll_sleep=self.poll_sleep
+            poll_sleep=self.poll_sleep,
+            num_retries=self.num_retries,
         )
         jobs_controller.cancel()
+
+    @GoogleBaseHook.fallback_to_default_project_id
+    def start_sql_job(
+        self,
+        job_name: str,
+        query: str,
+        options: Dict[str, Any],
+        project_id: str,
+        location: str = DEFAULT_DATAFLOW_LOCATION,
+        on_new_job_id_callback: Optional[Callable[[str], None]] = None
+    ):
+        """
+        Starts Dataflow SQL query.
+
+        :param job_name: The unique name to assign to the Cloud Dataflow job.
+        :type job_name: str
+        :param query: The SQL query to execute.
+        :type query: str
+        :param options: Job parameters to be executed.
+            For more information, look at:
+            `https://cloud.google.com/sdk/gcloud/reference/beta/dataflow/sql/query
+            <gcloud beta dataflow sql query>`__
+            command reference
+        :param location: The location of the Dataflow job (for example europe-west1)
+        :type location: str
+        :param project_id: The ID of the GCP project that owns the job.
+            If set to ``None`` or missing, the default project_id from the GCP connection is used.
+        :type project_id: Optional[str]
+        :param on_new_job_id_callback: Callback called when the job ID is known.
+        :type on_new_job_id_callback: callable
+        :return: the new job object
+        """
+        cmd = [
+            'gcloud',
+            'beta',
+            'dataflow',
+            'sql',
+            'query',
+            query,
+            f'--project={project_id}',
+            '--format=value(job.id)',
+            f'--job-name={job_name}',
+            f'--region={location}',
+            *(self._options_to_args(options))
+        ]
+        self.log.info("Executing command: %s", " ".join([shlex.quote(c) for c in cmd]))
+        with self.provide_authorized_gcloud():

Review comment:
       It adds escape characters if needed. 
   Example:
   If you want to display the contents of the `/tmp/` directory, you can use the command `ls /tmp/`.
   If you want to display the contents of the `/tmp/i love pizza` directory, you have to use the command `ls '/tmp/i love pizza'`; `ls /tmp/i love pizza` is an incorrect command. The decision about quotation characters is made by shlex.quote, which also handles the other cases the shell requires, e.g. a quote character inside an argument.
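
   A small self-contained illustration (the directory names are just examples):

    ```python
    import shlex

    cmd = ["ls", "/tmp/i love pizza"]
    print(" ".join(shlex.quote(c) for c in cmd))
    # -> ls '/tmp/i love pizza'   (safe to copy-paste into a shell)

    cmd = ["ls", "/tmp"]
    print(" ".join(shlex.quote(c) for c in cmd))
    # -> ls /tmp                  (no quoting needed)
    ```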
   




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] mik-laj commented on a change in pull request #8553: Add DataflowStartSQLQuery operator

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #8553:
URL: https://github.com/apache/airflow/pull/8553#discussion_r416146984



##########
File path: airflow/providers/google/cloud/hooks/dataflow.py
##########
@@ -640,9 +659,8 @@ def start_python_dataflow(  # pylint: disable=too-many-arguments
         variables['job_name'] = name
         variables['region'] = location
 
-        def label_formatter(labels_dict):
-            return ['--labels={}={}'.format(key, value)
-                    for key, value in labels_dict.items()]
+        if 'labels' in variables:
+            variables['labels'] = ['{}={}'.format(key, value) for key, value in variables['labels'].items()]

Review comment:
       It depends on the SDK that is used. These two SDKs require different argument formats.
   
   We have three related methods:
   * start_python_dataflow
   * start_java_dataflow
   * _start_dataflow
   
   The first two methods are public and SDK-dependent; they are responsible for the SDK-specific preparation, e.g. setting up the environment. 
   _start_dataflow is an internal method that starts the system process and supervises its execution. I have the impression that this separation is helpful.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] TobKed commented on a change in pull request #8553: Add DataflowStartSQLQuery operator

Posted by GitBox <gi...@apache.org>.
TobKed commented on a change in pull request #8553:
URL: https://github.com/apache/airflow/pull/8553#discussion_r501044662



##########
File path: airflow/providers/google/cloud/hooks/dataflow.py
##########
@@ -783,6 +794,77 @@ def cancel_job(
             name=job_name,
             job_id=job_id,
             location=location,
-            poll_sleep=self.poll_sleep
+            poll_sleep=self.poll_sleep,
+            num_retries=self.num_retries,
         )
         jobs_controller.cancel()
+
+    @GoogleBaseHook.fallback_to_default_project_id
+    def start_sql_job(
+        self,
+        job_name: str,
+        query: str,
+        options: Dict[str, Any],
+        project_id: str,
+        location: str = DEFAULT_DATAFLOW_LOCATION,
+        on_new_job_id_callback: Optional[Callable[[str], None]] = None
+    ):
+        """
+        Starts Dataflow SQL query.
+
+        :param job_name: The unique name to assign to the Cloud Dataflow job.
+        :type job_name: str
+        :param query: The SQL query to execute.
+        :type query: str
+        :param options: Job parameters to be executed.
+            For more information, look at:
+            `https://cloud.google.com/sdk/gcloud/reference/beta/dataflow/sql/query
+            <gcloud beta dataflow sql query>`__
+            command reference
+        :param location: The location of the Dataflow job (for example europe-west1)
+        :type location: str
+        :param project_id: The ID of the GCP project that owns the job.
+            If set to ``None`` or missing, the default project_id from the GCP connection is used.
+        :type project_id: Optional[str]
+        :param on_new_job_id_callback: Callback called when the job ID is known.
+        :type on_new_job_id_callback: callable
+        :return: the new job object
+        """
+        cmd = [
+            'gcloud',
+            'beta',

Review comment:
       Thanks. I deleted it.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] github-actions[bot] commented on pull request #8553: Add DataflowStartSQLQuery operator

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #8553:
URL: https://github.com/apache/airflow/pull/8553#issuecomment-712676972


   [The Workflow run](https://github.com/apache/airflow/actions/runs/317197049) is cancelling this PR. It has some failed jobs matching ^Pylint$,^Static checks$,^Build docs$,^Spell check docs$,^Backport packages$,^Checks: Helm tests$,^Test OpenAPI*.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] jaketf commented on a change in pull request #8553: Add DataflowStartSQLQuery operator

Posted by GitBox <gi...@apache.org>.
jaketf commented on a change in pull request #8553:
URL: https://github.com/apache/airflow/pull/8553#discussion_r418996024



##########
File path: tests/providers/google/cloud/hooks/test_dataflow.py
##########
@@ -692,10 +710,84 @@ def test_cancel_job(self, mock_get_conn, jobs_controller):
             location=TEST_LOCATION,
             name=TEST_JOB_NAME,
             poll_sleep=10,
-            project_number=TEST_PROJECT
+            project_number=TEST_PROJECT,
+            num_retries=5,
         )
         jobs_controller.cancel()
 
+    @mock.patch(DATAFLOW_STRING.format('_DataflowJobsController'))
+    @mock.patch(DATAFLOW_STRING.format('DataflowHook.provide_authorized_gcloud'))
+    @mock.patch(DATAFLOW_STRING.format('DataflowHook.get_conn'))
+    @mock.patch(DATAFLOW_STRING.format('subprocess.run'))
+    def test_start_sql_job_failed_to_run(
+        self, mock_run, mock_get_conn, mock_provide_authorized_gcloud, mock_controller
+    ):
+        test_job = {'id': "TEST_JOB_ID"}
+        mock_controller.return_value.get_jobs.return_value = [test_job]
+        mock_run.return_value = mock.MagicMock(
+            stdout=f"{TEST_JOB_ID}\n".encode(),
+            stderr=f"{TEST_JOB_ID}\n".encode(),
+            returncode=0
+        )
+        on_new_job_id_callback = mock.MagicMock()
+        result = self.dataflow_hook.start_sql_job(
+            job_name=TEST_SQL_JOB_NAME,
+            query=TEST_SQL_QUERY,
+            options=TEST_SQL_OPTIONS,
+            location=TEST_LOCATION,
+            project_id=TEST_PROJECT,
+            on_new_job_id_callback=on_new_job_id_callback
+        )
+        mock_run.assert_called_once_with(
+            [
+                'gcloud',
+                'beta',

Review comment:
       `beta` is not necessary here.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] TobKed commented on a change in pull request #8553: Add DataflowStartSQLQuery operator

Posted by GitBox <gi...@apache.org>.
TobKed commented on a change in pull request #8553:
URL: https://github.com/apache/airflow/pull/8553#discussion_r508321399



##########
File path: airflow/providers/google/cloud/hooks/dataflow.py
##########
@@ -783,6 +794,77 @@ def cancel_job(
             name=job_name,
             job_id=job_id,
             location=location,
-            poll_sleep=self.poll_sleep
+            poll_sleep=self.poll_sleep,
+            num_retries=self.num_retries,
         )
         jobs_controller.cancel()
+
+    @GoogleBaseHook.fallback_to_default_project_id
+    def start_sql_job(
+        self,
+        job_name: str,
+        query: str,
+        options: Dict[str, Any],
+        project_id: str,
+        location: str = DEFAULT_DATAFLOW_LOCATION,
+        on_new_job_id_callback: Optional[Callable[[str], None]] = None
+    ):
+        """
+        Starts Dataflow SQL query.
+
+        :param job_name: The unique name to assign to the Cloud Dataflow job.
+        :type job_name: str
+        :param query: The SQL query to execute.
+        :type query: str
+        :param options: Job parameters to be executed.

Review comment:
       This is related to what @mik-laj said here: https://github.com/apache/airflow/pull/8553#discussion_r508048041
   I added parametrization to the example DAG so that it serves as a helpful hint for users who are unsure how to use it.
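
   A hedged sketch of what the `options` dict could look like (the keys mirror `gcloud dataflow sql query` flags from the linked command reference; the values are placeholders):

    ```python
    # Each key becomes a --key=value argument via the hook's _options_to_args helper.
    options = {
        "bigquery-project": "example-project",          # placeholder project
        "bigquery-dataset": "airflow_dataflow_samples",
        "bigquery-table": "beam_output",
        "bigquery-write-disposition": "write-empty",
    }
    ```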




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] github-actions[bot] commented on pull request #8553: Add DataflowStartSQLQuery operator

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #8553:
URL: https://github.com/apache/airflow/pull/8553#issuecomment-712677364






----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] TobKed commented on pull request #8553: Add DataflowStartSQLQuery operator

Posted by GitBox <gi...@apache.org>.
TobKed commented on pull request #8553:
URL: https://github.com/apache/airflow/pull/8553#issuecomment-704801677


   I rebased on the latest master


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] github-actions[bot] commented on pull request #8553: Add DataflowStartSQLQuery operator

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #8553:
URL: https://github.com/apache/airflow/pull/8553#issuecomment-721242742


   [The Workflow run](https://github.com/apache/airflow/actions/runs/343973701) is cancelling this PR. It has some failed jobs matching ^Pylint$,^Static checks,^Build docs$,^Spell check docs$,^Backport packages$,^Provider packages,^Checks: Helm tests$,^Test OpenAPI*.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] mik-laj commented on a change in pull request #8553: Add DataflowStartSQLQuery operator

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #8553:
URL: https://github.com/apache/airflow/pull/8553#discussion_r508048041



##########
File path: airflow/providers/google/cloud/hooks/dataflow.py
##########
@@ -783,6 +794,77 @@ def cancel_job(
             name=job_name,
             job_id=job_id,
             location=location,
-            poll_sleep=self.poll_sleep
+            poll_sleep=self.poll_sleep,
+            num_retries=self.num_retries,
         )
         jobs_controller.cancel()
+
+    @GoogleBaseHook.fallback_to_default_project_id
+    def start_sql_job(
+        self,
+        job_name: str,
+        query: str,
+        options: Dict[str, Any],
+        project_id: str,
+        location: str = DEFAULT_DATAFLOW_LOCATION,
+        on_new_job_id_callback: Optional[Callable[[str], None]] = None
+    ):
+        """
+        Starts Dataflow SQL query.
+
+        :param job_name: The unique name to assign to the Cloud Dataflow job.
+        :type job_name: str
+        :param query: The SQL query to execute.
+        :type query: str
+        :param options: Job parameters to be executed.
+            For more information, look at:
+            `https://cloud.google.com/sdk/gcloud/reference/beta/dataflow/sql/query
+            <gcloud beta dataflow sql query>`__
+            command reference
+        :param location: The location of the Dataflow job (for example europe-west1)

Review comment:
       This is an essential feature of Airflow. In Airflow, you can define default arguments that will be applied to all operators, but the parameter name must be consistent across all operators.
   ```python
   default_args = {
       'dataflow_default_options': {
           'tempLocation': GCS_TMP,
           'stagingLocation': GCS_STAGING,
       },
       'location': 'europe-west3'
   }
   
    with models.DAG(
        "example_gcp_dataflow_native_java",
        default_args=default_args,  # applies the shared options and location to every task
        schedule_interval=None,  # Override to match your needs
        start_date=days_ago(1),
        tags=['example'],
    ) as dag_native_java:
       start_java_job = DataflowCreateJavaJobOperator(
           task_id="start-java-job",
           jar=GCS_JAR,
           job_name='{{task.task_id}}',
           options={
               'output': GCS_OUTPUT,
           },
           poll_sleep=10,
           job_class='org.apache.beam.examples.WordCount',
           check_if_running=CheckJobRunning.IgnoreJob,
           location='europe-west3',
       )
   
       # [START howto_operator_bigquery_create_table]
       create_table = BigQueryCreateEmptyTableOperator(
           task_id="create_table",
           dataset_id=DATASET_NAME,
           table_id="test_table",
           schema_fields=[
               {"name": "emp_name", "type": "STRING", "mode": "REQUIRED"},
               {"name": "salary", "type": "INTEGER", "mode": "NULLABLE"},
           ],
       )
       # [END howto_operator_bigquery_create_table]
   ```
   In the above example, the tasks `create_table` and `start-java-job` are executed in one location - `europe-west3`. 
   
   Dataflow also uses the word "location" in its API to denote this field.
   ![Screenshot 2020-10-19 at 22 36 49](https://user-images.githubusercontent.com/12058428/96508938-9ec58200-125b-11eb-9547-75b27aec7e93.png)
   
   https://cloud.google.com/dataflow/docs/reference/rest/v1b3/projects.locations.jobs/get
   
   




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] github-actions[bot] commented on pull request #8553: Add DataflowStartSQLQuery operator

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #8553:
URL: https://github.com/apache/airflow/pull/8553#issuecomment-721056715


   The PR should be OK to be merged with just subset of tests as it does not modify Core of Airflow. The committers might merge it or can add a label 'full tests needed' and re-run it to run all tests if they see it is needed!


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] mik-laj commented on a change in pull request #8553: Add DataflowStartSQLQuery operator

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #8553:
URL: https://github.com/apache/airflow/pull/8553#discussion_r504576902



##########
File path: airflow/providers/google/cloud/hooks/dataflow.py
##########
@@ -431,13 +448,17 @@ def _start_dataflow(
         variables: Dict,
         name: str,
         command_prefix: List[str],
-        label_formatter: Callable[[Dict], List[str]],
         project_id: str,
         multiple_jobs: bool = False,
         on_new_job_id_callback: Optional[Callable[[str], None]] = None,
         location: str = DEFAULT_DATAFLOW_LOCATION
     ) -> None:
-        cmd = command_prefix + self._build_cmd(variables, label_formatter, project_id)
+        cmd = command_prefix + [
+            "--runner=DataflowRunner",

Review comment:
       In the current quarter, we want to start working on operators for Apache Beam.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] mik-laj commented on a change in pull request #8553: Add DataflowStartSQLQuery operator

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #8553:
URL: https://github.com/apache/airflow/pull/8553#discussion_r416189250



##########
File path: airflow/providers/google/cloud/hooks/dataflow.py
##########
@@ -783,6 +794,77 @@ def cancel_job(
             name=job_name,
             job_id=job_id,
             location=location,
-            poll_sleep=self.poll_sleep
+            poll_sleep=self.poll_sleep,
+            num_retries=self.num_retries,
         )
         jobs_controller.cancel()
+
+    @GoogleBaseHook.fallback_to_default_project_id
+    def start_sql_job(
+        self,
+        job_name: str,
+        query: str,
+        options: Dict[str, Any],
+        project_id: str,
+        location: str = DEFAULT_DATAFLOW_LOCATION,
+        on_new_job_id_callback: Optional[Callable[[str], None]] = None
+    ):
+        """
+        Starts Dataflow SQL query.
+
+        :param job_name: The unique name to assign to the Cloud Dataflow job.
+        :type job_name: str
+        :param query: The SQL query to execute.
+        :type query: str
+        :param options: Job parameters to be executed.
+            For more information, look at:
+            `https://cloud.google.com/sdk/gcloud/reference/beta/dataflow/sql/query
+            <gcloud beta dataflow sql query>`__
+            command reference
+        :param location: The location of the Dataflow job (for example europe-west1)
+        :type location: str
+        :param project_id: The ID of the GCP project that owns the job.
+            If set to ``None`` or missing, the default project_id from the GCP connection is used.
+        :type project_id: Optional[str]
+        :param on_new_job_id_callback: Callback called when the job ID is known.
+        :type on_new_job_id_callback: callable
+        :return: the new job object
+        """
+        cmd = [
+            'gcloud',
+            'beta',
+            'dataflow',
+            'sql',
+            'query',
+            query,
+            f'--project={project_id}',
+            '--format=value(job.id)',
+            f'--job-name={job_name}',
+            f'--region={location}',
+            *(self._options_to_args(options))
+        ]
+        self.log.info("Executing command: %s", " ".join([shlex.quote(c) for c in cmd]))
+        with self.provide_authorized_gcloud():

Review comment:
       These logs are available in the Airflow web UI, so a normal user can easily access them.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] TobKed commented on a change in pull request #8553: Add DataflowStartSQLQuery operator

Posted by GitBox <gi...@apache.org>.
TobKed commented on a change in pull request #8553:
URL: https://github.com/apache/airflow/pull/8553#discussion_r504574733



##########
File path: airflow/providers/google/cloud/hooks/dataflow.py
##########
@@ -783,6 +794,77 @@ def cancel_job(
             name=job_name,
             job_id=job_id,
             location=location,
-            poll_sleep=self.poll_sleep
+            poll_sleep=self.poll_sleep,
+            num_retries=self.num_retries,
         )
         jobs_controller.cancel()
+
+    @GoogleBaseHook.fallback_to_default_project_id
+    def start_sql_job(
+        self,
+        job_name: str,
+        query: str,
+        options: Dict[str, Any],
+        project_id: str,
+        location: str = DEFAULT_DATAFLOW_LOCATION,
+        on_new_job_id_callback: Optional[Callable[[str], None]] = None
+    ):
+        """
+        Starts Dataflow SQL query.
+
+        :param job_name: The unique name to assign to the Cloud Dataflow job.
+        :type job_name: str
+        :param query: The SQL query to execute.
+        :type query: str
+        :param options: Job parameters to be executed.
+            For more information, look at:
+            `https://cloud.google.com/sdk/gcloud/reference/beta/dataflow/sql/query
+            <gcloud beta dataflow sql query>`__
+            command reference
+        :param location: The location of the Dataflow job (for example europe-west1)

Review comment:
       The main reason behind it is to keep consistency across the whole Google provider, which uses the `location` parameter.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] stale[bot] commented on pull request #8553: Add DataflowStartSQLQuery operator

Posted by GitBox <gi...@apache.org>.
stale[bot] commented on pull request #8553:
URL: https://github.com/apache/airflow/pull/8553#issuecomment-646931304


   This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions.
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] TobKed commented on a change in pull request #8553: Add DataflowStartSQLQuery operator

Posted by GitBox <gi...@apache.org>.
TobKed commented on a change in pull request #8553:
URL: https://github.com/apache/airflow/pull/8553#discussion_r501799152



##########
File path: airflow/providers/google/cloud/hooks/dataflow.py
##########
@@ -783,6 +794,77 @@ def cancel_job(
             name=job_name,
             job_id=job_id,
             location=location,
-            poll_sleep=self.poll_sleep
+            poll_sleep=self.poll_sleep,
+            num_retries=self.num_retries,
         )
         jobs_controller.cancel()
+
+    @GoogleBaseHook.fallback_to_default_project_id
+    def start_sql_job(
+        self,
+        job_name: str,
+        query: str,
+        options: Dict[str, Any],
+        project_id: str,
+        location: str = DEFAULT_DATAFLOW_LOCATION,
+        on_new_job_id_callback: Optional[Callable[[str], None]] = None
+    ):
+        """
+        Starts Dataflow SQL query.
+
+        :param job_name: The unique name to assign to the Cloud Dataflow job.
+        :type job_name: str
+        :param query: The SQL query to execute.
+        :type query: str
+        :param options: Job parameters to be executed.
+            For more information, look at:
+            `https://cloud.google.com/sdk/gcloud/reference/beta/dataflow/sql/query
+            <gcloud beta dataflow sql query>`__
+            command reference
+        :param location: The location of the Dataflow job (for example europe-west1)
+        :type location: str
+        :param project_id: The ID of the GCP project that owns the job.
+            If set to ``None`` or missing, the default project_id from the GCP connection is used.
+        :type project_id: Optional[str]
+        :param on_new_job_id_callback: Callback called when the job ID is known.
+        :type on_new_job_id_callback: callable
+        :return: the new job object
+        """
+        cmd = [
+            'gcloud',
+            'beta',
+            'dataflow',
+            'sql',
+            'query',
+            query,
+            f'--project={project_id}',
+            '--format=value(job.id)',
+            f'--job-name={job_name}',
+            f'--region={location}',
+            *(self._options_to_args(options))
+        ]
+        self.log.info("Executing command: %s", " ".join([shlex.quote(c) for c in cmd]))
+        with self.provide_authorized_gcloud():
+            proc = subprocess.run(  # pylint: disable=subprocess-run-check
+                cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+        self.log.info("Output: %s", proc.stdout.decode())
+        self.log.info("Stderr: %s", proc.stderr.decode())

Review comment:
       I changed it :)




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] github-actions[bot] commented on pull request #8553: Add DataflowStartSQLQuery operator

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #8553:
URL: https://github.com/apache/airflow/pull/8553#issuecomment-705651696


   [The Workflow run](https://github.com/apache/airflow/actions/runs/295746697) is cancelling this PR. It has some failed jobs matching ^Pylint$,^Static checks$,^Build docs$,^Spell check docs$,^Backport packages$,^Checks: Helm tests$,^Test OpenAPI*.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] kennknowles commented on a change in pull request #8553: Add DataflowStartSQLQuery operator

Posted by GitBox <gi...@apache.org>.
kennknowles commented on a change in pull request #8553:
URL: https://github.com/apache/airflow/pull/8553#discussion_r416287771



##########
File path: airflow/providers/google/cloud/hooks/dataflow.py
##########
@@ -93,15 +94,30 @@ def inner_wrapper(self: "DataflowHook", *args, **kwargs) -> RT:
 class DataflowJobStatus:
     """
     Helper class with Dataflow job statuses.
+    Reference: https://cloud.google.com/dataflow/docs/reference/rest/v1b3/projects.jobs#Job.JobState
     """
-    JOB_STATE_DONE = "JOB_STATE_DONE"
+    JOB_STATE_UNKNOWN = "JOB_STATE_UNKNOWN"
+    JOB_STATE_STOPPED = "JOB_STATE_STOPPED"
     JOB_STATE_RUNNING = "JOB_STATE_RUNNING"
+    JOB_STATE_DONE = "JOB_STATE_DONE"
     JOB_STATE_FAILED = "JOB_STATE_FAILED"
     JOB_STATE_CANCELLED = "JOB_STATE_CANCELLED"
+    JOB_STATE_UPDATED = "JOB_STATE_UPDATED"
+    JOB_STATE_DRAINING = "JOB_STATE_DRAINING"
+    JOB_STATE_DRAINED = "JOB_STATE_DRAINED"
     JOB_STATE_PENDING = "JOB_STATE_PENDING"
-    FAILED_END_STATES = {JOB_STATE_FAILED, JOB_STATE_CANCELLED}
-    SUCCEEDED_END_STATES = {JOB_STATE_DONE}
-    END_STATES = SUCCEEDED_END_STATES | FAILED_END_STATES
+    JOB_STATE_CANCELLING = "JOB_STATE_CANCELLING"
+    JOB_STATE_QUEUED = "JOB_STATE_QUEUED"
+    FAILED_END_STATES = {JOB_STATE_FAILED, JOB_STATE_CANCELLED, JOB_STATE_STOPPED}
+    SUCCEEDED_END_STATES = {JOB_STATE_DONE, JOB_STATE_UPDATED, JOB_STATE_DRAINED}
+    TERMINAL_STATES = SUCCEEDED_END_STATES | FAILED_END_STATES

Review comment:
       I do not know Airflow that well. I just tried to see how these variables were used, and I missed the place where they actually affect the Airflow result. It is a good idea to let the user say which state they expect, and then treat anything else as a failure.
   
   Example: we have had real use cases where we deliberately cancel jobs we do not need anymore, and that can be a success for streaming jobs.
   
   I think the only certain failed state is JOB_STATE_FAILED.
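
   A minimal sketch of that idea (not code from this PR): let the caller declare the expected terminal state, and treat only a mismatch as a failure:

    ```python
    TERMINAL_STATES = {
        "JOB_STATE_DONE", "JOB_STATE_UPDATED", "JOB_STATE_DRAINED",
        "JOB_STATE_FAILED", "JOB_STATE_CANCELLED", "JOB_STATE_STOPPED",
    }

    def job_succeeded(state: str, expected_state: str = "JOB_STATE_DONE") -> bool:
        # A deliberately cancelled streaming job can pass by expecting JOB_STATE_CANCELLED;
        # any other terminal state (including JOB_STATE_FAILED) counts as a failure.
        return state in TERMINAL_STATES and state == expected_state
    ```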




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] mik-laj commented on a change in pull request #8553: Add DataflowStartSQLQuery operator

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #8553:
URL: https://github.com/apache/airflow/pull/8553#discussion_r416146984



##########
File path: airflow/providers/google/cloud/hooks/dataflow.py
##########
@@ -640,9 +659,8 @@ def start_python_dataflow(  # pylint: disable=too-many-arguments
         variables['job_name'] = name
         variables['region'] = location
 
-        def label_formatter(labels_dict):
-            return ['--labels={}={}'.format(key, value)
-                    for key, value in labels_dict.items()]
+        if 'labels' in variables:
+            variables['labels'] = ['{}={}'.format(key, value) for key, value in variables['labels'].items()]

Review comment:
       It depends on the SDK that is used. These two SDKs require different argument formats.
   
   We have three related methods:
   * start_python_dataflow
   * start_java_dataflow
   * _start_dataflow
   
   The first two methods are public and SDK-dependent; each is responsible for the actions specific to its SDK, e.g. environment preparation. 
   _start_dataflow is an internal method that starts the system process and supervises its execution. I have the impression that this separation is helpful.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org