Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/11/18 12:37:03 UTC

[GitHub] [airflow] MrGeorgeOwl opened a new pull request, #27776: Add deferrable mode to dataflow operators

MrGeorgeOwl opened a new pull request, #27776:
URL: https://github.com/apache/airflow/pull/27776

   …StartFlexTemplateOperator operators
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] kaxil commented on a diff in pull request #27776: Add deferrable mode to dataflow operators

Posted by GitBox <gi...@apache.org>.
kaxil commented on code in PR #27776:
URL: https://github.com/apache/airflow/pull/27776#discussion_r1054897101


##########
airflow/providers/google/cloud/hooks/dataflow.py:
##########
@@ -719,6 +691,40 @@ def start_template_dataflow(
         jobs_controller.wait_for_done()
         return response["job"]
 
+    def _update_environment(self, environment: dict | None, variables: dict) -> dict:

Review Comment:
   If `environment` accepts `None`, why not add a default of `None`?
   
   ```suggestion
       def _update_environment(self, variables: dict, environment: dict | None = None) -> dict:
   ```
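A small sketch of why the suggestion also reorders the parameters: Python rejects a parameter without a default after one with a default, so `variables` has to move to the front once `environment` gets `None`. `HookSketch` and `old_order_is_invalid` are hypothetical names, and the method body is a deliberately simplified merge, not the PR's implementation.

```python
from __future__ import annotations


class HookSketch:
    """Hypothetical stand-in for DataflowHook, used only to show the signature."""

    # Defaulted parameters must follow required ones, so `variables` comes first.
    def _update_environment(self, variables: dict, environment: dict | None = None) -> dict:
        # Copy so the caller's dict is not mutated; an omitted argument becomes {}.
        environment = dict(environment or {})
        environment.update(variables)
        return environment


def old_order_is_invalid() -> bool:
    """Check that a defaulted parameter cannot precede a required one."""
    try:
        compile("def f(self, environment=None, variables): pass", "<sketch>", "exec")
    except SyntaxError:
        return True
    return False
```

Copying the input is a choice made for the sketch; the diff under review uses `environment = environment or {}`, which updates a non-empty caller dict in place.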




[GitHub] [airflow] kaxil commented on a diff in pull request #27776: Add deferrable mode to dataflow operators

Posted by GitBox <gi...@apache.org>.
kaxil commented on code in PR #27776:
URL: https://github.com/apache/airflow/pull/27776#discussion_r1054898229


##########
airflow/providers/google/cloud/operators/dataflow.py:
##########
@@ -621,9 +624,11 @@ def __init__(
         cancel_timeout: int | None = 10 * 60,
         wait_until_finished: bool | None = None,
         append_job_name: bool = True,
+        deferrable: bool = False,

Review Comment:
   Can you add this to the docstring, please?
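One possible shape for the requested docstring entry, using the Sphinx `:param:` field style. The class name is hypothetical and the wording is illustrative only, not the text that was merged.

```python
from __future__ import annotations


class TemplatedOperatorSketch:
    """
    Hypothetical, trimmed-down stand-in for the templated Dataflow operator.

    :param deferrable: Run the operator in deferrable mode: after the job is
        submitted, the worker slot is released and a trigger polls the job status.
    """

    def __init__(self, deferrable: bool = False) -> None:
        # Default False keeps the existing blocking behaviour for current DAGs.
        self.deferrable = deferrable
```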




[GitHub] [airflow] MrGeorgeOwl commented on pull request #27776: Add deferrable mode to dataflow operators

Posted by GitBox <gi...@apache.org>.
MrGeorgeOwl commented on PR #27776:
URL: https://github.com/apache/airflow/pull/27776#issuecomment-1385629063

   @brucearctor, thank you for your offer of help; I appreciate it. I have finished the required changes, so the PR can be merged once it has been reviewed and approved.



[GitHub] [airflow] uranusjr commented on a diff in pull request #27776: Add deferrable mode to dataflow operators

Posted by GitBox <gi...@apache.org>.
uranusjr commented on code in PR #27776:
URL: https://github.com/apache/airflow/pull/27776#discussion_r1080874876


##########
airflow/providers/google/cloud/hooks/dataflow.py:
##########
@@ -719,6 +698,40 @@ def start_template_dataflow(
         jobs_controller.wait_for_done()
         return response["job"]
 
+    def _update_environment(self, variables: dict, environment: dict | None = None) -> dict:
+        environment = environment or {}
+        # available keys for runtime environment are listed here:
+        # https://cloud.google.com/dataflow/docs/reference/rest/v1b3/RuntimeEnvironment
+        environment_keys = [
+            "numWorkers",
+            "maxWorkers",
+            "zone",
+            "serviceAccountEmail",
+            "tempLocation",
+            "bypassTempDirValidation",
+            "machineType",
+            "additionalExperiments",
+            "network",
+            "subnetwork",
+            "additionalUserLabels",
+            "kmsKeyName",
+            "ipConfiguration",
+            "workerRegion",
+            "workerZone",
+        ]
+
+        for key in variables:
+            if key in environment_keys:
+                if key in environment:
+                    self.log.warning(
+                        "'%s' parameter in 'variables' will override of "
+                        "the same one passed in 'environment'!",
+                        key,
+                    )
+                environment.update({key: variables[key]})

Review Comment:
   ```suggestion
           def _check_one(key, val):
               if key in environment:
                   self.log.warning(
                       "%r parameter in 'variables' will override of "
                       "the same one passed in 'environment'!",
                       key,
                   )
               return key, val
   
           environment.update(
               _check_one(key, val)
               for key, val in variables.items()
               if key in environment_keys
           )
   ```
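A standalone, runnable version of the suggested pattern, written as a module-level function so it can be tried outside the hook. The key set is abbreviated here (the PR lists the full `RuntimeEnvironment` keys), and the logger name is an assumption for the sketch. The warning text uses the later wording without the stray "of". `dict.update` accepts any iterable of `(key, value)` pairs, which is what lets a generator replace the explicit loop.

```python
from __future__ import annotations

import logging

log = logging.getLogger("dataflow_sketch")

# Abbreviated subset of RuntimeEnvironment keys, for illustration only.
ENVIRONMENT_KEYS = {"numWorkers", "maxWorkers", "zone", "tempLocation", "machineType"}


def update_environment(variables: dict, environment: dict | None = None) -> dict:
    environment = environment or {}

    def _check_one(key, val):
        # Warn when a key from `variables` is about to replace one in `environment`.
        if key in environment:
            log.warning(
                "%r parameter in 'variables' will override the same one passed in 'environment'!",
                key,
            )
        return key, val

    # Only keys that belong to the runtime environment are copied over.
    environment.update(
        _check_one(key, val) for key, val in variables.items() if key in ENVIRONMENT_KEYS
    )
    return environment
```

Using a set for the keys (as a later revision of the diff does) makes each membership check O(1) instead of scanning a list.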




[GitHub] [airflow] potiuk merged pull request #27776: Add deferrable mode to dataflow operators

Posted by "potiuk (via GitHub)" <gi...@apache.org>.
potiuk merged PR #27776:
URL: https://github.com/apache/airflow/pull/27776



[GitHub] [airflow] MrGeorgeOwl commented on pull request #27776: Add deferrable mode to dataflow operators

Posted by GitBox <gi...@apache.org>.
MrGeorgeOwl commented on PR #27776:
URL: https://github.com/apache/airflow/pull/27776#issuecomment-1357370465

   @potiuk, the tests failing at the moment are the ones that come with changes from the main branch. I see the same failures in several PRs, so I think the changes in this PR are okay, but I might be wrong.



[GitHub] [airflow] uranusjr commented on a diff in pull request #27776: Add deferrable mode to dataflow operators

Posted by "uranusjr (via GitHub)" <gi...@apache.org>.
uranusjr commented on code in PR #27776:
URL: https://github.com/apache/airflow/pull/27776#discussion_r1090347021


##########
airflow/providers/google/provider.yaml:
##########
@@ -85,6 +85,7 @@ dependencies:
   - google-cloud-build>=3.0.0
   - google-cloud-compute>=0.1.0,<2.0.0
   - google-cloud-container>=2.2.0,<3.0.0
+  - google-cloud-dataflow-client>=0.5.2,<0.5.5

Review Comment:
   Typical Google. Thanks for the clarification.




[GitHub] [airflow] potiuk commented on pull request #27776: Add deferrable mode to dataflow operators

Posted by GitBox <gi...@apache.org>.
potiuk commented on PR #27776:
URL: https://github.com/apache/airflow/pull/27776#issuecomment-1336255942

   Tests failing.



[GitHub] [airflow] MrGeorgeOwl commented on a diff in pull request #27776: Add deferrable mode to dataflow operators

Posted by "MrGeorgeOwl (via GitHub)" <gi...@apache.org>.
MrGeorgeOwl commented on code in PR #27776:
URL: https://github.com/apache/airflow/pull/27776#discussion_r1090334794


##########
airflow/providers/google/provider.yaml:
##########
@@ -85,6 +85,7 @@ dependencies:
   - google-cloud-build>=3.0.0
   - google-cloud-compute>=0.1.0,<2.0.0
   - google-cloud-container>=2.2.0,<3.0.0
+  - google-cloud-dataflow-client>=0.5.2,<0.5.5

Review Comment:
   Version 0.5.5 requires newer versions of the protobuf and proto-plus libraries, which can break other dependencies in the Google provider package.




[GitHub] [airflow] potiuk commented on a diff in pull request #27776: Add deferrable mode to dataflow operators

Posted by "potiuk (via GitHub)" <gi...@apache.org>.
potiuk commented on code in PR #27776:
URL: https://github.com/apache/airflow/pull/27776#discussion_r1090600031


##########
airflow/providers/google/provider.yaml:
##########
@@ -85,6 +85,7 @@ dependencies:
   - google-cloud-build>=3.0.0
   - google-cloud-compute>=0.1.0,<2.0.0
   - google-cloud-container>=2.2.0,<3.0.0
+  - google-cloud-dataflow-client>=0.5.2,<0.5.5

Review Comment:
   Yep. A comment in provider.yaml is exactly what I meant.




[GitHub] [airflow] kaxil commented on a diff in pull request #27776: Add deferrable mode to dataflow operators

Posted by GitBox <gi...@apache.org>.
kaxil commented on code in PR #27776:
URL: https://github.com/apache/airflow/pull/27776#discussion_r1054900437


##########
airflow/providers/google/cloud/operators/dataflow.py:
##########
@@ -669,18 +681,63 @@ def set_current_job(current_job):
             environment=self.environment,
             append_job_name=self.append_job_name,
         )
+        job_id = self.job.get("id")
 
-        return job
+        if job_id is None:
+            raise AirflowException(
+                "While reading job object after template execution error occurred. Job object has no id."
+            )
+
+        if not self.deferrable:
+            return job_id
+
+        context["ti"].xcom_push(key="job_id", value=job_id)
+
+        self.defer(
+            trigger=TemplateJobStartTrigger(
+                project_id=self.project_id,
+                job_id=job_id,
+                location=self.location,
+                gcp_conn_id=self.gcp_conn_id,
+                delegate_to=self.delegate_to,
+                poll_sleep=self.poll_sleep,
+                impersonation_chain=self.impersonation_chain,
+                cancel_timeout=self.cancel_timeout,
+            ),
+            method_name="execute_complete",
+        )
+
+    def execute_complete(self, context: Context, event: dict[str, Any]):
+        """Method which executes after trigger finishes its work."""
+        if event["status"] == "error" or event["status"] == "stopped":
+            self.log.info("status: %s, msg: %s", event["status"], event["message"])
+            raise AirflowException(event["message"])
+
+        self.log.info("Task %s completed with response %s", self.task_id, event["message"])
 
     def on_kill(self) -> None:
         self.log.info("On kill.")
-        if self.job:
-            self.hook.cancel_job(
+        if self.job is not None:
+            self.log.info("Cancel job %s", self.job_name)

Review Comment:
   ```suggestion
               self.log.info("Cancelling job %s", self.job_name)
   ```
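The branching in `execute_complete` above can be exercised without Airflow. This sketch assumes the trigger yields an event dict with `status` and `message` keys, as the diff reads them; the exception class and function name are hypothetical stand-ins, and `"success"` is only an assumed example of a non-failing status.

```python
from __future__ import annotations

from typing import Any


class AirflowExceptionSketch(Exception):
    """Hypothetical stand-in for airflow.exceptions.AirflowException."""


# Statuses the diff treats as failures when the trigger fires.
FAILED_STATUSES = {"error", "stopped"}


def handle_trigger_event(task_id: str, event: dict[str, Any]) -> str:
    """Mirror execute_complete: raise on a failed event, otherwise report success."""
    if event["status"] in FAILED_STATUSES:
        raise AirflowExceptionSketch(event["message"])
    return f"Task {task_id} completed with response {event['message']}"
```

Testing membership in a set is equivalent to the diff's `event["status"] == "error" or event["status"] == "stopped"` and keeps the failure statuses in one place.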



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] MrGeorgeOwl commented on a diff in pull request #27776: Add deferrable mode to dataflow operators

Posted by GitBox <gi...@apache.org>.
MrGeorgeOwl commented on code in PR #27776:
URL: https://github.com/apache/airflow/pull/27776#discussion_r1029407915


##########
airflow/providers/google/cloud/example_dags/example_dataflow.py:
##########
@@ -52,6 +52,7 @@
 GCS_OUTPUT = os.environ.get("GCP_DATAFLOW_GCS_OUTPUT", "gs://INVALID BUCKET NAME/output")

Review Comment:
   The system tests were migrated, but at the moment I couldn't make the required changes, even after rebasing the branch.




[GitHub] [airflow] uranusjr commented on a diff in pull request #27776: Add deferrable mode to dataflow operators

Posted by "uranusjr (via GitHub)" <gi...@apache.org>.
uranusjr commented on code in PR #27776:
URL: https://github.com/apache/airflow/pull/27776#discussion_r1090331638


##########
airflow/providers/google/cloud/hooks/dataflow.py:
##########
@@ -723,6 +702,40 @@ def start_template_dataflow(
         jobs_controller.wait_for_done()
         return response["job"]
 
+    def _update_environment(self, variables: dict, environment: dict | None = None) -> dict:
+        environment = environment or {}
+        # available keys for runtime environment are listed here:
+        # https://cloud.google.com/dataflow/docs/reference/rest/v1b3/RuntimeEnvironment
+        environment_keys = {
+            "numWorkers",
+            "maxWorkers",
+            "zone",
+            "serviceAccountEmail",
+            "tempLocation",
+            "bypassTempDirValidation",
+            "machineType",
+            "additionalExperiments",
+            "network",
+            "subnetwork",
+            "additionalUserLabels",
+            "kmsKeyName",
+            "ipConfiguration",
+            "workerRegion",
+            "workerZone",
+        }
+
+        def _check_one(key, val):
+            if key in environment:
+                self.log.warning(
+                    "%r parameter in 'variables' will override of " "the same one passed in 'environment'!",

Review Comment:
   ```suggestion
                       "%r parameter in 'variables' will override the same one passed in 'environment'!",
   ```
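The odd `" "` in the original line comes from Python's implicit concatenation of adjacent string literals, typically left behind when a long line is wrapped and later re-joined by a formatter. The two forms below are compared only to show that the split version is a single string, and that the suggestion also drops the stray "of".

```python
# Adjacent string literals are joined at compile time into one format string.
split_form = "%r parameter in 'variables' will override of " "the same one passed in 'environment'!"

# The suggested single literal, with the grammar fixed.
single_form = "%r parameter in 'variables' will override the same one passed in 'environment'!"
```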




[GitHub] [airflow] potiuk commented on a diff in pull request #27776: Add deferrable mode to dataflow operators

Posted by "potiuk (via GitHub)" <gi...@apache.org>.
potiuk commented on code in PR #27776:
URL: https://github.com/apache/airflow/pull/27776#discussion_r1090486887


##########
airflow/providers/google/provider.yaml:
##########
@@ -85,6 +85,7 @@ dependencies:
   - google-cloud-build>=3.0.0
   - google-cloud-compute>=0.1.0,<2.0.0
   - google-cloud-container>=2.2.0,<3.0.0
+  - google-cloud-dataflow-client>=0.5.2,<0.5.5

Review Comment:
   Can you please add a comment there explaining why? I know it's not there for the others, but we have generally adopted the approach that whenever we upper-bound a dependency, we SHOULD add a comment specifying why the upper bound is there and what the condition is for removing it.
   
   https://github.com/apache/airflow#approach-to-dependencies-of-airflow :
   
   > Whenever we upper-bound such a dependency, we should always comment why we are doing it - i.e. we should have a good reason why dependency is upper-bound. And we should also mention what is the condition to remove the binding.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] bhirsz commented on a diff in pull request #27776: Add deferrable mode to dataflow operators

Posted by GitBox <gi...@apache.org>.
bhirsz commented on code in PR #27776:
URL: https://github.com/apache/airflow/pull/27776#discussion_r1026399790


##########
airflow/providers/google/cloud/example_dags/example_dataflow.py:
##########
@@ -52,6 +52,7 @@
 GCS_OUTPUT = os.environ.get("GCP_DATAFLOW_GCS_OUTPUT", "gs://INVALID BUCKET NAME/output")

Review Comment:
   Those files (in google/cloud/example_dags/) are meant to be migrated to the system tests and then removed. Can you verify if dataflow tests are already migrated and if so modify the migrated file and remove those ones?




[GitHub] [airflow] brucearctor commented on pull request #27776: Add deferrable mode to dataflow operators

Posted by GitBox <gi...@apache.org>.
brucearctor commented on PR #27776:
URL: https://github.com/apache/airflow/pull/27776#issuecomment-1382043117

   @MrGeorgeOwl -- this is great and we are eagerly awaiting this functionality to save us some headaches!  Do you want some help getting it over the finish line?  Looks like the outstanding reviews are pretty straightforward :-)
   
   Please advise.  



[GitHub] [airflow] uranusjr commented on a diff in pull request #27776: Add deferrable mode to dataflow operators

Posted by GitBox <gi...@apache.org>.
uranusjr commented on code in PR #27776:
URL: https://github.com/apache/airflow/pull/27776#discussion_r1080875455


##########
airflow/providers/google/cloud/hooks/dataflow.py:
##########
@@ -1037,7 +1050,7 @@ def start_sql_job(
     def get_job(
         self,
         job_id: str,
-        project_id: str,
+        project_id: str = PROVIDE_PROJECT_ID,

Review Comment:
   Is this related to other changes or just a drive-by improvement?




[GitHub] [airflow] MrGeorgeOwl commented on a diff in pull request #27776: Add deferrable mode to dataflow operators

Posted by "MrGeorgeOwl (via GitHub)" <gi...@apache.org>.
MrGeorgeOwl commented on code in PR #27776:
URL: https://github.com/apache/airflow/pull/27776#discussion_r1090556702


##########
airflow/providers/google/provider.yaml:
##########
@@ -85,6 +85,7 @@ dependencies:
   - google-cloud-build>=3.0.0
   - google-cloud-compute>=0.1.0,<2.0.0
   - google-cloud-container>=2.2.0,<3.0.0
+  - google-cloud-dataflow-client>=0.5.2,<0.5.5

Review Comment:
   @potiuk, will a comment in provider.yaml be enough, or does the project have another place for upper-bound comments?




[GitHub] [airflow] kaxil commented on a diff in pull request #27776: Add deferrable mode to dataflow operators

Posted by GitBox <gi...@apache.org>.
kaxil commented on code in PR #27776:
URL: https://github.com/apache/airflow/pull/27776#discussion_r1054901004


##########
airflow/providers/google/cloud/operators/dataflow.py:
##########
@@ -669,18 +681,63 @@ def set_current_job(current_job):
             environment=self.environment,
             append_job_name=self.append_job_name,
         )
+        job_id = self.job.get("id")
 
-        return job
+        if job_id is None:
+            raise AirflowException(
+                "While reading job object after template execution error occurred. Job object has no id."
+            )
+
+        if not self.deferrable:
+            return job_id
+
+        context["ti"].xcom_push(key="job_id", value=job_id)
+
+        self.defer(
+            trigger=TemplateJobStartTrigger(
+                project_id=self.project_id,
+                job_id=job_id,
+                location=self.location,
+                gcp_conn_id=self.gcp_conn_id,
+                delegate_to=self.delegate_to,
+                poll_sleep=self.poll_sleep,
+                impersonation_chain=self.impersonation_chain,
+                cancel_timeout=self.cancel_timeout,
+            ),
+            method_name="execute_complete",
+        )
+
+    def execute_complete(self, context: Context, event: dict[str, Any]):
+        """Method which executes after trigger finishes its work."""
+        if event["status"] == "error" or event["status"] == "stopped":
+            self.log.info("status: %s, msg: %s", event["status"], event["message"])
+            raise AirflowException(event["message"])
+
+        self.log.info("Task %s completed with response %s", self.task_id, event["message"])
 
     def on_kill(self) -> None:
         self.log.info("On kill.")
-        if self.job:
-            self.hook.cancel_job(
+        if self.job is not None:
+            self.log.info("Cancel job %s", self.job_name)
+            self._get_hook().cancel_job(
+                job_name=self.job_name,
                 job_id=self.job.get("id"),
                 project_id=self.job.get("projectId"),
                 location=self.job.get("location"),
             )
 
+    def _get_hook(self) -> DataflowHook:

Review Comment:
   Instead, you could just make `hook` a `@cached_property`.
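A minimal sketch of the `@cached_property` idea: the hook is built on first access and reused afterwards, which removes the need for a separate `_get_hook()` helper, and keeping the attribute name `hook` means subclasses can still read `self.hook`. Both classes are hypothetical stand-ins with only two of the real constructor arguments.

```python
from __future__ import annotations

from functools import cached_property


class DataflowHookSketch:
    """Hypothetical stand-in for DataflowHook; counts how often it is built."""

    instances = 0

    def __init__(self, gcp_conn_id: str, poll_sleep: int) -> None:
        DataflowHookSketch.instances += 1
        self.gcp_conn_id = gcp_conn_id
        self.poll_sleep = poll_sleep


class OperatorSketch:
    def __init__(self, gcp_conn_id: str = "google_cloud_default", poll_sleep: int = 10) -> None:
        self.gcp_conn_id = gcp_conn_id
        self.poll_sleep = poll_sleep

    @cached_property
    def hook(self) -> DataflowHookSketch:
        # Created lazily on first access (e.g. in execute() or on_kill()),
        # then stored on the instance and returned on every later access.
        return DataflowHookSketch(gcp_conn_id=self.gcp_conn_id, poll_sleep=self.poll_sleep)
```

The cache is per operator instance, so each new instance builds its own hook on first use.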
   
   




[GitHub] [airflow] kaxil commented on a diff in pull request #27776: Add deferrable mode to dataflow operators

Posted by GitBox <gi...@apache.org>.
kaxil commented on code in PR #27776:
URL: https://github.com/apache/airflow/pull/27776#discussion_r1054898823


##########
airflow/providers/google/cloud/operators/dataflow.py:
##########
@@ -634,31 +639,38 @@ def __init__(
         self.gcp_conn_id = gcp_conn_id
         self.delegate_to = delegate_to
         self.poll_sleep = poll_sleep
-        self.job = None
-        self.hook: DataflowHook | None = None
         self.impersonation_chain = impersonation_chain
         self.environment = environment
         self.cancel_timeout = cancel_timeout
         self.wait_until_finished = wait_until_finished
         self.append_job_name = append_job_name
+        self.deferrable = deferrable
 
-    def execute(self, context: Context) -> dict:
-        self.hook = DataflowHook(
-            gcp_conn_id=self.gcp_conn_id,
-            delegate_to=self.delegate_to,
-            poll_sleep=self.poll_sleep,
-            impersonation_chain=self.impersonation_chain,
-            cancel_timeout=self.cancel_timeout,
-            wait_until_finished=self.wait_until_finished,
-        )
+        self.job: dict | None = None
+        self._hook: DataflowHook | None = None

Review Comment:
   If users are inheriting from this operator and doing things with `self.hook`, this will break their code. Please change it to `self.hook`.




[GitHub] [airflow] uranusjr commented on a diff in pull request #27776: Add deferrable mode to dataflow operators

Posted by "uranusjr (via GitHub)" <gi...@apache.org>.
uranusjr commented on code in PR #27776:
URL: https://github.com/apache/airflow/pull/27776#discussion_r1090333370


##########
airflow/providers/google/provider.yaml:
##########
@@ -85,6 +85,7 @@ dependencies:
   - google-cloud-build>=3.0.0
   - google-cloud-compute>=0.1.0,<2.0.0
   - google-cloud-container>=2.2.0,<3.0.0
+  - google-cloud-dataflow-client>=0.5.2,<0.5.5

Review Comment:
   Any reason this version range is so specific? What happens in 0.5.5?




[GitHub] [airflow] brucearctor commented on pull request #27776: Add deferrable mode to dataflow operators

Posted by GitBox <gi...@apache.org>.
brucearctor commented on PR #27776:
URL: https://github.com/apache/airflow/pull/27776#issuecomment-1385689773

   Awesome - thanks!



[GitHub] [airflow] MrGeorgeOwl commented on pull request #27776: Add deferrable mode to dataflow operators

Posted by GitBox <gi...@apache.org>.
MrGeorgeOwl commented on PR #27776:
URL: https://github.com/apache/airflow/pull/27776#issuecomment-1387282514

   Sorry to everyone who was added to this PR as an approver. I am not sure why that happened after rebasing onto the main branch.



[GitHub] [airflow] MrGeorgeOwl commented on a diff in pull request #27776: Add deferrable mode to dataflow operators

Posted by "MrGeorgeOwl (via GitHub)" <gi...@apache.org>.
MrGeorgeOwl commented on code in PR #27776:
URL: https://github.com/apache/airflow/pull/27776#discussion_r1083733347


##########
airflow/providers/google/cloud/hooks/dataflow.py:
##########
@@ -1037,7 +1050,7 @@ def start_sql_job(
     def get_job(
         self,
         job_id: str,
-        project_id: str,
+        project_id: str = PROVIDE_PROJECT_ID,

Review Comment:
   Drive-by improvement
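A simplified sketch of why a `project_id: str = PROVIDE_PROJECT_ID` default is useful. To our understanding, the Google provider defines `PROVIDE_PROJECT_ID` as a `None` value typed as `str`, and a decorator on hook methods fills in the connection's default project when the caller omits it. The decorator and classes below are hypothetical re-creations of that idea, not the provider's code.

```python
from __future__ import annotations

import functools
from typing import Any, Callable, cast

# Typed as `str` for type checkers, but None at runtime (assumed to mirror the provider).
PROVIDE_PROJECT_ID: str = cast(str, None)


def fallback_to_default_project_id(func: Callable[..., Any]) -> Callable[..., Any]:
    """Hypothetical decorator: use the hook's project when project_id is omitted."""

    @functools.wraps(func)
    def wrapper(self: Any, *args: Any, **kwargs: Any) -> Any:
        # Keyword-only calls keep the project_id lookup unambiguous.
        if args:
            raise TypeError("this sketch only accepts keyword arguments")
        if kwargs.get("project_id") is None:
            kwargs["project_id"] = self.project_id
        return func(self, **kwargs)

    return wrapper


class HookSketch:
    def __init__(self, project_id: str) -> None:
        self.project_id = project_id  # default project, e.g. from the connection

    @fallback_to_default_project_id
    def get_job(self, job_id: str, project_id: str = PROVIDE_PROJECT_ID) -> dict:
        return {"id": job_id, "projectId": project_id}
```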




[GitHub] [airflow] kaxil commented on a diff in pull request #27776: Add deferrable mode to dataflow operators

Posted by GitBox <gi...@apache.org>.
kaxil commented on code in PR #27776:
URL: https://github.com/apache/airflow/pull/27776#discussion_r1054902161


##########
airflow/providers/google/provider.yaml:
##########
@@ -83,6 +83,7 @@ dependencies:
   - google-cloud-build>=3.0.0
   - google-cloud-compute>=0.1.0,<2.0.0
   - google-cloud-container>=2.2.0,<3.0.0
+  - google-cloud-dataflow-client==0.5.4

Review Comment:
   Can you please not pin it to a specific version, and instead provide a range or, ideally, just a minimum version?
   
   Airflow is a library as well as an app, so we want to keep dependencies as open as possible.
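To make the pin-versus-range point concrete, this sketch checks which releases satisfy `==0.5.4` and the range `>=0.5.2,<0.5.5` used in later revisions of this PR. The checker is a hypothetical, deliberately simplified helper for purely numeric dotted versions; real tooling such as the `packaging` library implements the full PEP 440 rules (pre-releases, epochs, `0.5` vs `0.5.0`, and so on).

```python
from __future__ import annotations

import operator

# Two-character operators are listed first so ">=" is not mistaken for ">".
_OPS = {
    "==": operator.eq,
    ">=": operator.ge,
    "<=": operator.le,
    ">": operator.gt,
    "<": operator.lt,
}


def _parse(version: str) -> tuple[int, ...]:
    # "0.5.4" -> (0, 5, 4); tuples compare element by element, numerically.
    return tuple(int(part) for part in version.strip().split("."))


def satisfies(version: str, spec: str) -> bool:
    """Return True if `version` meets every comma-separated clause in `spec`."""
    for clause in spec.split(","):
        clause = clause.strip()
        for op, compare in _OPS.items():
            if clause.startswith(op):
                if not compare(_parse(version), _parse(clause[len(op):])):
                    return False
                break
        else:
            raise ValueError(f"unsupported clause: {clause!r}")
    return True
```

With the pin, only one release is installable, so any other package requiring a different version causes a conflict; the range leaves resolvers room to pick any compatible release below the known-problematic 0.5.5.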



##########
generated/provider_dependencies.json:
##########
@@ -319,6 +319,7 @@
       "google-cloud-compute>=0.1.0,<2.0.0",
       "google-cloud-container>=2.2.0,<3.0.0",
       "google-cloud-datacatalog>=3.0.0",
+      "google-cloud-dataflow-client==0.5.4",

Review Comment:
   same as https://github.com/apache/airflow/pull/27776/files#r1054902161




[GitHub] [airflow] potiuk commented on pull request #27776: Add deferrable mode to dataflow operators

Posted by GitBox <gi...@apache.org>.
potiuk commented on PR #27776:
URL: https://github.com/apache/airflow/pull/27776#issuecomment-1357504953

   I think there were intermittent/flaky tests; I re-ran them to be sure. 




[GitHub] [airflow] kaxil commented on a diff in pull request #27776: Add deferrable mode to dataflow operators

Posted by GitBox <gi...@apache.org>.
kaxil commented on code in PR #27776:
URL: https://github.com/apache/airflow/pull/27776#discussion_r1054901185


##########
airflow/providers/google/cloud/operators/dataflow.py:
##########
@@ -755,13 +811,14 @@ def __init__(
         self,
         body: dict,
         location: str,
-        project_id: str | None = None,
+        project_id: str,

Review Comment:
   Why make this a required argument?
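   For context on the question: Google provider operators commonly accept `project_id=None` and let the hook fall back to the project configured on the Google Cloud connection. The sketch below illustrates that fallback pattern in isolation; `resolve_project_id` and `default_project_id` are hypothetical names, not the provider's API.

```python
from __future__ import annotations


def resolve_project_id(project_id: str | None, default_project_id: str | None) -> str:
    """Prefer an explicit project_id, else fall back to the connection's default.

    Hypothetical helper illustrating why project_id can stay optional:
    callers only need to pass it when the connection has no default project.
    """
    resolved = project_id or default_project_id
    if not resolved:
        raise ValueError(
            "project_id was not passed and the connection defines no default project"
        )
    return resolved


# An explicit argument wins over the connection default.
print(resolve_project_id("explicit-project", "conn-project"))  # explicit-project
# An omitted argument falls back to the connection default.
print(resolve_project_id(None, "conn-project"))  # conn-project
```

   Keeping the parameter optional preserves backwards compatibility for existing DAGs that rely on the connection's default project.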





[GitHub] [airflow] uranusjr commented on a diff in pull request #27776: Add deferrable mode to dataflow operators

Posted by GitBox <gi...@apache.org>.
uranusjr commented on code in PR #27776:
URL: https://github.com/apache/airflow/pull/27776#discussion_r1080875713


##########
airflow/providers/google/cloud/links/dataflow.py:
##########
@@ -48,5 +48,5 @@ def persist(
         operator_instance.xcom_push(
             context,
             key=DataflowJobLink.key,
-            value={"project_id": project_id, "location": region, "job_id": job_id},
+            value={"project_id": project_id, "region": region, "job_id": job_id},

Review Comment:
   Same question





[GitHub] [airflow] uranusjr commented on a diff in pull request #27776: Add deferrable mode to dataflow operators

Posted by GitBox <gi...@apache.org>.
uranusjr commented on code in PR #27776:
URL: https://github.com/apache/airflow/pull/27776#discussion_r1080876447


##########
airflow/providers/google/cloud/hooks/dataflow.py:
##########
@@ -719,6 +698,40 @@ def start_template_dataflow(
         jobs_controller.wait_for_done()
         return response["job"]
 
+    def _update_environment(self, variables: dict, environment: dict | None = None) -> dict:
+        environment = environment or {}
+        # available keys for runtime environment are listed here:
+        # https://cloud.google.com/dataflow/docs/reference/rest/v1b3/RuntimeEnvironment
+        environment_keys = [
+            "numWorkers",
+            "maxWorkers",
+            "zone",
+            "serviceAccountEmail",
+            "tempLocation",
+            "bypassTempDirValidation",
+            "machineType",
+            "additionalExperiments",
+            "network",
+            "subnetwork",
+            "additionalUserLabels",
+            "kmsKeyName",
+            "ipConfiguration",
+            "workerRegion",
+            "workerZone",
+        ]

Review Comment:
   ```suggestion
           environment_keys = {
               "numWorkers",
               "maxWorkers",
               "zone",
               "serviceAccountEmail",
               "tempLocation",
               "bypassTempDirValidation",
               "machineType",
               "additionalExperiments",
               "network",
               "subnetwork",
               "additionalUserLabels",
               "kmsKeyName",
               "ipConfiguration",
               "workerRegion",
               "workerZone",
           }
   ```
   
   Faster for `in` checks.
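   The suggestion only swaps the list literal for a set literal: a set membership test is an average O(1) hash lookup, whereas `in` on a list compares elements one by one. Below is a minimal sketch of how such a key set might be used to copy runtime-environment options out of a template's variables. The diff does not show the rest of `_update_environment`, so the filtering logic, the rule that `variables` win on conflict, and the `split_runtime_environment` name are assumptions for illustration; only a subset of the keys is listed.

```python
from __future__ import annotations

# Subset of the RuntimeEnvironment field names from the diff above.
# A frozenset gives the same hash-based `in` checks as the suggested set literal.
ENVIRONMENT_KEYS = frozenset({
    "numWorkers",
    "maxWorkers",
    "zone",
    "serviceAccountEmail",
    "tempLocation",
    "machineType",
})


def split_runtime_environment(variables: dict, environment: dict | None = None) -> dict:
    """Copy recognised RuntimeEnvironment keys from `variables` into a new environment dict.

    Hypothetical sketch: each `key in ENVIRONMENT_KEYS` check is a hash lookup,
    so its cost does not grow with the number of allowed keys.
    """
    merged = dict(environment or {})
    for key, value in variables.items():
        if key in ENVIRONMENT_KEYS:
            # Assumed behaviour: a value in `variables` overrides `environment`.
            merged[key] = value
    return merged


env = split_runtime_environment(
    {"numWorkers": 2, "tempLocation": "gs://bucket/tmp", "inputFile": "gs://bucket/in.txt"},
    {"zone": "us-central1-f"},
)
print(env)  # {'zone': 'us-central1-f', 'numWorkers': 2, 'tempLocation': 'gs://bucket/tmp'}
```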





[GitHub] [airflow] MrGeorgeOwl commented on a diff in pull request #27776: Add deferrable mode to dataflow operators

Posted by "MrGeorgeOwl (via GitHub)" <gi...@apache.org>.
MrGeorgeOwl commented on code in PR #27776:
URL: https://github.com/apache/airflow/pull/27776#discussion_r1083736807


##########
airflow/providers/google/cloud/links/dataflow.py:
##########
@@ -48,5 +48,5 @@ def persist(
         operator_instance.xcom_push(
             context,
             key=DataflowJobLink.key,
-            value={"project_id": project_id, "location": region, "job_id": job_id},
+            value={"project_id": project_id, "region": region, "job_id": job_id},

Review Comment:
   Not an improvement but a fix for the link: without that change, the Dataflow link won't work because of a KeyError exception, if I remember correctly.
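   A minimal sketch of the failure described above, assuming the extra link builds its URL with `str.format(**conf)` from a template that contains a `{region}` placeholder. The template and the payload values below are hypothetical stand-ins, not the provider's exact constant.

```python
# Hypothetical stand-in for the Dataflow job link template; assumed to use {region}.
JOB_LINK_TEMPLATE = (
    "https://console.cloud.google.com/dataflow/jobs/{region}/{job_id}?project={project_id}"
)


def build_job_link(conf: dict) -> str:
    """Fill the template from the dict the operator pushed to XCom."""
    return JOB_LINK_TEMPLATE.format(**conf)


old_conf = {"project_id": "my-project", "location": "us-central1", "job_id": "job-123"}
new_conf = {"project_id": "my-project", "region": "us-central1", "job_id": "job-123"}

try:
    build_job_link(old_conf)
except KeyError as err:
    # The template asks for "region", but the old payload only has "location".
    print(f"KeyError: {err}")  # KeyError: 'region'

print(build_job_link(new_conf))
# https://console.cloud.google.com/dataflow/jobs/us-central1/job-123?project=my-project
```

   Extra keys passed to `str.format` are ignored, so only the missing `region` key matters; renaming the XCom key to match the placeholder is enough to fix the link.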





[GitHub] [airflow] MrGeorgeOwl commented on pull request #27776: Add deferrable mode to dataflow operators

Posted by "MrGeorgeOwl (via GitHub)" <gi...@apache.org>.
MrGeorgeOwl commented on PR #27776:
URL: https://github.com/apache/airflow/pull/27776#issuecomment-1408658708

   @potiuk, I've added a comment about the upper bound for the Google library.

