Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2020/04/25 00:17:05 UTC

[GitHub] [airflow] mik-laj opened a new pull request #8550: Add DataflowStartFlexTemplateOperator

mik-laj opened a new pull request #8550:
URL: https://github.com/apache/airflow/pull/8550


   ---
   Make sure to mark the boxes below before creating PR: [x]
   
   - [X] Description above provides context of the change
   - [X] Unit tests coverage for changes (not needed for documentation changes)
    - [X] Target GitHub ISSUE in description if it exists
   - [X] Commits follow "[How to write a good git commit message](http://chris.beams.io/posts/git-commit/)"
   - [X] Relevant documentation is updated including usage instructions.
   - [X] I will engage committers as explained in [Contribution Workflow Example](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#contribution-workflow-example).
   
   ---
    In case of a fundamental code change, an Airflow Improvement Proposal ([AIP](https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+Improvements+Proposals)) is needed.
    In case of a new dependency, check compliance with the [ASF 3rd Party License Policy](https://www.apache.org/legal/resolved.html#category-x).
    In case of backwards-incompatible changes, please leave a note in [UPDATING.md](https://github.com/apache/airflow/blob/master/UPDATING.md).
   Read the [Pull Request Guidelines](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#pull-request-guidelines) for more information.
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] mik-laj commented on a change in pull request #8550: Add DataflowStartFlexTemplateOperator

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #8550:
URL: https://github.com/apache/airflow/pull/8550#discussion_r416184937



##########
File path: tests/providers/google/cloud/operators/test_dataflow_system.py
##########
@@ -26,4 +37,198 @@
 class CloudDataflowExampleDagsSystemTest(GoogleSystemTest):
     @provide_gcp_context(GCP_DATAFLOW_KEY)
     def test_run_example_dag_function(self):
-        self.run_dag('example_gcp_dataflow', CLOUD_DAG_FOLDER)
+        self.run_dag("example_gcp_dataflow", CLOUD_DAG_FOLDER)
+
+
+GCP_PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "example-project")
+GCR_FLEX_TEMPLATE_IMAGE = f"gcr.io/{GCP_PROJECT_ID}/samples-dataflow-streaming-beam-sql:latest"
+
+# https://github.com/GoogleCloudPlatform/java-docs-samples/tree/954553c/dataflow/flex-templates/streaming_beam_sql
+GCS_TEMPLATE_PARTS = urlparse(GCS_FLEX_TEMPLATE_TEMPLATE_PATH)
+GCS_FLEX_TEMPLATE_BUCKET_NAME = GCS_TEMPLATE_PARTS.netloc
+
+
+EXAMPLE_FLEX_TEMPLATE_REPO = "GoogleCloudPlatform/java-docs-samples"
+EXAMPLE_FLEX_TEMPLATE_COMMIT = "deb0745be1d1ac1d133e1f0a7faa9413dbfbe5fe"
+EXAMPLE_FLEX_TEMPLATE_SUBDIR = "dataflow/flex-templates/streaming_beam_sql"
+
+
+@pytest.mark.backend("mysql", "postgres")
+@pytest.mark.credential_file(GCP_GCS_TRANSFER_KEY)
+class CloudDataflowExampleDagFlexTemplateJavagSystemTest(GoogleSystemTest):
+    @provide_gcp_context(GCP_GCS_TRANSFER_KEY, project_id=GoogleSystemTest._project_id())
+    def setUp(self) -> None:
+        # Create a Cloud Storage bucket
+        self.execute_cmd(["gsutil", "mb", f"gs://{GCS_FLEX_TEMPLATE_BUCKET_NAME}"])
+
+        # Build image with pipeline
+        with NamedTemporaryFile() as f:
+            f.write(
+                textwrap.dedent(
+                    """\
+                    steps:

Review comment:
       The Cloud Build service provisions a new virtual machine for each build. Each build contains many steps, and each step is described by a container image. I personally would not worry about a large number of images, because after the build is completed, the virtual machine is deleted. A minimal config sketch follows.
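    To make the step/image relationship concrete, here is a minimal, illustrative config (step and image names are placeholders, not this PR's actual steps):

    ```
    # Each entry in "steps" runs in its own container image on the same
    # ephemeral build VM; the VM, and any images pulled onto it, is
    # discarded once the build finishes.
    cloud_build_config = {
        "steps": [
            # Step 1 runs inside the Git builder image.
            {"name": "gcr.io/cloud-builders/git", "args": ["clone", "https://github.com/example/repo", "repo_dir"]},
            # Step 2 runs inside the Docker builder image, sharing the workspace.
            {"name": "gcr.io/cloud-builders/docker", "args": ["build", "-t", "gcr.io/example-project/example-image", "."], "dir": "repo_dir"},
        ],
    }
    ```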
   
   




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] TobKed commented on a change in pull request #8550: Add DataflowStartFlexTemplateOperator

Posted by GitBox <gi...@apache.org>.
TobKed commented on a change in pull request #8550:
URL: https://github.com/apache/airflow/pull/8550#discussion_r497343799



##########
File path: tests/providers/google/cloud/operators/test_dataflow_system.py
##########
@@ -26,4 +37,191 @@
 class CloudDataflowExampleDagsSystemTest(GoogleSystemTest):
     @provide_gcp_context(GCP_DATAFLOW_KEY)
     def test_run_example_dag_function(self):
-        self.run_dag('example_gcp_dataflow', CLOUD_DAG_FOLDER)
+        self.run_dag("example_gcp_dataflow", CLOUD_DAG_FOLDER)
+
+
+GCP_PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "example-project")
+GCR_FLEX_TEMPLATE_IMAGE = f"gcr.io/{GCP_PROJECT_ID}/samples-dataflow-streaming-beam-sql:latest"
+
+# https://github.com/GoogleCloudPlatform/java-docs-samples/tree/954553c/dataflow/flex-templates/streaming_beam_sql
+GCS_TEMPLATE_PARTS = urlparse(GCS_FLEX_TEMPLATE_TEMPLATE_PATH)
+GCS_FLEX_TEMPLATE_BUCKET_NAME = GCS_TEMPLATE_PARTS.netloc
+
+
+EXAMPLE_FLEX_TEMPLATE_REPO = "GoogleCloudPlatform/java-docs-samples"
+EXAMPLE_FLEX_TEMPLATE_COMMIT = "deb0745be1d1ac1d133e1f0a7faa9413dbfbe5fe"
+EXAMPLE_FLEX_TEMPLATE_SUBDIR = "dataflow/flex-templates/streaming_beam_sql"
+
+
+@pytest.mark.backend("mysql", "postgres")
+@pytest.mark.credential_file(GCP_GCS_TRANSFER_KEY)
+class CloudDataflowExampleDagFlexTemplateJavagSystemTest(GoogleSystemTest):
+    @provide_gcp_context(GCP_GCS_TRANSFER_KEY, project_id=GoogleSystemTest._project_id())
+    def setUp(self) -> None:
+        # Create a Cloud Storage bucket
+        self.execute_cmd(["gsutil", "mb", f"gs://{GCS_FLEX_TEMPLATE_BUCKET_NAME}"])
+
+        # Build image with pipeline
+        with NamedTemporaryFile() as f:
+            f.write(
+                textwrap.dedent(
+                    """\
+                    steps:

Review comment:
       You are right, I changed my mind. I think a dictionary looks better, especially since Cloud Build accepts JSON config files:
   
   ```
   cloud_build_config = {
       'steps': [
           {'name': 'gcr.io/cloud-builders/git', 'args': ['clone', "$_EXAMPLE_REPO", "repo_dir"]},
           {
               'name': 'gcr.io/cloud-builders/git',
               'args': ['checkout', '$_EXAMPLE_COMMIT'],
               'dir': 'repo_dir',
           },
           {
               'name': 'maven',
               'args': ['mvn', 'clean', 'package'],
               'dir': 'repo_dir/$_EXAMPLE_SUBDIR',
           },
           {
               'name': 'gcr.io/cloud-builders/docker',
               'args': ['build', '-t', '$_TEMPLATE_IMAGE', '.'],
               'dir': 'repo_dir/$_EXAMPLE_SUBDIR',
           },
       ],
       'images': ['$_TEMPLATE_IMAGE'],
   }
   ```
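    A sketch of how the dict above could then be written out and submitted; the `gcloud` invocation is commented out and illustrative, with substitution values as placeholders:

    ```
    import json
    from tempfile import NamedTemporaryFile

    with NamedTemporaryFile(mode="w", suffix=".json") as f:
        # JSON is a subset of YAML, so Cloud Build accepts this file as-is.
        json.dump(cloud_build_config, f)
        f.flush()
        # Illustrative submission (substitution values are placeholders):
        # subprocess.run(["gcloud", "builds", "submit", "--config", f.name,
        #                 "--substitutions", "_TEMPLATE_IMAGE=gcr.io/example/image", "--no-source"])
    ```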




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] TobKed commented on a change in pull request #8550: Add DataflowStartFlexTemplateOperator

Posted by GitBox <gi...@apache.org>.
TobKed commented on a change in pull request #8550:
URL: https://github.com/apache/airflow/pull/8550#discussion_r501785531



##########
File path: airflow/providers/google/cloud/hooks/dataflow.py
##########
@@ -282,41 +295,70 @@ def wait_for_done(self) -> None:
             time.sleep(self._poll_sleep)
             self._refresh_jobs()
 
-    def get_jobs(self) -> List[Dict]:
+    def get_jobs(self, refresh=False) -> List[Dict]:
         """
         Returns Dataflow jobs.
 
         :return: list of jobs
         :rtype: list
         """
-        if not self._jobs:
+        if not self._jobs or refresh:
             self._refresh_jobs()
         if not self._jobs:
             raise ValueError("Could not read _jobs")
 
         return self._jobs
 
+    def _wait_for_states(self, expected_states: Set[str]):
+        """
+        Waiting for the jobs to reach a certain state.
+        """
+        if not self._jobs:
+            raise ValueError("The _jobs should be set")
+        while True:

Review comment:
       PTAL: https://github.com/apache/airflow/pull/8550/commits/144b63f4ec9835d8c6c57816ab04761b83bee6c2
   
       Since the `cancel` method is executed only by the operator (`on_kill`), I didn't allow the user to configure this timeout, because I don't think it is worth adding complexity for the user and to the code itself. It is there to prevent `_wait_for_states` from hanging forever when `execution_timeout` is not set, and to provide a more meaningful log message if the timeout eventually occurs. A rough sketch of such a bounded wait follows.

    I proposed a default timeout of 5 minutes. @aaltay @kamilwu, what do you think about this value?
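    For illustration only, a standalone sketch of the shape of such a bounded wait (this is not the code from the linked commit; the signature and error type are assumptions):

    ```
    import time

    JOB_CANCEL_TIMEOUT = 5 * 60  # proposed default: 5 minutes

    def wait_for_states(get_current_states, expected_states, poll_sleep=10, timeout=JOB_CANCEL_TIMEOUT):
        """Poll job states until all of them are in expected_states, or time out."""
        start = time.monotonic()
        while time.monotonic() - start < timeout:
            states = get_current_states()  # e.g. {job["currentState"] for job in get_jobs(refresh=True)}
            if states and states.issubset(expected_states):
                return
            time.sleep(poll_sleep)
        raise RuntimeError(f"Jobs did not all reach one of {expected_states} within {timeout}s")
    ```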

##########
File path: airflow/providers/google/cloud/hooks/dataflow.py
##########
@@ -282,41 +295,70 @@ def wait_for_done(self) -> None:
             time.sleep(self._poll_sleep)
             self._refresh_jobs()
 
-    def get_jobs(self) -> List[Dict]:
+    def get_jobs(self, refresh=False) -> List[Dict]:
         """
         Returns Dataflow jobs.
 
         :return: list of jobs
         :rtype: list
         """
-        if not self._jobs:
+        if not self._jobs or refresh:
             self._refresh_jobs()
         if not self._jobs:
             raise ValueError("Could not read _jobs")
 
         return self._jobs
 
+    def _wait_for_states(self, expected_states: Set[str]):
+        """
+        Waiting for the jobs to reach a certain state.
+        """
+        if not self._jobs:
+            raise ValueError("The _jobs should be set")
+        while True:

Review comment:
       @mik-laj what do you think about removing the waiting? I lean towards @turbaszek's opinion.

##########
File path: airflow/providers/google/cloud/hooks/dataflow.py
##########
@@ -282,41 +295,70 @@ def wait_for_done(self) -> None:
             time.sleep(self._poll_sleep)
             self._refresh_jobs()
 
-    def get_jobs(self) -> List[Dict]:
+    def get_jobs(self, refresh=False) -> List[Dict]:
         """
         Returns Dataflow jobs.
 
         :return: list of jobs
         :rtype: list
         """
-        if not self._jobs:
+        if not self._jobs or refresh:
             self._refresh_jobs()
         if not self._jobs:
             raise ValueError("Could not read _jobs")
 
         return self._jobs
 
+    def _wait_for_states(self, expected_states: Set[str]):
+        """
+        Waiting for the jobs to reach a certain state.
+        """
+        if not self._jobs:
+            raise ValueError("The _jobs should be set")
+        while True:

Review comment:
       @aaltay I created a separate draft PR for handling draining on kill: https://github.com/apache/airflow/pull/11374. WDYT?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ad-m commented on a change in pull request #8550: Add DataflowStartFlexTemplateOperator

Posted by GitBox <gi...@apache.org>.
ad-m commented on a change in pull request #8550:
URL: https://github.com/apache/airflow/pull/8550#discussion_r474665132



##########
File path: tests/providers/google/cloud/operators/test_dataflow_system.py
##########
@@ -26,4 +37,191 @@
 class CloudDataflowExampleDagsSystemTest(GoogleSystemTest):
     @provide_gcp_context(GCP_DATAFLOW_KEY)
     def test_run_example_dag_function(self):
-        self.run_dag('example_gcp_dataflow', CLOUD_DAG_FOLDER)
+        self.run_dag("example_gcp_dataflow", CLOUD_DAG_FOLDER)
+
+
+GCP_PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "example-project")
+GCR_FLEX_TEMPLATE_IMAGE = f"gcr.io/{GCP_PROJECT_ID}/samples-dataflow-streaming-beam-sql:latest"
+
+# https://github.com/GoogleCloudPlatform/java-docs-samples/tree/954553c/dataflow/flex-templates/streaming_beam_sql
+GCS_TEMPLATE_PARTS = urlparse(GCS_FLEX_TEMPLATE_TEMPLATE_PATH)
+GCS_FLEX_TEMPLATE_BUCKET_NAME = GCS_TEMPLATE_PARTS.netloc
+
+
+EXAMPLE_FLEX_TEMPLATE_REPO = "GoogleCloudPlatform/java-docs-samples"
+EXAMPLE_FLEX_TEMPLATE_COMMIT = "deb0745be1d1ac1d133e1f0a7faa9413dbfbe5fe"
+EXAMPLE_FLEX_TEMPLATE_SUBDIR = "dataflow/flex-templates/streaming_beam_sql"
+
+
+@pytest.mark.backend("mysql", "postgres")
+@pytest.mark.credential_file(GCP_GCS_TRANSFER_KEY)
+class CloudDataflowExampleDagFlexTemplateJavagSystemTest(GoogleSystemTest):
+    @provide_gcp_context(GCP_GCS_TRANSFER_KEY, project_id=GoogleSystemTest._project_id())
+    def setUp(self) -> None:
+        # Create a Cloud Storage bucket
+        self.execute_cmd(["gsutil", "mb", f"gs://{GCS_FLEX_TEMPLATE_BUCKET_NAME}"])
+
+        # Build image with pipeline
+        with NamedTemporaryFile() as f:
+            f.write(
+                textwrap.dedent(
+                    """\
+                    steps:

Review comment:
       Have you considered using a standard `dict` and `json.dumps` to generate the file content? You don't use any YAML-specific syntax, and JSON is a subset of YAML (see the quick check below). I believe that a `dict` is easier to maintain long term than a hard-coded text block.
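    A quick way to check that claim (assuming PyYAML, which Airflow already depends on):

    ```
    import json
    import yaml

    config = {"steps": [{"name": "gcr.io/cloud-builders/git", "args": ["clone", "repo"]}]}
    # The JSON output is itself valid YAML, so a YAML parser round-trips it unchanged.
    assert yaml.safe_load(json.dumps(config)) == config
    ```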




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] turbaszek commented on a change in pull request #8550: Add DataflowStartFlexTemplateOperator

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #8550:
URL: https://github.com/apache/airflow/pull/8550#discussion_r501934420



##########
File path: airflow/providers/google/cloud/hooks/dataflow.py
##########
@@ -282,41 +295,70 @@ def wait_for_done(self) -> None:
             time.sleep(self._poll_sleep)
             self._refresh_jobs()
 
-    def get_jobs(self) -> List[Dict]:
+    def get_jobs(self, refresh=False) -> List[Dict]:
         """
         Returns Dataflow jobs.
 
         :return: list of jobs
         :rtype: list
         """
-        if not self._jobs:
+        if not self._jobs or refresh:
             self._refresh_jobs()
         if not self._jobs:
             raise ValueError("Could not read _jobs")
 
         return self._jobs
 
+    def _wait_for_states(self, expected_states: Set[str]):
+        """
+        Waiting for the jobs to reach a certain state.
+        """
+        if not self._jobs:
+            raise ValueError("The _jobs should be set")
+        while True:

Review comment:
       If we use this only in `on_kill`, should we even bother with waiting? I would assume that a successful API call should be enough for us.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] TobKed commented on a change in pull request #8550: Add DataflowStartFlexTemplateOperator

Posted by GitBox <gi...@apache.org>.
TobKed commented on a change in pull request #8550:
URL: https://github.com/apache/airflow/pull/8550#discussion_r497550006



##########
File path: tests/providers/google/cloud/hooks/test_dataflow.py
##########
@@ -984,15 +1034,17 @@ def test_dataflow_job_is_job_running_with_no_job(self):
         self.assertEqual(False, result)
 
     def test_dataflow_job_cancel_job(self):
-        job = {"id": TEST_JOB_ID, "name": TEST_JOB_NAME, "currentState": DataflowJobStatus.JOB_STATE_RUNNING}
-
         get_method = (
             self.mock_dataflow.projects.return_value.
             locations.return_value.
             jobs.return_value.
             get
         )
-        get_method.return_value.execute.return_value = job
+        get_method.return_value.execute.side_effect = [
+            {"id": TEST_JOB_ID, "name": TEST_JOB_NAME, "currentState": DataflowJobStatus.JOB_STATE_RUNNING},

Review comment:
       Created a separate issue for that: https://github.com/apache/airflow/issues/11205




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] mik-laj commented on a change in pull request #8550: Add DataflowStartFlexTemplateOperator

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #8550:
URL: https://github.com/apache/airflow/pull/8550#discussion_r416180769



##########
File path: tests/providers/google/cloud/operators/test_dataflow_system.py
##########
@@ -26,4 +37,198 @@
 class CloudDataflowExampleDagsSystemTest(GoogleSystemTest):
     @provide_gcp_context(GCP_DATAFLOW_KEY)
     def test_run_example_dag_function(self):
-        self.run_dag('example_gcp_dataflow', CLOUD_DAG_FOLDER)
+        self.run_dag("example_gcp_dataflow", CLOUD_DAG_FOLDER)
+
+
+GCP_PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "example-project")
+GCR_FLEX_TEMPLATE_IMAGE = f"gcr.io/{GCP_PROJECT_ID}/samples-dataflow-streaming-beam-sql:latest"
+
+# https://github.com/GoogleCloudPlatform/java-docs-samples/tree/954553c/dataflow/flex-templates/streaming_beam_sql
+GCS_TEMPLATE_PARTS = urlparse(GCS_FLEX_TEMPLATE_TEMPLATE_PATH)
+GCS_FLEX_TEMPLATE_BUCKET_NAME = GCS_TEMPLATE_PARTS.netloc
+
+
+EXAMPLE_FLEX_TEMPLATE_REPO = "GoogleCloudPlatform/java-docs-samples"
+EXAMPLE_FLEX_TEMPLATE_COMMIT = "deb0745be1d1ac1d133e1f0a7faa9413dbfbe5fe"
+EXAMPLE_FLEX_TEMPLATE_SUBDIR = "dataflow/flex-templates/streaming_beam_sql"
+
+
+@pytest.mark.backend("mysql", "postgres")
+@pytest.mark.credential_file(GCP_GCS_TRANSFER_KEY)
+class CloudDataflowExampleDagFlexTemplateJavagSystemTest(GoogleSystemTest):
+    @provide_gcp_context(GCP_GCS_TRANSFER_KEY, project_id=GoogleSystemTest._project_id())
+    def setUp(self) -> None:
+        # Create a Cloud Storage bucket
+        self.execute_cmd(["gsutil", "mb", f"gs://{GCS_FLEX_TEMPLATE_BUCKET_NAME}"])
+
+        # Build image with pipeline
+        with NamedTemporaryFile() as f:
+            f.write(
+                textwrap.dedent(
+                    """\
+                    steps:
+                    - name: gcr.io/cloud-builders/git
+                      args: ['clone', '$_EXAMPLE_REPO', 'repo_dir']
+                    - name: gcr.io/cloud-builders/git
+                      args: ['checkout', '$_EXAMPLE_COMMIT']
+                      dir: 'repo_dir'
+                    - name: alpine
+                      entrypoint: /bin/sh
+                      dir: 'repo_dir/$_EXAMPLE_SUBDIR'

Review comment:
    ```suggestion
    ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ad-m commented on a change in pull request #8550: Add DataflowStartFlexTemplateOperator

Posted by GitBox <gi...@apache.org>.
ad-m commented on a change in pull request #8550:
URL: https://github.com/apache/airflow/pull/8550#discussion_r496726212



##########
File path: tests/providers/google/cloud/operators/test_dataflow_system.py
##########
@@ -26,4 +37,191 @@
 class CloudDataflowExampleDagsSystemTest(GoogleSystemTest):
     @provide_gcp_context(GCP_DATAFLOW_KEY)
     def test_run_example_dag_function(self):
-        self.run_dag('example_gcp_dataflow', CLOUD_DAG_FOLDER)
+        self.run_dag("example_gcp_dataflow", CLOUD_DAG_FOLDER)
+
+
+GCP_PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "example-project")
+GCR_FLEX_TEMPLATE_IMAGE = f"gcr.io/{GCP_PROJECT_ID}/samples-dataflow-streaming-beam-sql:latest"
+
+# https://github.com/GoogleCloudPlatform/java-docs-samples/tree/954553c/dataflow/flex-templates/streaming_beam_sql
+GCS_TEMPLATE_PARTS = urlparse(GCS_FLEX_TEMPLATE_TEMPLATE_PATH)
+GCS_FLEX_TEMPLATE_BUCKET_NAME = GCS_TEMPLATE_PARTS.netloc
+
+
+EXAMPLE_FLEX_TEMPLATE_REPO = "GoogleCloudPlatform/java-docs-samples"
+EXAMPLE_FLEX_TEMPLATE_COMMIT = "deb0745be1d1ac1d133e1f0a7faa9413dbfbe5fe"
+EXAMPLE_FLEX_TEMPLATE_SUBDIR = "dataflow/flex-templates/streaming_beam_sql"
+
+
+@pytest.mark.backend("mysql", "postgres")
+@pytest.mark.credential_file(GCP_GCS_TRANSFER_KEY)
+class CloudDataflowExampleDagFlexTemplateJavagSystemTest(GoogleSystemTest):
+    @provide_gcp_context(GCP_GCS_TRANSFER_KEY, project_id=GoogleSystemTest._project_id())
+    def setUp(self) -> None:
+        # Create a Cloud Storage bucket
+        self.execute_cmd(["gsutil", "mb", f"gs://{GCS_FLEX_TEMPLATE_BUCKET_NAME}"])
+
+        # Build image with pipeline
+        with NamedTemporaryFile() as f:
+            f.write(
+                textwrap.dedent(
+                    """\
+                    steps:

Review comment:
       Is it easier to read a YAML dictionary inserted as text (no formatting in most IDEs) than a Python dictionary with correct editor formatting?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] mik-laj commented on pull request #8550: Add DataflowStartFlexTemplateOperator

Posted by GitBox <gi...@apache.org>.
mik-laj commented on pull request #8550:
URL: https://github.com/apache/airflow/pull/8550#issuecomment-641759094


    Hello.
    I had urgent tasks that prevented me from continuing my work. Now I am continuing work on Dataflow, but for now I have to fix problems with the system tests. I hope to finish it next week.
    Best regards


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] potiuk commented on pull request #8550: Add DataflowStartFlexTemplateOperator

Posted by GitBox <gi...@apache.org>.
potiuk commented on pull request #8550:
URL: https://github.com/apache/airflow/pull/8550#issuecomment-710180396


   Thanks @TobKed !


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] potiuk commented on pull request #8550: Add DataflowStartFlexTemplateOperator

Posted by GitBox <gi...@apache.org>.
potiuk commented on pull request #8550:
URL: https://github.com/apache/airflow/pull/8550#issuecomment-707947295


    Hey @TobKed. Can you please rebase this one onto the latest master? We fixed (hopefully) a problem with the job queues for GitHub Actions, and I think that when you rebase, it should run much faster (more info on the devlist shortly).
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] TobKed commented on pull request #8550: Add DataflowStartFlexTemplateOperator

Posted by GitBox <gi...@apache.org>.
TobKed commented on pull request #8550:
URL: https://github.com/apache/airflow/pull/8550#issuecomment-704685060


    @mik-laj could you remove the **dont-merge** label, please?
   cc @potiuk @turbaszek 


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] syucream commented on pull request #8550: Add DataflowStartFlexTemplateOperator

Posted by GitBox <gi...@apache.org>.
syucream commented on pull request #8550:
URL: https://github.com/apache/airflow/pull/8550#issuecomment-668063912


    @mik-laj Any update? I created the pull request https://github.com/PolideaInternal/airflow/pull/952 to fix the conflict and some points which were discussed here. Could you take a look and see if it's helpful?


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] TobKed commented on pull request #8550: Add DataflowStartFlexTemplateOperator

Posted by GitBox <gi...@apache.org>.
TobKed commented on pull request #8550:
URL: https://github.com/apache/airflow/pull/8550#issuecomment-702694855


    I replied to the comments and made the requested changes. Recently rebased onto the latest master.
   
   cc @mik-laj @syucream @ad-m @aaltay @jaketf


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] aaltay commented on a change in pull request #8550: Add DataflowStartFlexTemplateOperator

Posted by GitBox <gi...@apache.org>.
aaltay commented on a change in pull request #8550:
URL: https://github.com/apache/airflow/pull/8550#discussion_r501903701



##########
File path: airflow/providers/google/cloud/hooks/dataflow.py
##########
@@ -282,41 +295,70 @@ def wait_for_done(self) -> None:
             time.sleep(self._poll_sleep)
             self._refresh_jobs()
 
-    def get_jobs(self) -> List[Dict]:
+    def get_jobs(self, refresh=False) -> List[Dict]:
         """
         Returns Dataflow jobs.
 
         :return: list of jobs
         :rtype: list
         """
-        if not self._jobs:
+        if not self._jobs or refresh:
             self._refresh_jobs()
         if not self._jobs:
             raise ValueError("Could not read _jobs")
 
         return self._jobs
 
+    def _wait_for_states(self, expected_states: Set[str]):
+        """
+        Waiting for the jobs to reach a certain state.
+        """
+        if not self._jobs:
+            raise ValueError("The _jobs should be set")
+        while True:

Review comment:
       I think this is reasonable.

    Side note: cancel usually completes quickly. On the other hand, drain is the safer way to stop streaming pipelines, and it can take a long time depending on the state of the pipeline. A sketch of how a drain is requested follows.
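    For reference, a drain is requested through the same `jobs.update` endpoint as a cancel, only with a different target state; a minimal sketch with placeholder project, location, and job ID:

    ```
    from googleapiclient.discovery import build

    # Drain stops consuming new input but lets in-flight work finish,
    # so it can take much longer than a plain cancel.
    service = build("dataflow", "v1b3")
    service.projects().locations().jobs().update(
        projectId="example-project",
        location="us-central1",
        jobId="example-job-id",
        body={"requestedState": "JOB_STATE_DRAINED"},
    ).execute()
    ```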




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] TobKed commented on pull request #8550: Add DataflowStartFlexTemplateOperator

Posted by GitBox <gi...@apache.org>.
TobKed commented on pull request #8550:
URL: https://github.com/apache/airflow/pull/8550#issuecomment-704685013


   
   
   
   > Could you rebase?
   
    @ad-m I rebased onto the latest master and squashed the commits.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] turbaszek commented on a change in pull request #8550: Add DataflowStartFlexTemplateOperator

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #8550:
URL: https://github.com/apache/airflow/pull/8550#discussion_r501021134



##########
File path: airflow/providers/google/cloud/hooks/dataflow.py
##########
@@ -282,41 +295,70 @@ def wait_for_done(self) -> None:
             time.sleep(self._poll_sleep)
             self._refresh_jobs()
 
-    def get_jobs(self) -> List[Dict]:
+    def get_jobs(self, refresh=False) -> List[Dict]:
         """
         Returns Dataflow jobs.
 
         :return: list of jobs
         :rtype: list
         """
-        if not self._jobs:
+        if not self._jobs or refresh:
             self._refresh_jobs()
         if not self._jobs:
             raise ValueError("Could not read _jobs")
 
         return self._jobs
 
+    def _wait_for_states(self, expected_states: Set[str]):
+        """
+        Waiting for the jobs to reach a certain state.
+        """
+        if not self._jobs:
+            raise ValueError("The _jobs should be set")
+        while True:

Review comment:
       Should we consider implementing a timeout?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ad-m commented on pull request #8550: Add DataflowStartFlexTemplateOperator

Posted by GitBox <gi...@apache.org>.
ad-m commented on pull request #8550:
URL: https://github.com/apache/airflow/pull/8550#issuecomment-710715682


    I, too, thank @TobKed for the development of the Airflow project!


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] TobKed commented on a change in pull request #8550: Add DataflowStartFlexTemplateOperator

Posted by GitBox <gi...@apache.org>.
TobKed commented on a change in pull request #8550:
URL: https://github.com/apache/airflow/pull/8550#discussion_r496723475



##########
File path: tests/providers/google/cloud/operators/test_dataflow_system.py
##########
@@ -26,4 +37,191 @@
 class CloudDataflowExampleDagsSystemTest(GoogleSystemTest):
     @provide_gcp_context(GCP_DATAFLOW_KEY)
     def test_run_example_dag_function(self):
-        self.run_dag('example_gcp_dataflow', CLOUD_DAG_FOLDER)
+        self.run_dag("example_gcp_dataflow", CLOUD_DAG_FOLDER)
+
+
+GCP_PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "example-project")
+GCR_FLEX_TEMPLATE_IMAGE = f"gcr.io/{GCP_PROJECT_ID}/samples-dataflow-streaming-beam-sql:latest"
+
+# https://github.com/GoogleCloudPlatform/java-docs-samples/tree/954553c/dataflow/flex-templates/streaming_beam_sql
+GCS_TEMPLATE_PARTS = urlparse(GCS_FLEX_TEMPLATE_TEMPLATE_PATH)
+GCS_FLEX_TEMPLATE_BUCKET_NAME = GCS_TEMPLATE_PARTS.netloc
+
+
+EXAMPLE_FLEX_TEMPLATE_REPO = "GoogleCloudPlatform/java-docs-samples"
+EXAMPLE_FLEX_TEMPLATE_COMMIT = "deb0745be1d1ac1d133e1f0a7faa9413dbfbe5fe"
+EXAMPLE_FLEX_TEMPLATE_SUBDIR = "dataflow/flex-templates/streaming_beam_sql"
+
+
+@pytest.mark.backend("mysql", "postgres")
+@pytest.mark.credential_file(GCP_GCS_TRANSFER_KEY)
+class CloudDataflowExampleDagFlexTemplateJavagSystemTest(GoogleSystemTest):
+    @provide_gcp_context(GCP_GCS_TRANSFER_KEY, project_id=GoogleSystemTest._project_id())
+    def setUp(self) -> None:
+        # Create a Cloud Storage bucket
+        self.execute_cmd(["gsutil", "mb", f"gs://{GCS_FLEX_TEMPLATE_BUCKET_NAME}"])
+
+        # Build image with pipeline
+        with NamedTemporaryFile() as f:
+            f.write(
+                textwrap.dedent(
+                    """\
+                    steps:

Review comment:
       I think it was used for the sake of simplicity. It is easier to take a look at a multiline string that represents quite simple YAML. Am I right, @mik-laj?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] TobKed commented on a change in pull request #8550: Add DataflowStartFlexTemplateOperator

Posted by GitBox <gi...@apache.org>.
TobKed commented on a change in pull request #8550:
URL: https://github.com/apache/airflow/pull/8550#discussion_r496645715



##########
File path: airflow/providers/google/cloud/hooks/dataflow.py
##########
@@ -93,15 +93,30 @@ def inner_wrapper(self: "DataflowHook", *args, **kwargs) -> RT:
 class DataflowJobStatus:
     """
     Helper class with Dataflow job statuses.
+    Reference: https://cloud.google.com/dataflow/docs/reference/rest/v1b3/projects.jobs#Job.JobState

Review comment:
       I replied here https://github.com/apache/airflow/pull/8553#discussion_r496513620 and changed it here https://github.com/apache/airflow/pull/8550/commits/92858ebbc39c242376304d4a9d9106263aed7369




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] mik-laj commented on a change in pull request #8550: Add DataflowStartFlexTemplateOperator

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #8550:
URL: https://github.com/apache/airflow/pull/8550#discussion_r416159089



##########
File path: tests/providers/google/cloud/operators/test_dataflow_system.py
##########
@@ -26,4 +37,198 @@
 class CloudDataflowExampleDagsSystemTest(GoogleSystemTest):
     @provide_gcp_context(GCP_DATAFLOW_KEY)
     def test_run_example_dag_function(self):
-        self.run_dag('example_gcp_dataflow', CLOUD_DAG_FOLDER)
+        self.run_dag("example_gcp_dataflow", CLOUD_DAG_FOLDER)
+
+
+GCP_PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "example-project")
+GCR_FLEX_TEMPLATE_IMAGE = f"gcr.io/{GCP_PROJECT_ID}/samples-dataflow-streaming-beam-sql:latest"
+
+# https://github.com/GoogleCloudPlatform/java-docs-samples/tree/954553c/dataflow/flex-templates/streaming_beam_sql
+GCS_TEMPLATE_PARTS = urlparse(GCS_FLEX_TEMPLATE_TEMPLATE_PATH)
+GCS_FLEX_TEMPLATE_BUCKET_NAME = GCS_TEMPLATE_PARTS.netloc
+
+
+EXAMPLE_FLEX_TEMPLATE_REPO = "GoogleCloudPlatform/java-docs-samples"
+EXAMPLE_FLEX_TEMPLATE_COMMIT = "deb0745be1d1ac1d133e1f0a7faa9413dbfbe5fe"
+EXAMPLE_FLEX_TEMPLATE_SUBDIR = "dataflow/flex-templates/streaming_beam_sql"
+
+
+@pytest.mark.backend("mysql", "postgres")
+@pytest.mark.credential_file(GCP_GCS_TRANSFER_KEY)
+class CloudDataflowExampleDagFlexTemplateJavagSystemTest(GoogleSystemTest):
+    @provide_gcp_context(GCP_GCS_TRANSFER_KEY, project_id=GoogleSystemTest._project_id())
+    def setUp(self) -> None:
+        # Create a Cloud Storage bucket
+        self.execute_cmd(["gsutil", "mb", f"gs://{GCS_FLEX_TEMPLATE_BUCKET_NAME}"])
+
+        # Build image with pipeline
+        with NamedTemporaryFile() as f:
+            f.write(
+                textwrap.dedent(
+                    """\
+                    steps:

Review comment:
       I am building an image from your repository.
   https://github.com/GoogleCloudPlatform/java-docs-samples/tree/954553c/dataflow/flex-templates/streaming_beam_sql




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] TobKed commented on a change in pull request #8550: Add DataflowStartFlexTemplateOperator

Posted by GitBox <gi...@apache.org>.
TobKed commented on a change in pull request #8550:
URL: https://github.com/apache/airflow/pull/8550#discussion_r502382482



##########
File path: airflow/providers/google/cloud/hooks/dataflow.py
##########
@@ -282,41 +295,70 @@ def wait_for_done(self) -> None:
             time.sleep(self._poll_sleep)
             self._refresh_jobs()
 
-    def get_jobs(self) -> List[Dict]:
+    def get_jobs(self, refresh=False) -> List[Dict]:
         """
         Returns Dataflow jobs.
 
         :return: list of jobs
         :rtype: list
         """
-        if not self._jobs:
+        if not self._jobs or refresh:
             self._refresh_jobs()
         if not self._jobs:
             raise ValueError("Could not read _jobs")
 
         return self._jobs
 
+    def _wait_for_states(self, expected_states: Set[str]):
+        """
+        Waiting for the jobs to reach a certain state.
+        """
+        if not self._jobs:
+            raise ValueError("The _jobs should be set")
+        while True:

Review comment:
       @aaltay I created a separate draft PR for handling draining on kill: https://github.com/apache/airflow/pull/11374. WDYT?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] TobKed commented on a change in pull request #8550: Add DataflowStartFlexTemplateOperator

Posted by GitBox <gi...@apache.org>.
TobKed commented on a change in pull request #8550:
URL: https://github.com/apache/airflow/pull/8550#discussion_r496710739



##########
File path: airflow/providers/google/cloud/hooks/dataflow.py
##########
@@ -583,6 +623,49 @@ def start_template_dataflow(
         jobs_controller.wait_for_done()
         return response["job"]
 
+    @GoogleBaseHook.fallback_to_default_project_id
+    def start_flex_template(
+        self,
+        body: Dict,
+        location: str,
+        project_id: str,
+        on_new_job_id_callback: Optional[Callable[[str], None]] = None
+    ):
+        """
+        Starts flex templates with the Dataflow  pipeline.
+
+        :param body: The request body

Review comment:
       I see the documentation of the request body is available now:
    https://cloud.google.com/dataflow/docs/reference/rest/v1b3/projects.locations.flexTemplates/launch#request-body
    Thanks for taking care of it, @jaketf.
    I added links to it in the hook and operator.

    In relation to naming: I think snake_case is used as the standard for Python, and camelCase when it is required by external APIs (usually within dictionaries); see the illustration below. I am not sure I understand your question correctly. Could you elaborate, please?
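    To illustrate the convention (all field values are placeholders): the request `body` keys follow the REST API's camelCase, while the operator's own arguments stay snake_case:

    ```
    from airflow.providers.google.cloud.operators.dataflow import DataflowStartFlexTemplateOperator

    body = {
        "launchParameter": {  # camelCase: REST API field names
            "jobName": "example-flex-template-job",
            "containerSpecGcsPath": "gs://example-bucket/templates/streaming-beam-sql.json",
            "parameters": {"inputSubscription": "example-sub", "outputTable": "example-project:dataset.table"},
        }
    }

    # Inside a DAG; snake_case kwargs on the Python side.
    start_flex_template = DataflowStartFlexTemplateOperator(
        task_id="start_flex_template",
        body=body,
        location="us-central1",
    )
    ```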
   




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] TobKed commented on a change in pull request #8550: Add DataflowStartFlexTemplateOperator

Posted by GitBox <gi...@apache.org>.
TobKed commented on a change in pull request #8550:
URL: https://github.com/apache/airflow/pull/8550#discussion_r503980870



##########
File path: airflow/providers/google/cloud/hooks/dataflow.py
##########
@@ -282,41 +295,70 @@ def wait_for_done(self) -> None:
             time.sleep(self._poll_sleep)
             self._refresh_jobs()
 
-    def get_jobs(self) -> List[Dict]:
+    def get_jobs(self, refresh=False) -> List[Dict]:
         """
         Returns Dataflow jobs.
 
         :return: list of jobs
         :rtype: list
         """
-        if not self._jobs:
+        if not self._jobs or refresh:
             self._refresh_jobs()
         if not self._jobs:
             raise ValueError("Could not read _jobs")
 
         return self._jobs
 
+    def _wait_for_states(self, expected_states: Set[str]):
+        """
+        Waiting for the jobs to reach a certain state.
+        """
+        if not self._jobs:
+            raise ValueError("The _jobs should be set")
+        while True:

Review comment:
       Since it is not strictly related to this change and I don't want to block this PR, I deleted this part of the logic and created a separate PR for handling waiting and timeout in job cancel: https://github.com/apache/airflow/pull/11501 (a draft, because I must add tests).




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] jaketf commented on a change in pull request #8550: Add DataflowStartFlexTemplateOperator

Posted by GitBox <gi...@apache.org>.
jaketf commented on a change in pull request #8550:
URL: https://github.com/apache/airflow/pull/8550#discussion_r427500482



##########
File path: airflow/providers/google/cloud/hooks/dataflow.py
##########
@@ -583,6 +623,49 @@ def start_template_dataflow(
         jobs_controller.wait_for_done()
         return response["job"]
 
+    @GoogleBaseHook.fallback_to_default_project_id
+    def start_flex_template(
+        self,
+        body: Dict,
+        location: str,
+        project_id: str,
+        on_new_job_id_callback: Optional[Callable[[str], None]] = None
+    ):
+        """
+        Starts flex templates with the Dataflow  pipeline.
+
+        :param body: The request body
+        :param location: The location of the Dataflow job (for example europe-west1)
+        :type location: str
+        :param project_id: The ID of the GCP project that owns the job.
+            If set to ``None`` or missing, the default project_id from the GCP connection is used.
+        :type project_id: Optional[str]
+        :param on_new_job_id_callback: A callback that is called when a Job ID is detected.
+        :return: the Job
+        """
+        service = self.get_conn()
+        request = service.projects().locations().flexTemplates().launch(  # pylint: disable=no-member
+            projectId=project_id,
+            body=body,
+            location=location
+        )
+        response = request.execute(num_retries=self.num_retries)
+        job_id = response['job']['id']
+
+        if on_new_job_id_callback:
+            on_new_job_id_callback(job_id)
+
+        jobs_controller = _DataflowJobsController(
+            dataflow=self.get_conn(),
+            project_number=project_id,
+            job_id=job_id,
+            location=location,
+            poll_sleep=self.poll_sleep,
+            num_retries=self.num_retries)
+        jobs_controller.wait_for_done()

Review comment:
       @mik-laj I was reflecting on this in light of the Data Fusion operator issue.
    This "start" naming is confusing.

    This method (and Dataflow*Start*FlexTemplateOperator) is called "start" flex template, but it appears that you are waiting for the job to complete.

    The existing Dataflow operators do not have this "start" word, and I think the user expectation is that they poll the job to completion. Otherwise you can't do much useful downstream in the DAG without some sensor that waits on job completion.

    If we want to support both blocking and non-blocking behavior, I'd suggest a `wait_for_done` kwarg that defaults to `True` (the expected behavior based on similar operators). This might mean that we need a new method in the controller, `wait_for_running`, which blocks until the pipeline enters the RUNNING state; a sketch of the pattern follows this comment.

    What do you think?

    The same applies to #8553.
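    A generic, standalone sketch of the suggested pattern (the callables and the exact state names are placeholders, not the hook's actual API):

    ```
    import time

    def start_job(submit, get_state, wait_for_done=True, poll_sleep=10):
        """Submit a job, then block until it is done, or only until it is running."""
        job_id = submit()
        # wait_for_done=True keeps today's blocking behavior; False returns
        # as soon as the pipeline leaves the pending states.
        target = {"JOB_STATE_DONE"} if wait_for_done else {"JOB_STATE_RUNNING", "JOB_STATE_DONE"}
        while get_state(job_id) not in target:
            time.sleep(poll_sleep)
        return job_id
    ```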




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] aaltay commented on a change in pull request #8550: Add DataflowStartFlexTemplateOperator

Posted by GitBox <gi...@apache.org>.
aaltay commented on a change in pull request #8550:
URL: https://github.com/apache/airflow/pull/8550#discussion_r416133310



##########
File path: airflow/providers/google/cloud/hooks/dataflow.py
##########
@@ -93,15 +93,30 @@ def inner_wrapper(self: "DataflowHook", *args, **kwargs) -> RT:
 class DataflowJobStatus:
     """
     Helper class with Dataflow job statuses.
+    Reference: https://cloud.google.com/dataflow/docs/reference/rest/v1b3/projects.jobs#Job.JobState

Review comment:
       This file is also modified in https://github.com/apache/airflow/pull/8553. I am assuming these are the same changes, so I am skipping this file for the review.

##########
File path: airflow/providers/google/cloud/operators/dataflow.py
##########
@@ -406,6 +406,71 @@ def on_kill(self) -> None:
             self.hook.cancel_job(job_id=self.job_id, project_id=self.project_id)
 
 
+class DataflowStartFlexTemplateOperator(BaseOperator):
+    """
+    Starts flex templates with the Dataflow  pipeline.
+
+    :param body: The request body
+    :param location: The location of the Dataflow job (for example europe-west1)
+    :type location: str
+    :param project_id: The ID of the GCP project that owns the job.
+        If set to ``None`` or missing, the default project_id from the GCP connection is used.
+    :type project_id: Optional[str]
+    :param gcp_conn_id: The connection ID to use connecting to Google Cloud
+        Platform.
+    :type gcp_conn_id: str
+    :param delegate_to: The account to impersonate, if any.
+        For this to work, the service account making the request must have
+        domain-wide delegation enabled.
+    :type delegate_to: str
+    """
+
+    template_fields = ["body", 'location', 'project_id', 'gcp_conn_id']
+
+    @apply_defaults
+    def __init__(
+        self,
+        body: Dict,
+        location: str,
+        project_id: Optional[str] = None,
+        gcp_conn_id: str = 'google_cloud_default',
+        delegate_to: Optional[str] = None,
+        *args,
+        **kwargs
+    ) -> None:
+        super().__init__(*args, **kwargs)
+        self.body = body
+        self.location = location
+        self.project_id = project_id
+        self.gcp_conn_id = gcp_conn_id
+        self.delegate_to = delegate_to
+        self.job_id = None
+        self.hook: Optional[DataflowHook] = None
+
+    def execute(self, context):
+        self.hook = DataflowHook(
+            gcp_conn_id=self.gcp_conn_id,
+            delegate_to=self.delegate_to,
+        )
+
+        def set_current_job_id(job_id):
+            self.job_id = job_id
+
+        job = self.hook.start_flex_template(
+            body=self.body,
+            location=self.location,
+            project_id=self.project_id,
+            on_new_job_id_callback=set_current_job_id,
+        )
+
+        return job
+
+    def on_kill(self) -> None:
+        self.log.info("On kill.")
+        if self.job_id:
+            self.hook.cancel_job(job_id=self.job_id, project_id=self.project_id)

Review comment:
       Do you need to call this if the job is no longer running?

##########
File path: tests/providers/google/cloud/operators/test_dataflow_system.py
##########
@@ -15,9 +15,20 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
+import os

Review comment:
       This file reads more like an end-to-end test of various Dataflow system things. Should it be a different PR?

##########
File path: airflow/providers/google/cloud/example_dags/example_dataflow_flex_template.py
##########
@@ -0,0 +1,63 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Example Airflow DAG for Google Cloud Dataflow service
+"""
+import os
+
+from airflow import models
+from airflow.providers.google.cloud.operators.dataflow import DataflowStartFlexTemplateOperator
+from airflow.utils.dates import days_ago
+
+GCP_PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "example-project")
+
+DATAFLOW_FLEX_TEMPLATE_JOB_NAME = os.environ.get('DATAFLOW_FLEX_TEMPLATE_JOB_NAME', f"dataflow-flex-template")
+
+# For simplicity we use the same topic name as the subscription name.
+PUBSUB_FLEX_TEMPLATE_TOPIC = os.environ.get('DATAFLOW_PUBSUB_FLEX_TEMPLATE_TOPIC', "dataflow-flex-template")
+PUBSUB_FLEX_TEMPLATE_SUBSCRIPTION = PUBSUB_FLEX_TEMPLATE_TOPIC
+GCS_FLEX_TEMPLATE_TEMPLATE_PATH = os.environ.get(
+    'DATAFLOW_GCS_FLEX_TEMPLATE_TEMPLATE_PATH',
+    "gs://test-airflow-dataflow-flex-template/samples/dataflow/templates/streaming-beam-sql.json"
+)
+BQ_FLEX_TEMPLATE_DATASET = os.environ.get('DATAFLOW_BQ_FLEX_TEMPLATE_DATASET', 'airflow_dataflow_samples')
+BQ_FLEX_TEMPLATE_LOCATION = os.environ.get('DATAFLOW_BQ_FLEX_TEMPLATE_LOCAATION>', 'us-west1')
+
+with models.DAG(
+    dag_id="example_gcp_dataflow_flex_template_java",
+    default_args={
+        "start_date": days_ago(1),
+    },
+    schedule_interval=None,  # Override to match your needs
+) as dag_flex_template:
+    start_flex_template = DataflowStartFlexTemplateOperator(
+        task_id="start_flex_template_java",

Review comment:
       Maybe drop "java" from the name? (Given the template structure, the language used to write it matters less.)

##########
File path: tests/providers/google/cloud/operators/test_dataflow_system.py
##########
@@ -26,4 +37,198 @@
 class CloudDataflowExampleDagsSystemTest(GoogleSystemTest):
     @provide_gcp_context(GCP_DATAFLOW_KEY)
     def test_run_example_dag_function(self):
-        self.run_dag('example_gcp_dataflow', CLOUD_DAG_FOLDER)
+        self.run_dag("example_gcp_dataflow", CLOUD_DAG_FOLDER)
+
+
+GCP_PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "example-project")
+GCR_FLEX_TEMPLATE_IMAGE = f"gcr.io/{GCP_PROJECT_ID}/samples-dataflow-streaming-beam-sql:latest"
+
+# https://github.com/GoogleCloudPlatform/java-docs-samples/tree/954553c/dataflow/flex-templates/streaming_beam_sql
+GCS_TEMPLATE_PARTS = urlparse(GCS_FLEX_TEMPLATE_TEMPLATE_PATH)
+GCS_FLEX_TEMPLATE_BUCKET_NAME = GCS_TEMPLATE_PARTS.netloc
+
+
+EXAMPLE_FLEX_TEMPLATE_REPO = "GoogleCloudPlatform/java-docs-samples"
+EXAMPLE_FLEX_TEMPLATE_COMMIT = "deb0745be1d1ac1d133e1f0a7faa9413dbfbe5fe"
+EXAMPLE_FLEX_TEMPLATE_SUBDIR = "dataflow/flex-templates/streaming_beam_sql"
+
+
+@pytest.mark.backend("mysql", "postgres")
+@pytest.mark.credential_file(GCP_GCS_TRANSFER_KEY)
+class CloudDataflowExampleDagFlexTemplateJavagSystemTest(GoogleSystemTest):
+    @provide_gcp_context(GCP_GCS_TRANSFER_KEY, project_id=GoogleSystemTest._project_id())
+    def setUp(self) -> None:
+        # Create a Cloud Storage bucket
+        self.execute_cmd(["gsutil", "mb", f"gs://{GCS_FLEX_TEMPLATE_BUCKET_NAME}"])
+
+        # Build image with pipeline
+        with NamedTemporaryFile() as f:
+            f.write(
+                textwrap.dedent(
+                    """\
+                    steps:

Review comment:
       What is being built in this cloud image pipeline?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] TobKed commented on a change in pull request #8550: Add DataflowStartFlexTemplateOperator

Posted by GitBox <gi...@apache.org>.
TobKed commented on a change in pull request #8550:
URL: https://github.com/apache/airflow/pull/8550#discussion_r497534203



##########
File path: airflow/providers/google/cloud/operators/dataflow.py
##########
@@ -406,6 +406,71 @@ def on_kill(self) -> None:
             self.hook.cancel_job(job_id=self.job_id, project_id=self.project_id)
 
 
+class DataflowStartFlexTemplateOperator(BaseOperator):
+    """
+    Starts flex templates with the Dataflow pipeline.
+
+    :param body: The request body
+    :param location: The location of the Dataflow job (for example europe-west1)
+    :type location: str
+    :param project_id: The ID of the GCP project that owns the job.
+        If set to ``None`` or missing, the default project_id from the GCP connection is used.
+    :type project_id: Optional[str]
+    :param gcp_conn_id: The connection ID to use connecting to Google Cloud
+        Platform.
+    :type gcp_conn_id: str
+    :param delegate_to: The account to impersonate, if any.
+        For this to work, the service account making the request must have
+        domain-wide delegation enabled.
+    :type delegate_to: str
+    """
+
+    template_fields = ["body", "location", "project_id", "gcp_conn_id"]
+
+    @apply_defaults
+    def __init__(
+        self,
+        body: Dict,
+        location: str,
+        project_id: Optional[str] = None,
+        gcp_conn_id: str = 'google_cloud_default',
+        delegate_to: Optional[str] = None,
+        *args,
+        **kwargs
+    ) -> None:
+        super().__init__(*args, **kwargs)
+        self.body = body
+        self.location = location
+        self.project_id = project_id
+        self.gcp_conn_id = gcp_conn_id
+        self.delegate_to = delegate_to
+        self.job_id = None
+        self.hook: Optional[DataflowHook] = None
+
+    def execute(self, context):
+        self.hook = DataflowHook(
+            gcp_conn_id=self.gcp_conn_id,
+            delegate_to=self.delegate_to,
+        )
+
+        def set_current_job_id(job_id):
+            self.job_id = job_id
+
+        job = self.hook.start_flex_template(
+            body=self.body,
+            location=self.location,
+            project_id=self.project_id,
+            on_new_job_id_callback=set_current_job_id,
+        )
+
+        return job
+
+    def on_kill(self) -> None:
+        self.log.info("On kill.")
+        if self.job_id:
+            self.hook.cancel_job(job_id=self.job_id, project_id=self.project_id)

Review comment:
       Improved it here https://github.com/apache/airflow/pull/8550/commits/4b78b27d855cd61d97ffd3d2c2591b34a1a2f226




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] mik-laj commented on a change in pull request #8550: Add DataflowStartFlexTemplateOperator

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #8550:
URL: https://github.com/apache/airflow/pull/8550#discussion_r416181912



##########
File path: tests/providers/google/cloud/operators/test_dataflow_system.py
##########
@@ -15,9 +15,20 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
+import os

Review comment:
       We always try to write system tests and integrations simultaneously to ensure the best integration reliability. Thanks to this, every person on my team can easily test every integration.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] mik-laj commented on a change in pull request #8550: Add DataflowStartFlexTemplateOperator

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #8550:
URL: https://github.com/apache/airflow/pull/8550#discussion_r416180636



##########
File path: tests/providers/google/cloud/operators/test_dataflow_system.py
##########
@@ -26,4 +37,198 @@
 class CloudDataflowExampleDagsSystemTest(GoogleSystemTest):
     @provide_gcp_context(GCP_DATAFLOW_KEY)
     def test_run_example_dag_function(self):
-        self.run_dag('example_gcp_dataflow', CLOUD_DAG_FOLDER)
+        self.run_dag("example_gcp_dataflow", CLOUD_DAG_FOLDER)
+
+
+GCP_PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "example-project")
+GCR_FLEX_TEMPLATE_IMAGE = f"gcr.io/{GCP_PROJECT_ID}/samples-dataflow-streaming-beam-sql:latest"
+
+# https://github.com/GoogleCloudPlatform/java-docs-samples/tree/954553c/dataflow/flex-templates/streaming_beam_sql
+GCS_TEMPLATE_PARTS = urlparse(GCS_FLEX_TEMPLATE_TEMPLATE_PATH)
+GCS_FLEX_TEMPLATE_BUCKET_NAME = GCS_TEMPLATE_PARTS.netloc
+
+
+EXAMPLE_FLEX_TEMPLATE_REPO = "GoogleCloudPlatform/java-docs-samples"
+EXAMPLE_FLEX_TEMPLATE_COMMIT = "deb0745be1d1ac1d133e1f0a7faa9413dbfbe5fe"
+EXAMPLE_FLEX_TEMPLATE_SUBDIR = "dataflow/flex-templates/streaming_beam_sql"
+
+
+@pytest.mark.backend("mysql", "postgres")
+@pytest.mark.credential_file(GCP_GCS_TRANSFER_KEY)
+class CloudDataflowExampleDagFlexTemplateJavaSystemTest(GoogleSystemTest):
+    @provide_gcp_context(GCP_GCS_TRANSFER_KEY, project_id=GoogleSystemTest._project_id())
+    def setUp(self) -> None:
+        # Create a Cloud Storage bucket
+        self.execute_cmd(["gsutil", "mb", f"gs://{GCS_FLEX_TEMPLATE_BUCKET_NAME}"])
+
+        # Build image with pipeline
+        with NamedTemporaryFile() as f:
+            f.write(
+                textwrap.dedent(
+                    """\
+                    steps:
+                    - name: gcr.io/cloud-builders/git
+                      args: ['clone', '$_EXAMPLE_REPO', 'repo_dir']
+                    - name: gcr.io/cloud-builders/git
+                      args: ['checkout', '$_EXAMPLE_COMMIT']
+                      dir: 'repo_dir'
+                    - name: alpine
+                      entrypoint: /bin/sh
+                      dir: 'repo_dir/$_EXAMPLE_SUBDIR'
+                    - name: maven
+                      args: ["mvn", "clean", "package"]
+                      dir: 'repo_dir/$_EXAMPLE_SUBDIR'
+                    - name: alpine
+                      entrypoint: "/bin/sh"
+                      args: ["-c", "ls -lh target/*.jar"]
+                      dir: 'repo_dir/$_EXAMPLE_SUBDIR'

Review comment:
       ```suggestion
       ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] TobKed commented on a change in pull request #8550: Add DataflowStartFlexTemplateOperator

Posted by GitBox <gi...@apache.org>.
TobKed commented on a change in pull request #8550:
URL: https://github.com/apache/airflow/pull/8550#discussion_r496645074



##########
File path: airflow/providers/google/cloud/operators/dataflow.py
##########
@@ -406,6 +406,71 @@ def on_kill(self) -> None:
             self.hook.cancel_job(job_id=self.job_id, project_id=self.project_id)
 
 
+class DataflowStartFlexTemplateOperator(BaseOperator):
+    """
+    Starts flex templates with the Dataflow pipeline.
+
+    :param body: The request body
+    :param location: The location of the Dataflow job (for example europe-west1)
+    :type location: str
+    :param project_id: The ID of the GCP project that owns the job.
+        If set to ``None`` or missing, the default project_id from the GCP connection is used.
+    :type project_id: Optional[str]
+    :param gcp_conn_id: The connection ID to use connecting to Google Cloud
+        Platform.
+    :type gcp_conn_id: str
+    :param delegate_to: The account to impersonate, if any.
+        For this to work, the service account making the request must have
+        domain-wide delegation enabled.
+    :type delegate_to: str
+    """
+
+    template_fields = ["body", "location", "project_id", "gcp_conn_id"]
+
+    @apply_defaults
+    def __init__(
+        self,
+        body: Dict,
+        location: str,
+        project_id: Optional[str] = None,
+        gcp_conn_id: str = 'google_cloud_default',
+        delegate_to: Optional[str] = None,
+        *args,
+        **kwargs
+    ) -> None:
+        super().__init__(*args, **kwargs)
+        self.body = body
+        self.location = location
+        self.project_id = project_id
+        self.gcp_conn_id = gcp_conn_id
+        self.delegate_to = delegate_to
+        self.job_id = None
+        self.hook: Optional[DataflowHook] = None
+
+    def execute(self, context):
+        self.hook = DataflowHook(
+            gcp_conn_id=self.gcp_conn_id,
+            delegate_to=self.delegate_to,
+        )
+
+        def set_current_job_id(job_id):
+            self.job_id = job_id
+
+        job = self.hook.start_flex_template(
+            body=self.body,
+            location=self.location,
+            project_id=self.project_id,
+            on_new_job_id_callback=set_current_job_id,
+        )
+
+        return job
+
+    def on_kill(self) -> None:
+        self.log.info("On kill.")
+        if self.job_id:
+            self.hook.cancel_job(job_id=self.job_id, project_id=self.project_id)

Review comment:
       I changed it here https://github.com/apache/airflow/pull/8550/commits/1eac7722208d6f7e7723eee44c676c4f0bc5065a




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] mik-laj commented on a change in pull request #8550: Add DataflowStartFlexTemplateOperator

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #8550:
URL: https://github.com/apache/airflow/pull/8550#discussion_r427545395



##########
File path: airflow/providers/google/cloud/hooks/dataflow.py
##########
@@ -583,6 +623,49 @@ def start_template_dataflow(
         jobs_controller.wait_for_done()
         return response["job"]
 
+    @GoogleBaseHook.fallback_to_default_project_id
+    def start_flex_template(
+        self,
+        body: Dict,
+        location: str,
+        project_id: str,
+        on_new_job_id_callback: Optional[Callable[[str], None]] = None
+    ):
+        """
+        Starts flex templates with the Dataflow pipeline.
+
+        :param body: The request body
+        :param location: The location of the Dataflow job (for example europe-west1)
+        :type location: str
+        :param project_id: The ID of the GCP project that owns the job.
+            If set to ``None`` or missing, the default project_id from the GCP connection is used.
+        :type project_id: Optional[str]
+        :param on_new_job_id_callback: A callback that is called when a Job ID is detected.
+        :return: the Job
+        """
+        service = self.get_conn()
+        request = service.projects().locations().flexTemplates().launch(  # pylint: disable=no-member
+            projectId=project_id,
+            body=body,
+            location=location
+        )
+        response = request.execute(num_retries=self.num_retries)
+        job_id = response['job']['id']
+
+        if on_new_job_id_callback:
+            on_new_job_id_callback(job_id)
+
+        jobs_controller = _DataflowJobsController(
+            dataflow=self.get_conn(),
+            project_number=project_id,
+            job_id=job_id,
+            location=location,
+            poll_sleep=self.poll_sleep,
+            num_retries=self.num_retries)
+        jobs_controller.wait_for_done()

Review comment:
       I am trying to imitate the naming conventions that we currently have in this integration:
   `DataflowTemplatedJobStartOperator`
   also uses the verb "Start" to describe identical operations.
   
   I plan to add blocking and non-blocking modes to all operators in a separate PR, because this requires the development of sensors. For now, I only wanted to focus on one issue.
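   
   To make that split concrete, a rough sketch of how the non-blocking flow could look (everything commented out below is an assumption about the future PR; `DataflowJobStatusSensor` and `wait_until_finished` do not exist yet, and `BODY` is a placeholder):
   
   ```python
   # Sketch only: the commented-out parts are assumptions, not this PR's API.
   from airflow.providers.google.cloud.operators.dataflow import DataflowStartFlexTemplateOperator
   
   BODY = {}  # placeholder: flexTemplates.launch request body
   
   start_job = DataflowStartFlexTemplateOperator(
       task_id="start_flex_template",
       body=BODY,
       location="europe-west1",
       # wait_until_finished=False,  # assumed future flag; today execute() blocks
   )
   
   # A future sensor could poll the job state instead of holding a worker slot:
   # wait_for_job = DataflowJobStatusSensor(  # hypothetical sensor
   #     task_id="wait_for_job",
   #     job_id="{{ task_instance.xcom_pull('start_flex_template')['id'] }}",
   #     expected_states={"JOB_STATE_DONE"},
   #     location="europe-west1",
   # )
   # start_job >> wait_for_job
   ```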




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] potiuk merged pull request #8550: Add DataflowStartFlexTemplateOperator

Posted by GitBox <gi...@apache.org>.
potiuk merged pull request #8550:
URL: https://github.com/apache/airflow/pull/8550


   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] mik-laj commented on pull request #8550: Add DataflowStartFlexTemplateOperator

Posted by GitBox <gi...@apache.org>.
mik-laj commented on pull request #8550:
URL: https://github.com/apache/airflow/pull/8550#issuecomment-619478099


   @jaketf Can I ask you for a review? I know that you are also interested in the Dataflow integration. Will this fully solve your clients' problems? If they use it, they will no longer have to use KubernetesPodOperator.
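   
   For context, a minimal sketch of what launching a job with this operator could look like (the bucket, subscription, and table names are placeholders, and the body follows the `projects.locations.flexTemplates.launch` request format):
   
   ```python
   import os
   
   from airflow import models
   from airflow.providers.google.cloud.operators.dataflow import DataflowStartFlexTemplateOperator
   from airflow.utils.dates import days_ago
   
   GCP_PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "example-project")
   
   with models.DAG(
       dag_id="example_flex_template_launch",
       default_args={"start_date": days_ago(1)},
       schedule_interval=None,
   ) as dag:
       # Submits the flex template job and, in the current implementation,
       # waits until the Dataflow job reaches a terminal state.
       start_flex_template = DataflowStartFlexTemplateOperator(
           task_id="start_flex_template",
           project_id=GCP_PROJECT_ID,
           location="europe-west1",
           body={
               "launchParameter": {
                   "jobName": "streaming-beam-sql",
                   "containerSpecGcsPath": "gs://my-bucket/templates/streaming-beam-sql.json",
                   "parameters": {
                       "inputSubscription": "my-subscription",
                       "outputTable": f"{GCP_PROJECT_ID}:my_dataset.my_table",
                   },
               }
           },
       )
   ```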


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] turbaszek commented on a change in pull request #8550: Add DataflowStartFlexTemplateOperator

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #8550:
URL: https://github.com/apache/airflow/pull/8550#discussion_r501934420



##########
File path: airflow/providers/google/cloud/hooks/dataflow.py
##########
@@ -282,41 +295,70 @@ def wait_for_done(self) -> None:
             time.sleep(self._poll_sleep)
             self._refresh_jobs()
 
-    def get_jobs(self) -> List[Dict]:
+    def get_jobs(self, refresh: bool = False) -> List[Dict]:
         """
         Returns Dataflow jobs.
 
         :return: list of jobs
         :rtype: list
         """
-        if not self._jobs:
+        if not self._jobs or refresh:
             self._refresh_jobs()
         if not self._jobs:
             raise ValueError("Could not read _jobs")
 
         return self._jobs
 
+    def _wait_for_states(self, expected_states: Set[str]):
+        """
+        Waiting for the jobs to reach a certain state.
+        """
+        if not self._jobs:
+            raise ValueError("The _jobs should be set")
+        while True:

Review comment:
       If we use this only in `on_kill`, should we even bother with waiting? I would assume that a successful API call should be enough for us.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] syucream commented on pull request #8550: Add DataflowStartFlexTemplateOperator

Posted by GitBox <gi...@apache.org>.
syucream commented on pull request #8550:
URL: https://github.com/apache/airflow/pull/8550#issuecomment-641689350


   What's the current status? Have you made any progress? I want this patch. Can I help you with anything? @mik-laj


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] mik-laj commented on a change in pull request #8550: Add DataflowStartFlexTemplateOperator

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #8550:
URL: https://github.com/apache/airflow/pull/8550#discussion_r501315981



##########
File path: airflow/providers/google/cloud/hooks/dataflow.py
##########
@@ -282,41 +295,70 @@ def wait_for_done(self) -> None:
             time.sleep(self._poll_sleep)
             self._refresh_jobs()
 
-    def get_jobs(self) -> List[Dict]:
+    def get_jobs(self, refresh: bool = False) -> List[Dict]:
         """
         Returns Dataflow jobs.
 
         :return: list of jobs
         :rtype: list
         """
-        if not self._jobs:
+        if not self._jobs or refresh:
             self._refresh_jobs()
         if not self._jobs:
             raise ValueError("Could not read _jobs")
 
         return self._jobs
 
+    def _wait_for_states(self, expected_states: Set[str]):
+        """
+        Waiting for the jobs to reach a certain state.
+        """
+        if not self._jobs:
+            raise ValueError("The _jobs should be set")
+        while True:

Review comment:
       We do have a timeout. See https://airflow.readthedocs.io/en/latest/_api/airflow/models/index.html#airflow.models.BaseOperator -> `execution_timeout`
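   
   For illustration, a sketch of bounding the wait at the task level (`BODY` is a placeholder):
   
   ```python
   from datetime import timedelta
   
   from airflow.providers.google.cloud.operators.dataflow import DataflowStartFlexTemplateOperator
   
   BODY = {}  # placeholder: flexTemplates.launch request body
   
   start_flex_template = DataflowStartFlexTemplateOperator(
       task_id="start_flex_template",
       body=BODY,
       location="europe-west1",
       # If the job outlives this budget, Airflow raises AirflowTaskTimeout
       # and calls the operator's on_kill(), which cancels the Dataflow job.
       execution_timeout=timedelta(hours=2),
   )
   ```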




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] TobKed commented on pull request #8550: Add DataflowStartFlexTemplateOperator

Posted by GitBox <gi...@apache.org>.
TobKed commented on pull request #8550:
URL: https://github.com/apache/airflow/pull/8550#issuecomment-709986906


   rebased on the latest master
   cc @potiuk @turbaszek 


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] mik-laj commented on a change in pull request #8550: Add DataflowStartFlexTemplateOperator

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #8550:
URL: https://github.com/apache/airflow/pull/8550#discussion_r416184937



##########
File path: tests/providers/google/cloud/operators/test_dataflow_system.py
##########
@@ -26,4 +37,198 @@
 class CloudDataflowExampleDagsSystemTest(GoogleSystemTest):
     @provide_gcp_context(GCP_DATAFLOW_KEY)
     def test_run_example_dag_function(self):
-        self.run_dag('example_gcp_dataflow', CLOUD_DAG_FOLDER)
+        self.run_dag("example_gcp_dataflow", CLOUD_DAG_FOLDER)
+
+
+GCP_PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "example-project")
+GCR_FLEX_TEMPLATE_IMAGE = f"gcr.io/{GCP_PROJECT_ID}/samples-dataflow-streaming-beam-sql:latest"
+
+# https://github.com/GoogleCloudPlatform/java-docs-samples/tree/954553c/dataflow/flex-templates/streaming_beam_sql
+GCS_TEMPLATE_PARTS = urlparse(GCS_FLEX_TEMPLATE_TEMPLATE_PATH)
+GCS_FLEX_TEMPLATE_BUCKET_NAME = GCS_TEMPLATE_PARTS.netloc
+
+
+EXAMPLE_FLEX_TEMPLATE_REPO = "GoogleCloudPlatform/java-docs-samples"
+EXAMPLE_FLEX_TEMPLATE_COMMIT = "deb0745be1d1ac1d133e1f0a7faa9413dbfbe5fe"
+EXAMPLE_FLEX_TEMPLATE_SUBDIR = "dataflow/flex-templates/streaming_beam_sql"
+
+
+@pytest.mark.backend("mysql", "postgres")
+@pytest.mark.credential_file(GCP_GCS_TRANSFER_KEY)
+class CloudDataflowExampleDagFlexTemplateJavaSystemTest(GoogleSystemTest):
+    @provide_gcp_context(GCP_GCS_TRANSFER_KEY, project_id=GoogleSystemTest._project_id())
+    def setUp(self) -> None:
+        # Create a Cloud Storage bucket
+        self.execute_cmd(["gsutil", "mb", f"gs://{GCS_FLEX_TEMPLATE_BUCKET_NAME}"])
+
+        # Build image with pipeline
+        with NamedTemporaryFile() as f:
+            f.write(
+                textwrap.dedent(
+                    """\
+                    steps:

Review comment:
       The Cloud Build service provisions a new virtual machine for each build. Each build consists of many steps, and each step is described by a container image. I personally would not worry about a large number of images, because after the build is completed, the virtual machine is deleted.
   
   




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] mik-laj commented on a change in pull request #8550: Add DataflowStartFlexTemplateOperator

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #8550:
URL: https://github.com/apache/airflow/pull/8550#discussion_r416185616



##########
File path: tests/providers/google/cloud/operators/test_dataflow_system.py
##########
@@ -26,4 +37,198 @@
 class CloudDataflowExampleDagsSystemTest(GoogleSystemTest):
     @provide_gcp_context(GCP_DATAFLOW_KEY)
     def test_run_example_dag_function(self):
-        self.run_dag('example_gcp_dataflow', CLOUD_DAG_FOLDER)
+        self.run_dag("example_gcp_dataflow", CLOUD_DAG_FOLDER)
+
+
+GCP_PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "example-project")
+GCR_FLEX_TEMPLATE_IMAGE = f"gcr.io/{GCP_PROJECT_ID}/samples-dataflow-streaming-beam-sql:latest"
+
+# https://github.com/GoogleCloudPlatform/java-docs-samples/tree/954553c/dataflow/flex-templates/streaming_beam_sql
+GCS_TEMPLATE_PARTS = urlparse(GCS_FLEX_TEMPLATE_TEMPLATE_PATH)
+GCS_FLEX_TEMPLATE_BUCKET_NAME = GCS_TEMPLATE_PARTS.netloc
+
+
+EXAMPLE_FLEX_TEMPLATE_REPO = "GoogleCloudPlatform/java-docs-samples"
+EXAMPLE_FLEX_TEMPLATE_COMMIT = "deb0745be1d1ac1d133e1f0a7faa9413dbfbe5fe"
+EXAMPLE_FLEX_TEMPLATE_SUBDIR = "dataflow/flex-templates/streaming_beam_sql"
+
+
+@pytest.mark.backend("mysql", "postgres")
+@pytest.mark.credential_file(GCP_GCS_TRANSFER_KEY)
+class CloudDataflowExampleDagFlexTemplateJavaSystemTest(GoogleSystemTest):
+    @provide_gcp_context(GCP_GCS_TRANSFER_KEY, project_id=GoogleSystemTest._project_id())
+    def setUp(self) -> None:
+        # Create a Cloud Storage bucket
+        self.execute_cmd(["gsutil", "mb", f"gs://{GCS_FLEX_TEMPLATE_BUCKET_NAME}"])
+
+        # Build image with pipeline
+        with NamedTemporaryFile() as f:
+            f.write(
+                textwrap.dedent(
+                    """\
+                    steps:

Review comment:
       I removed some build steps. 
   https://github.com/apache/airflow/pull/8550/commits/aa29389cac91f29f5f37b268fa61c0a3eaabd663
   
   To build this Docker image I used Google Cloud Build. The Cloud Build service provisions a new virtual machine for each build. Each build consists of many steps, and each step is described by a Docker image. The result of the build is a new Docker image.
   
   I personally would not worry about a large number of images, because after the build is completed, the virtual machine is deleted. It does not run on the local machine.
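   
   As a sketch of that flow (project and image names are placeholders), the whole build can be submitted with a single `gcloud builds submit` call, similar to what the system test does:
   
   ```python
   import subprocess
   import textwrap
   from tempfile import NamedTemporaryFile
   
   # Minimal Cloud Build config: each step runs in its own builder container
   # on the build VM, and the `images` field pushes the result to the registry.
   CLOUD_BUILD_CONFIG = textwrap.dedent(
       """\
       steps:
       - name: maven
         args: ["mvn", "clean", "package"]
       - name: gcr.io/cloud-builders/docker
         args: ["build", "-t", "gcr.io/my-project/my-flex-template:latest", "."]
       images: ["gcr.io/my-project/my-flex-template:latest"]
       """
   )
   
   with NamedTemporaryFile(mode="w", suffix=".yaml") as config_file:
       config_file.write(CLOUD_BUILD_CONFIG)
       config_file.flush()
       subprocess.run(
           ["gcloud", "builds", "submit", f"--config={config_file.name}", "."],
           check=True,
       )
   ```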
   




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] TobKed commented on a change in pull request #8550: Add DataflowStartFlexTemplateOperator

Posted by GitBox <gi...@apache.org>.
TobKed commented on a change in pull request #8550:
URL: https://github.com/apache/airflow/pull/8550#discussion_r496659768



##########
File path: airflow/providers/google/cloud/example_dags/example_dataflow_flex_template.py
##########
@@ -0,0 +1,63 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Example Airflow DAG for the Google Cloud Dataflow service
+"""
+import os
+
+from airflow import models
+from airflow.providers.google.cloud.operators.dataflow import DataflowStartFlexTemplateOperator
+from airflow.utils.dates import days_ago
+
+GCP_PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "example-project")
+
+DATAFLOW_FLEX_TEMPLATE_JOB_NAME = os.environ.get('DATAFLOW_FLEX_TEMPLATE_JOB_NAME', "dataflow-flex-template")
+
+# For simplicity we use the same topic name as the subscription name.
+PUBSUB_FLEX_TEMPLATE_TOPIC = os.environ.get('DATAFLOW_PUBSUB_FLEX_TEMPLATE_TOPIC', "dataflow-flex-template")
+PUBSUB_FLEX_TEMPLATE_SUBSCRIPTION = PUBSUB_FLEX_TEMPLATE_TOPIC
+GCS_FLEX_TEMPLATE_TEMPLATE_PATH = os.environ.get(
+    'DATAFLOW_GCS_FLEX_TEMPLATE_TEMPLATE_PATH',
+    "gs://test-airflow-dataflow-flex-template/samples/dataflow/templates/streaming-beam-sql.json"
+)
+BQ_FLEX_TEMPLATE_DATASET = os.environ.get('DATAFLOW_BQ_FLEX_TEMPLATE_DATASET', 'airflow_dataflow_samples')
+BQ_FLEX_TEMPLATE_LOCATION = os.environ.get('DATAFLOW_BQ_FLEX_TEMPLATE_LOCATION', 'us-west1')
+
+with models.DAG(
+    dag_id="example_gcp_dataflow_flex_template_java",
+    default_args={
+        "start_date": days_ago(1),
+    },
+    schedule_interval=None,  # Override to match your needs
+) as dag_flex_template:
+    start_flex_template = DataflowStartFlexTemplateOperator(
+        task_id="start_flex_template_java",

Review comment:
       I renamed it to `start_flex_template_streaming_beam_sql` here https://github.com/apache/airflow/pull/8550/commits/d788bab82047230829542e9eb8413d16b3565587




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] TobKed commented on a change in pull request #8550: Add DataflowStartFlexTemplateOperator

Posted by GitBox <gi...@apache.org>.
TobKed commented on a change in pull request #8550:
URL: https://github.com/apache/airflow/pull/8550#discussion_r502220046



##########
File path: airflow/providers/google/cloud/hooks/dataflow.py
##########
@@ -282,41 +295,70 @@ def wait_for_done(self) -> None:
             time.sleep(self._poll_sleep)
             self._refresh_jobs()
 
-    def get_jobs(self) -> List[Dict]:
+    def get_jobs(self, refresh: bool = False) -> List[Dict]:
         """
         Returns Dataflow jobs.
 
         :return: list of jobs
         :rtype: list
         """
-        if not self._jobs:
+        if not self._jobs or refresh:
             self._refresh_jobs()
         if not self._jobs:
             raise ValueError("Could not read _jobs")
 
         return self._jobs
 
+    def _wait_for_states(self, expected_states: Set[str]):
+        """
+        Waiting for the jobs to reach a certain state.
+        """
+        if not self._jobs:
+            raise ValueError("The _jobs should be set")
+        while True:

Review comment:
       @mik-laj what do you think about removing the waiting? I lean towards @turbaszek's opinion.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] jaketf commented on a change in pull request #8550: Add DataflowStartFlexTemplateOperator

Posted by GitBox <gi...@apache.org>.
jaketf commented on a change in pull request #8550:
URL: https://github.com/apache/airflow/pull/8550#discussion_r419001554



##########
File path: airflow/providers/google/cloud/hooks/dataflow.py
##########
@@ -583,6 +623,49 @@ def start_template_dataflow(
         jobs_controller.wait_for_done()
         return response["job"]
 
+    @GoogleBaseHook.fallback_to_default_project_id
+    def start_flex_template(
+        self,
+        body: Dict,
+        location: str,
+        project_id: str,
+        on_new_job_id_callback: Optional[Callable[[str], None]] = None
+    ):
+        """
+        Starts flex templates with the Dataflow pipeline.
+
+        :param body: The request body

Review comment:
       nit: use a more descriptive name, `launch_flex_template_parameters`.
   
   If we are not going to construct this body here and document the parameters, then this should include a link to the API spec for this model.
   
   Unfortunately, it looks like we mention this endpoint [here in the flex templates docs](https://cloud.google.com/dataflow/docs/guides/templates/using-flex-templates) but not in the [REST API docs](https://cloud.google.com/dataflow/docs/reference/rest).
   
   I've opened an internal docs bug on this.
   
   ~In the meantime, I'm pretty sure this body is just the same model as the "old" templates endpoint uses, [LaunchTemplateParameters](https://cloud.google.com/dataflow/docs/reference/rest/v1b3/LaunchTemplateParameters).
   @aaltay can you confirm?~
   
   Looks like LaunchTemplateParameters is missing the `container_spec_gcs_path` key, which is necessary for this endpoint.
   Why is this key snake_case in the API while all the others (e.g. jobName) are camelCase anyway?
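   
   For reference until the docs are fixed, a sketch of the body shape the launch endpoint accepts (placeholder values; the camelCase spelling of `containerSpecGcsPath` here is an assumption based on the endpoint's other camelCase keys, e.g. `jobName`):
   
   ```python
   body = {
       "launchParameter": {
           "jobName": "streaming-beam-sql",
           "containerSpecGcsPath": "gs://my-bucket/templates/streaming-beam-sql.json",
           # Template-specific pipeline options, passed through to the job.
           "parameters": {
               "inputSubscription": "my-subscription",
               "outputTable": "my-project:my_dataset.my_table",
           },
       }
   }
   ```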

##########
File path: tests/providers/google/cloud/hooks/test_dataflow.py
##########
@@ -984,15 +1034,17 @@ def test_dataflow_job_is_job_running_with_no_job(self):
         self.assertEqual(False, result)
 
     def test_dataflow_job_cancel_job(self):
-        job = {"id": TEST_JOB_ID, "name": TEST_JOB_NAME, "currentState": DataflowJobStatus.JOB_STATE_RUNNING}
-
         get_method = (
             self.mock_dataflow.projects.return_value.
             locations.return_value.
             jobs.return_value.
             get
         )
-        get_method.return_value.execute.return_value = job
+        get_method.return_value.execute.side_effect = [
+            {"id": TEST_JOB_ID, "name": TEST_JOB_NAME, "currentState": DataflowJobStatus.JOB_STATE_RUNNING},

Review comment:
       note from the other PR, #8553: see how `JobStatus.JOB_STATE` is a redundant stutter?
   Especially if we allow the user to control failed states, we should remove the stutter.

##########
File path: airflow/providers/google/cloud/operators/dataflow.py
##########
@@ -406,6 +406,71 @@ def on_kill(self) -> None:
             self.hook.cancel_job(job_id=self.job_id, project_id=self.project_id)
 
 
+class DataflowStartFlexTemplateOperator(BaseOperator):
+    """
+    Starts flex templates with the Dataflow pipeline.
+
+    :param body: The request body

Review comment:
       see comment on same param in hook.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] aaltay commented on a change in pull request #8550: Add DataflowStartFlexTemplateOperator

Posted by GitBox <gi...@apache.org>.
aaltay commented on a change in pull request #8550:
URL: https://github.com/apache/airflow/pull/8550#discussion_r416176147



##########
File path: tests/providers/google/cloud/operators/test_dataflow_system.py
##########
@@ -26,4 +37,198 @@
 class CloudDataflowExampleDagsSystemTest(GoogleSystemTest):
     @provide_gcp_context(GCP_DATAFLOW_KEY)
     def test_run_example_dag_function(self):
-        self.run_dag('example_gcp_dataflow', CLOUD_DAG_FOLDER)
+        self.run_dag("example_gcp_dataflow", CLOUD_DAG_FOLDER)
+
+
+GCP_PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "example-project")
+GCR_FLEX_TEMPLATE_IMAGE = f"gcr.io/{GCP_PROJECT_ID}/samples-dataflow-streaming-beam-sql:latest"
+
+# https://github.com/GoogleCloudPlatform/java-docs-samples/tree/954553c/dataflow/flex-templates/streaming_beam_sql
+GCS_TEMPLATE_PARTS = urlparse(GCS_FLEX_TEMPLATE_TEMPLATE_PATH)
+GCS_FLEX_TEMPLATE_BUCKET_NAME = GCS_TEMPLATE_PARTS.netloc
+
+
+EXAMPLE_FLEX_TEMPLATE_REPO = "GoogleCloudPlatform/java-docs-samples"
+EXAMPLE_FLEX_TEMPLATE_COMMIT = "deb0745be1d1ac1d133e1f0a7faa9413dbfbe5fe"
+EXAMPLE_FLEX_TEMPLATE_SUBDIR = "dataflow/flex-templates/streaming_beam_sql"
+
+
+@pytest.mark.backend("mysql", "postgres")
+@pytest.mark.credential_file(GCP_GCS_TRANSFER_KEY)
+class CloudDataflowExampleDagFlexTemplateJavaSystemTest(GoogleSystemTest):
+    @provide_gcp_context(GCP_GCS_TRANSFER_KEY, project_id=GoogleSystemTest._project_id())
+    def setUp(self) -> None:
+        # Create a Cloud Storage bucket
+        self.execute_cmd(["gsutil", "mb", f"gs://{GCS_FLEX_TEMPLATE_BUCKET_NAME}"])
+
+        # Build image with pipeline
+        with NamedTemporaryFile() as f:
+            f.write(
+                textwrap.dedent(
+                    """\
+                    steps:

Review comment:
       Ack. Does it require this many containers in the image?
   
   I am not very familiar with this. Is this a VM image with containers?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] mik-laj commented on a change in pull request #8550: Add DataflowStartFlexTemplateOperator

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #8550:
URL: https://github.com/apache/airflow/pull/8550#discussion_r416185412



##########
File path: airflow/providers/google/cloud/operators/dataflow.py
##########
@@ -406,6 +406,71 @@ def on_kill(self) -> None:
             self.hook.cancel_job(job_id=self.job_id, project_id=self.project_id)
 
 
+class DataflowStartFlexTemplateOperator(BaseOperator):
+    """
+    Starts flex templates with the Dataflow pipeline.
+
+    :param body: The request body
+    :param location: The location of the Dataflow job (for example europe-west1)
+    :type location: str
+    :param project_id: The ID of the GCP project that owns the job.
+        If set to ``None`` or missing, the default project_id from the GCP connection is used.
+    :type project_id: Optional[str]
+    :param gcp_conn_id: The connection ID to use connecting to Google Cloud
+        Platform.
+    :type gcp_conn_id: str
+    :param delegate_to: The account to impersonate, if any.
+        For this to work, the service account making the request must have
+        domain-wide delegation enabled.
+    :type delegate_to: str
+    """
+
+    template_fields = ["body", "location", "project_id", "gcp_conn_id"]
+
+    @apply_defaults
+    def __init__(
+        self,
+        body: Dict,
+        location: str,
+        project_id: Optional[str] = None,
+        gcp_conn_id: str = 'google_cloud_default',
+        delegate_to: Optional[str] = None,
+        *args,
+        **kwargs
+    ) -> None:
+        super().__init__(*args, **kwargs)
+        self.body = body
+        self.location = location
+        self.project_id = project_id
+        self.gcp_conn_id = gcp_conn_id
+        self.delegate_to = delegate_to
+        self.job_id = None
+        self.hook: Optional[DataflowHook] = None
+
+    def execute(self, context):
+        self.hook = DataflowHook(
+            gcp_conn_id=self.gcp_conn_id,
+            delegate_to=self.delegate_to,
+        )
+
+        def set_current_job_id(job_id):
+            self.job_id = job_id
+
+        job = self.hook.start_flex_template(
+            body=self.body,
+            location=self.location,
+            project_id=self.project_id,
+            on_new_job_id_callback=set_current_job_id,
+        )
+
+        return job
+
+    def on_kill(self) -> None:
+        self.log.info("On kill.")
+        if self.job_id:
+            self.hook.cancel_job(job_id=self.job_id, project_id=self.project_id)

Review comment:
       Good point. I will change it.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org