Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2020/04/27 20:39:25 UTC

[GitHub] [airflow] aaltay commented on a change in pull request #8553: Add DataflowStartSQLQuery operator

aaltay commented on a change in pull request #8553:
URL: https://github.com/apache/airflow/pull/8553#discussion_r416125310



##########
File path: airflow/providers/google/cloud/hooks/dataflow.py
##########
@@ -93,15 +94,30 @@ def inner_wrapper(self: "DataflowHook", *args, **kwargs) -> RT:
 class DataflowJobStatus:
     """
     Helper class with Dataflow job statuses.
+    Reference: https://cloud.google.com/dataflow/docs/reference/rest/v1b3/projects.jobs#Job.JobState
     """
-    JOB_STATE_DONE = "JOB_STATE_DONE"
+    JOB_STATE_UNKNOWN = "JOB_STATE_UNKNOWN"
+    JOB_STATE_STOPPED = "JOB_STATE_STOPPED"
     JOB_STATE_RUNNING = "JOB_STATE_RUNNING"
+    JOB_STATE_DONE = "JOB_STATE_DONE"
     JOB_STATE_FAILED = "JOB_STATE_FAILED"
     JOB_STATE_CANCELLED = "JOB_STATE_CANCELLED"
+    JOB_STATE_UPDATED = "JOB_STATE_UPDATED"
+    JOB_STATE_DRAINING = "JOB_STATE_DRAINING"
+    JOB_STATE_DRAINED = "JOB_STATE_DRAINED"
     JOB_STATE_PENDING = "JOB_STATE_PENDING"
-    FAILED_END_STATES = {JOB_STATE_FAILED, JOB_STATE_CANCELLED}
-    SUCCEEDED_END_STATES = {JOB_STATE_DONE}
-    END_STATES = SUCCEEDED_END_STATES | FAILED_END_STATES
+    JOB_STATE_CANCELLING = "JOB_STATE_CANCELLING"
+    JOB_STATE_QUEUED = "JOB_STATE_QUEUED"
+    FAILED_END_STATES = {JOB_STATE_FAILED, JOB_STATE_CANCELLED, JOB_STATE_STOPPED}

Review comment:
       JOB_STATE_STOPPED is not a failed state.  (See: https://cloud.google.com/dataflow/docs/reference/rest/v1b3/projects.jobs#jobstate)
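      A minimal sketch of what the suggestion could look like, keeping STOPPED out of
      the failure set (illustrative only, not the PR's final code; whether UPDATED and
      DRAINED belong in the success set is a separate design question):

          # Per the JobState reference, STOPPED means the job has not started
          # running yet, so it should not count as a terminal failure.
          FAILED_END_STATES = {JOB_STATE_FAILED, JOB_STATE_CANCELLED}
          SUCCEEDED_END_STATES = {JOB_STATE_DONE, JOB_STATE_UPDATED, JOB_STATE_DRAINED}
          END_STATES = SUCCEEDED_END_STATES | FAILED_END_STATES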

##########
File path: airflow/providers/google/cloud/hooks/dataflow.py
##########
@@ -496,17 +517,15 @@ def start_java_dataflow(
         variables['jobName'] = name
         variables['region'] = location
 
-        def label_formatter(labels_dict):
-            return ['--labels={}'.format(
-                json.dumps(labels_dict).replace(' ', ''))]
+        if 'labels' in variables:
+            variables['labels'] = json.dumps(variables['labels']).replace(' ', '')

Review comment:
      This is modifying user-provided input. Is this your intention?
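      If the mutation is unintended, one minimal way to avoid it (a sketch; the
      shallow copy is my suggestion, not code from the PR):

          # Work on a shallow copy so the caller's dict stays untouched.
          variables = dict(variables)
          if 'labels' in variables:
              variables['labels'] = json.dumps(variables['labels']).replace(' ', '')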

##########
File path: airflow/providers/google/cloud/hooks/dataflow.py
##########
@@ -640,9 +659,8 @@ def start_python_dataflow(  # pylint: disable=too-many-arguments
         variables['job_name'] = name
         variables['region'] = location
 
-        def label_formatter(labels_dict):
-            return ['--labels={}={}'.format(key, value)
-                    for key, value in labels_dict.items()]
+        if 'labels' in variables:
+            variables['labels'] = ['{}={}'.format(key, value) for key, value in variables['labels'].items()]

Review comment:
      Why is this formatter different from the one at L521?
  
    Could we move all label formatting to the place where the Dataflow job is triggered?
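    For illustration, a rough sketch of a single shared helper that both paths could
    call where the job command is built (the name _format_labels and the as_json flag
    are hypothetical, not part of the PR):

        def _format_labels(labels, as_json=False):
            """Format the labels dict once, close to where the command is assembled."""
            if as_json:
                # Java path: one compact JSON value for --labels=...
                return json.dumps(labels).replace(' ', '')
            # Python path: a list of key=value strings.
            return ['{}={}'.format(key, value) for key, value in labels.items()]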

##########
File path: airflow/providers/google/cloud/hooks/dataflow.py
##########
@@ -783,6 +794,77 @@ def cancel_job(
             name=job_name,
             job_id=job_id,
             location=location,
-            poll_sleep=self.poll_sleep
+            poll_sleep=self.poll_sleep,
+            num_retries=self.num_retries,
         )
         jobs_controller.cancel()
+
+    @GoogleBaseHook.fallback_to_default_project_id
+    def start_sql_job(
+        self,
+        job_name: str,
+        query: str,
+        options: Dict[str, Any],
+        project_id: str,
+        location: str = DEFAULT_DATAFLOW_LOCATION,
+        on_new_job_id_callback: Optional[Callable[[str], None]] = None
+    ):
+        """
+        Starts Dataflow SQL query.
+
+        :param job_name: The unique name to assign to the Cloud Dataflow job.
+        :type job_name: str
+        :param query: The SQL query to execute.
+        :type query: str
+        :param options: Job parameters to be executed.
+            For more information, look at:
+            `https://cloud.google.com/sdk/gcloud/reference/beta/dataflow/sql/query
+            <gcloud beta dataflow sql query>`__
+            command reference
+        :param location: The location of the Dataflow job (for example europe-west1)
+        :type location: str
+        :param project_id: The ID of the GCP project that owns the job.
+            If set to ``None`` or missing, the default project_id from the GCP connection is used.
+        :type project_id: Optional[str]
+        :param on_new_job_id_callback: Callback called when the job ID is known.
+        :type on_new_job_id_callback: callable
+        :return: the new job object
+        """
+        cmd = [
+            'gcloud',
+            'beta',

Review comment:
       Is the beta still required, do you know?

##########
File path: airflow/providers/google/cloud/example_dags/example_dataflow_sql.py
##########
@@ -0,0 +1,60 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Example Airflow DAG for Google Cloud Dataflow service
+"""
+import os
+
+from airflow import models
+from airflow.providers.google.cloud.operators.dataflow import DataflowStartSqlJobOperator
+from airflow.utils.dates import days_ago
+
+GCP_PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "example-project")
+
+BQ_SQL_DATASET = os.environ.get('DATAFLOW_BQ_SQL_DATASET', 'airflow_dataflow_samples')
+DATAFLOW_SQL_JOB_NAME = os.environ.get('DATAFLOW_SQL_JOB_NAME', "dataflow-sql")
+DATAFLOW_SQL_LOCATION = os.environ.get('DATAFLOW_SQL_LOCATION', 'us-west1')
+
+with models.DAG(
+    dag_id="example_gcp_dataflow_sql",
+    default_args={
+        "start_date": days_ago(1),
+    },
+    schedule_interval=None,  # Override to match your needs
+) as dag_sql:
+    start_sql = DataflowStartSqlJobOperator(
+        task_id="start_sql_query",
+        job_name=DATAFLOW_SQL_JOB_NAME,
+        query=f"""
+            SELECT
+                sales_region as sales_region,
+                count(state_id) as count_state
+            FROM
+                bigquery.table.`{GCP_PROJECT_ID}`.beam_samples.beam_table

Review comment:
      Who is going to create this dataset? Can we use a public dataset so that the example works for anyone?

##########
File path: airflow/providers/google/cloud/hooks/dataflow.py
##########
@@ -783,6 +794,77 @@ def cancel_job(
             name=job_name,
             job_id=job_id,
             location=location,
-            poll_sleep=self.poll_sleep
+            poll_sleep=self.poll_sleep,
+            num_retries=self.num_retries,
         )
         jobs_controller.cancel()
+
+    @GoogleBaseHook.fallback_to_default_project_id
+    def start_sql_job(
+        self,
+        job_name: str,
+        query: str,
+        options: Dict[str, Any],
+        project_id: str,
+        location: str = DEFAULT_DATAFLOW_LOCATION,
+        on_new_job_id_callback: Optional[Callable[[str], None]] = None
+    ):
+        """
+        Starts Dataflow SQL query.
+
+        :param job_name: The unique name to assign to the Cloud Dataflow job.
+        :type job_name: str
+        :param query: The SQL query to execute.
+        :type query: str
+        :param options: Job parameters to be executed.
+            For more information, look at:
+            `https://cloud.google.com/sdk/gcloud/reference/beta/dataflow/sql/query
+            <gcloud beta dataflow sql query>`__
+            command reference
+        :param location: The location of the Dataflow job (for example europe-west1)
+        :type location: str
+        :param project_id: The ID of the GCP project that owns the job.
+            If set to ``None`` or missing, the default project_id from the GCP connection is used.
+        :type project_id: Optional[str]
+        :param on_new_job_id_callback: Callback called when the job ID is known.
+        :type on_new_job_id_callback: callable
+        :return: the new job object
+        """
+        cmd = [
+            'gcloud',
+            'beta',
+            'dataflow',
+            'sql',
+            'query',
+            query,
+            f'--project={project_id}',
+            '--format=value(job.id)',
+            f'--job-name={job_name}',
+            f'--region={location}',
+            *(self._options_to_args(options))
+        ]
+        self.log.info("Executing command: %s", " ".join([shlex.quote(c) for c in cmd]))
+        with self.provide_authorized_gcloud():
+            proc = subprocess.run(  # pylint: disable=subprocess-run-check
+                cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+        self.log.info("Output: %s", proc.stdout.decode())
+        self.log.info("Stderr: %s", proc.stderr.decode())

Review comment:
       log.error for stderr maybe?
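      For example (a sketch of the suggestion only; logging a successful run's stderr
      at WARNING is just one possible policy, since gcloud writes progress messages to
      stderr even on success):

          self.log.info("Output: %s", proc.stdout.decode())
          if proc.returncode != 0:
              self.log.error("Stderr: %s", proc.stderr.decode())
          else:
              self.log.warning("Stderr: %s", proc.stderr.decode())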

##########
File path: airflow/providers/google/cloud/hooks/dataflow.py
##########
@@ -783,6 +794,77 @@ def cancel_job(
             name=job_name,
             job_id=job_id,
             location=location,
-            poll_sleep=self.poll_sleep
+            poll_sleep=self.poll_sleep,
+            num_retries=self.num_retries,
         )
         jobs_controller.cancel()
+
+    @GoogleBaseHook.fallback_to_default_project_id
+    def start_sql_job(
+        self,
+        job_name: str,
+        query: str,
+        options: Dict[str, Any],
+        project_id: str,
+        location: str = DEFAULT_DATAFLOW_LOCATION,
+        on_new_job_id_callback: Optional[Callable[[str], None]] = None
+    ):
+        """
+        Starts Dataflow SQL query.
+
+        :param job_name: The unique name to assign to the Cloud Dataflow job.
+        :type job_name: str
+        :param query: The SQL query to execute.
+        :type query: str
+        :param options: Job parameters to be executed.
+            For more information, look at:
+            `https://cloud.google.com/sdk/gcloud/reference/beta/dataflow/sql/query
+            <gcloud beta dataflow sql query>`__
+            command reference
+        :param location: The location of the Dataflow job (for example europe-west1)
+        :type location: str
+        :param project_id: The ID of the GCP project that owns the job.
+            If set to ``None`` or missing, the default project_id from the GCP connection is used.
+        :type project_id: Optional[str]
+        :param on_new_job_id_callback: Callback called when the job ID is known.
+        :type on_new_job_id_callback: callable
+        :return: the new job object
+        """
+        cmd = [
+            'gcloud',
+            'beta',
+            'dataflow',
+            'sql',
+            'query',
+            query,
+            f'--project={project_id}',
+            '--format=value(job.id)',
+            f'--job-name={job_name}',
+            f'--region={location}',
+            *(self._options_to_args(options))
+        ]
+        self.log.info("Executing command: %s", " ".join([shlex.quote(c) for c in cmd]))
+        with self.provide_authorized_gcloud():

Review comment:
       What does shlex.quote() do?
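      For reference, shlex.quote() returns a shell-escaped copy of a string; in this
      hunk it only makes the logged command safe to paste into a shell, while the list
      passed to subprocess.run is executed without a shell and needs no quoting. A
      quick illustration (not code from the PR):

          import shlex

          shlex.quote("my-job")                   # 'my-job' (nothing to escape)
          shlex.quote("SELECT * FROM `p`.ds.t")   # "'SELECT * FROM `p`.ds.t'"
          " ".join(shlex.quote(c) for c in ["echo", "two words"])
          # "echo 'two words'"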

##########
File path: airflow/providers/google/cloud/operators/dataflow.py
##########
@@ -406,6 +406,88 @@ def on_kill(self) -> None:
             self.hook.cancel_job(job_id=self.job_id, project_id=self.project_id)
 
 
+class DataflowStartSqlJobOperator(BaseOperator):
+    """
+    Starts Dataflow SQL query.
+
+    :param job_name: The unique name to assign to the Cloud Dataflow job.
+    :type job_name: str
+    :param query: The SQL query to execute.
+    :type query: str
+    :param options: Job parameters to be executed. It can be a dictionary with the following keys.
+
+        For more information, look at:
+        `https://cloud.google.com/sdk/gcloud/reference/beta/dataflow/sql/query
+        <gcloud beta dataflow sql query>`__
+        command reference
+
+    :param options: dict
+    :param location: The location of the Dataflow job (for example europe-west1)
+    :type location: str
+    :param project_id: The ID of the GCP project that owns the job.
+        If set to ``None`` or missing, the default project_id from the GCP connection is used.
+    :type project_id: Optional[str]
+    :param gcp_conn_id: The connection ID to use connecting to Google Cloud
+        Platform.
+    :type gcp_conn_id: str

Review comment:
       What is gcp_conn_id?

##########
File path: airflow/providers/google/cloud/example_dags/example_dataflow_sql.py
##########
@@ -0,0 +1,60 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Example Airflow DAG for Google Cloud Dataflow service
+"""
+import os
+
+from airflow import models
+from airflow.providers.google.cloud.operators.dataflow import DataflowStartSqlJobOperator
+from airflow.utils.dates import days_ago
+
+GCP_PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "example-project")
+
+BQ_SQL_DATASET = os.environ.get('DATAFLOW_BQ_SQL_DATASET', 'airflow_dataflow_samples')
+DATAFLOW_SQL_JOB_NAME = os.environ.get('DATAFLOW_SQL_JOB_NAME', "dataflow-sql")
+DATAFLOW_SQL_LOCATION = os.environ.get('DATAFLOW_SQL_LOCATION', 'us-west1')
+
+with models.DAG(
+    dag_id="example_gcp_dataflow_sql",
+    default_args={
+        "start_date": days_ago(1),
+    },
+    schedule_interval=None,  # Override to match your needs
+) as dag_sql:
+    start_sql = DataflowStartSqlJobOperator(
+        task_id="start_sql_query",
+        job_name=DATAFLOW_SQL_JOB_NAME,
+        query=f"""
+            SELECT
+                sales_region as sales_region,
+                count(state_id) as count_state
+            FROM
+                bigquery.table.`{GCP_PROJECT_ID}`.beam_samples.beam_table
+            GROUP BY sales_region;
+        """,
+        options={
+            "bigquery-project": GCP_PROJECT_ID,
+            "bigquery-dataset": BQ_SQL_DATASET,
+            "bigquery-table": "beam_output",
+            'bigquery-write-disposition': "write-truncate",
+        },
+        location=DATAFLOW_SQL_LOCATION,
+        do_xcom_push=True,

Review comment:
      Just a note: I can review the Dataflow aspects but am not very familiar with Airflow. For example, I am not sure what do_xcom_push is. It would be good to get an Airflow review as well.

##########
File path: airflow/providers/google/cloud/operators/dataflow.py
##########
@@ -406,6 +406,88 @@ def on_kill(self) -> None:
             self.hook.cancel_job(job_id=self.job_id, project_id=self.project_id)
 
 
+class DataflowStartSqlJobOperator(BaseOperator):
+    """
+    Starts Dataflow SQL query.
+
+    :param job_name: The unique name to assign to the Cloud Dataflow job.
+    :type job_name: str
+    :param query: The SQL query to execute.
+    :type query: str
+    :param options: Job parameters to be executed. It can be a dictionary with the following keys.
+
+        For more information, look at:
+        `https://cloud.google.com/sdk/gcloud/reference/beta/dataflow/sql/query
+        <gcloud beta dataflow sql query>`__
+        command reference
+
+    :param options: dict
+    :param location: The location of the Dataflow job (for example europe-west1)
+    :type location: str
+    :param project_id: The ID of the GCP project that owns the job.
+        If set to ``None`` or missing, the default project_id from the GCP connection is used.
+    :type project_id: Optional[str]
+    :param gcp_conn_id: The connection ID to use connecting to Google Cloud
+        Platform.
+    :type gcp_conn_id: str
+    :param delegate_to: The account to impersonate, if any.
+        For this to work, the service account making the request must have
+        domain-wide delegation enabled.
+    :type delegate_to: str
+    """
+
+    template_fields = ["job_name", 'query', 'options', 'location', 'project_id', 'gcp_conn_id']
+
+    @apply_defaults
+    def __init__(
+        self,
+        job_name: str,
+        query: str,
+        options: Dict[str, Any],
+        location: str = DEFAULT_DATAFLOW_LOCATION,
+        project_id: Optional[str] = None,
+        gcp_conn_id: str = 'google_cloud_default',
+        delegate_to: Optional[str] = None,
+        *args,
+        **kwargs
+    ) -> None:
+        super().__init__(*args, **kwargs)
+        self.job_name = job_name
+        self.query = query
+        self.options = options
+        self.location = location
+        self.project_id = project_id
+        self.gcp_conn_id = gcp_conn_id
+        self.delegate_to = delegate_to
+        self.job_id = None
+        self.hook: Optional[DataflowHook] = None
+
+    def execute(self, context):
+        self.hook = DataflowHook(
+            gcp_conn_id=self.gcp_conn_id,
+            delegate_to=self.delegate_to,
+        )
+
+        def set_current_job_id(job_id):
+            self.job_id = job_id
+
+        job = self.hook.start_sql_job(
+            job_name=self.job_name,
+            query=self.query,
+            options=self.options,
+            location=self.location,
+            project_id=self.project_id,
+            on_new_job_id_callback=set_current_job_id,
+        )
+
+        return job
+
+    def on_kill(self) -> None:
+        self.log.info("On kill.")
+        if self.job_id:
+            self.hook.cancel_job(job_id=self.job_id, project_id=self.project_id)

Review comment:
      Do you want to call this even if the job is already cancelled/stopped/finished?
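      If the intent is to cancel only while the job might still be running, one
      hypothetical sketch (not the PR's code) is to drop the stored id once execute()
      has returned:

          job = self.hook.start_sql_job(
              job_name=self.job_name,
              query=self.query,
              options=self.options,
              location=self.location,
              project_id=self.project_id,
              on_new_job_id_callback=set_current_job_id,
          )
          # If start_sql_job blocks until the job reaches a terminal state, clearing
          # the id here turns a later on_kill() into a no-op instead of a redundant
          # cancel request.
          self.job_id = None
          return job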




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org