Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/11/18 12:37:03 UTC
[GitHub] [airflow] MrGeorgeOwl opened a new pull request, #27776: Add deferrable mode to dataflow operators
MrGeorgeOwl opened a new pull request, #27776:
URL: https://github.com/apache/airflow/pull/27776
…StartFlexTemplateOperator operators
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [airflow] kaxil commented on a diff in pull request #27776: Add deferrable mode to dataflow operators
Posted by GitBox <gi...@apache.org>.
kaxil commented on code in PR #27776:
URL: https://github.com/apache/airflow/pull/27776#discussion_r1054897101
##########
airflow/providers/google/cloud/hooks/dataflow.py:
##########
@@ -719,6 +691,40 @@ def start_template_dataflow(
jobs_controller.wait_for_done()
return response["job"]
+ def _update_environment(self, environment: dict | None, variables: dict) -> dict:
Review Comment:
If `environment` accepts `None`, why not add a default of `None`?
```suggestion
def _update_environment(self, variables: dict, environment: dict | None = None) -> dict:
```
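Note that the suggestion also swaps the parameter order, because in Python a parameter with a default cannot come before one without. A minimal sketch of the resulting signature as a free function follows; the body is a simplified stand-in (it copies every variable), not the hook's actual logic.

```python
from typing import Optional


def update_environment(variables: dict, environment: Optional[dict] = None) -> dict:
    """Simplified stand-in: copy every variable into the environment."""
    # Treat a missing environment as empty; copy so the caller's dict is untouched.
    merged = dict(environment or {})
    merged.update(variables)
    return merged


# The second argument can now be omitted entirely.
only_variables = update_environment({"zone": "us-central1-a"})
with_environment = update_environment({"zone": "us-central1-a"}, {"numWorkers": 2})
```

Callers that previously passed `environment` first by position would need updating after such a reorder, which is acceptable here only because the method is private.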
--
[GitHub] [airflow] kaxil commented on a diff in pull request #27776: Add deferrable mode to dataflow operators
Posted by GitBox <gi...@apache.org>.
kaxil commented on code in PR #27776:
URL: https://github.com/apache/airflow/pull/27776#discussion_r1054898229
##########
airflow/providers/google/cloud/operators/dataflow.py:
##########
@@ -621,9 +624,11 @@ def __init__(
cancel_timeout: int | None = 10 * 60,
wait_until_finished: bool | None = None,
append_job_name: bool = True,
+ deferrable: bool = False,
Review Comment:
Can you add this to the docstring, please?
--
[GitHub] [airflow] MrGeorgeOwl commented on pull request #27776: Add deferrable mode to dataflow operators
Posted by GitBox <gi...@apache.org>.
MrGeorgeOwl commented on PR #27776:
URL: https://github.com/apache/airflow/pull/27776#issuecomment-1385629063
@brucearctor, thank you for the offer of help, I appreciate it. I have finished the required changes for now, so the PR should be merged once it has been reviewed and approved.
--
[GitHub] [airflow] uranusjr commented on a diff in pull request #27776: Add deferrable mode to dataflow operators
Posted by GitBox <gi...@apache.org>.
uranusjr commented on code in PR #27776:
URL: https://github.com/apache/airflow/pull/27776#discussion_r1080874876
##########
airflow/providers/google/cloud/hooks/dataflow.py:
##########
@@ -719,6 +698,40 @@ def start_template_dataflow(
jobs_controller.wait_for_done()
return response["job"]
+ def _update_environment(self, variables: dict, environment: dict | None = None) -> dict:
+ environment = environment or {}
+ # available keys for runtime environment are listed here:
+ # https://cloud.google.com/dataflow/docs/reference/rest/v1b3/RuntimeEnvironment
+ environment_keys = [
+ "numWorkers",
+ "maxWorkers",
+ "zone",
+ "serviceAccountEmail",
+ "tempLocation",
+ "bypassTempDirValidation",
+ "machineType",
+ "additionalExperiments",
+ "network",
+ "subnetwork",
+ "additionalUserLabels",
+ "kmsKeyName",
+ "ipConfiguration",
+ "workerRegion",
+ "workerZone",
+ ]
+
+ for key in variables:
+ if key in environment_keys:
+ if key in environment:
+ self.log.warning(
+ "'%s' parameter in 'variables' will override of "
+ "the same one passed in 'environment'!",
+ key,
+ )
+ environment.update({key: variables[key]})
Review Comment:
```suggestion
        def _check_one(key, val):
            if key in environment:
                self.log.warning(
                    "%r parameter in 'variables' will override of "
                    "the same one passed in 'environment'!",
                    key,
                )
            return key, val

        environment.update(
            _check_one(key, val)
            for key, val in variables.items()
            if key in environment_keys
        )
```
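For readers who want to try the suggested shape outside Airflow, here is a self-contained sketch. It assumes a module-level logger instead of `self.log`, uses only an illustrative subset of the environment keys, copies the input dict (the hook method may not), and uses the corrected wording of the warning.

```python
import logging

log = logging.getLogger(__name__)

# Illustrative subset of the RuntimeEnvironment keys from the diff above.
ENVIRONMENT_KEYS = {"numWorkers", "maxWorkers", "zone", "machineType"}


def update_environment(variables, environment=None):
    # Copy so the caller's dict is not mutated (an assumption of this sketch).
    environment = dict(environment or {})

    def _check_one(key, val):
        # Warn when a value from 'variables' replaces one already in 'environment'.
        if key in environment:
            log.warning(
                "%r parameter in 'variables' will override the same one passed in 'environment'!",
                key,
            )
        return key, val

    # dict.update accepts an iterable of (key, value) pairs, so a generator works.
    environment.update(
        _check_one(key, val) for key, val in variables.items() if key in ENVIRONMENT_KEYS
    )
    return environment


result = update_environment(
    {"zone": "europe-west1-b", "project": "demo"},
    {"zone": "us-central1-a", "numWorkers": 2},
)
```

Here `zone` is overridden with a warning, `numWorkers` is kept, and `project` is ignored because it is not a recognised environment key.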
--
[GitHub] [airflow] potiuk merged pull request #27776: Add deferrable mode to dataflow operators
Posted by "potiuk (via GitHub)" <gi...@apache.org>.
potiuk merged PR #27776:
URL: https://github.com/apache/airflow/pull/27776
--
[GitHub] [airflow] MrGeorgeOwl commented on pull request #27776: Add deferrable mode to dataflow operators
Posted by GitBox <gi...@apache.org>.
MrGeorgeOwl commented on PR #27776:
URL: https://github.com/apache/airflow/pull/27776#issuecomment-1357370465
@potiuk, the tests that are failing at the moment are ones that came in with changes from the main branch. I see the same failures in several other PRs, so I think the changes in this PR are okay, but I might be wrong.
--
[GitHub] [airflow] uranusjr commented on a diff in pull request #27776: Add deferrable mode to dataflow operators
Posted by "uranusjr (via GitHub)" <gi...@apache.org>.
uranusjr commented on code in PR #27776:
URL: https://github.com/apache/airflow/pull/27776#discussion_r1090347021
##########
airflow/providers/google/provider.yaml:
##########
@@ -85,6 +85,7 @@ dependencies:
- google-cloud-build>=3.0.0
- google-cloud-compute>=0.1.0,<2.0.0
- google-cloud-container>=2.2.0,<3.0.0
+ - google-cloud-dataflow-client>=0.5.2,<0.5.5
Review Comment:
Typical Google. Thanks for the clarification.
--
[GitHub] [airflow] potiuk commented on pull request #27776: Add deferrable mode to dataflow operators
Posted by GitBox <gi...@apache.org>.
potiuk commented on PR #27776:
URL: https://github.com/apache/airflow/pull/27776#issuecomment-1336255942
Tests failing.
--
[GitHub] [airflow] MrGeorgeOwl commented on a diff in pull request #27776: Add deferrable mode to dataflow operators
Posted by "MrGeorgeOwl (via GitHub)" <gi...@apache.org>.
MrGeorgeOwl commented on code in PR #27776:
URL: https://github.com/apache/airflow/pull/27776#discussion_r1090334794
##########
airflow/providers/google/provider.yaml:
##########
@@ -85,6 +85,7 @@ dependencies:
- google-cloud-build>=3.0.0
- google-cloud-compute>=0.1.0,<2.0.0
- google-cloud-container>=2.2.0,<3.0.0
+ - google-cloud-dataflow-client>=0.5.2,<0.5.5
Review Comment:
Version 0.5.5 requires higher versions of the protobuf and proto-plus libraries, which can break other dependencies in the google provider package.
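To make the range concrete, the specifier `>=0.5.2,<0.5.5` admits 0.5.2 through 0.5.4 and excludes 0.5.5. The helper names below are hypothetical and the parser only handles plain dotted integers; real installers follow the full PEP 440 rules, so treat this as an illustration only.

```python
def parse(version: str) -> tuple:
    """Turn '0.5.4' into (0, 5, 4); handles plain dotted integers only."""
    return tuple(int(part) for part in version.split("."))


def in_range(version: str, lower: str = "0.5.2", upper: str = "0.5.5") -> bool:
    # Mirrors '>=0.5.2,<0.5.5': inclusive lower bound, exclusive upper bound.
    return parse(lower) <= parse(version) < parse(upper)


allowed = [v for v in ("0.5.1", "0.5.2", "0.5.4", "0.5.5") if in_range(v)]
```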
--
[GitHub] [airflow] potiuk commented on a diff in pull request #27776: Add deferrable mode to dataflow operators
Posted by "potiuk (via GitHub)" <gi...@apache.org>.
potiuk commented on code in PR #27776:
URL: https://github.com/apache/airflow/pull/27776#discussion_r1090600031
##########
airflow/providers/google/provider.yaml:
##########
@@ -85,6 +85,7 @@ dependencies:
- google-cloud-build>=3.0.0
- google-cloud-compute>=0.1.0,<2.0.0
- google-cloud-container>=2.2.0,<3.0.0
+ - google-cloud-dataflow-client>=0.5.2,<0.5.5
Review Comment:
Yep. A comment in provider.yaml is exactly what I meant.
--
[GitHub] [airflow] kaxil commented on a diff in pull request #27776: Add deferrable mode to dataflow operators
Posted by GitBox <gi...@apache.org>.
kaxil commented on code in PR #27776:
URL: https://github.com/apache/airflow/pull/27776#discussion_r1054900437
##########
airflow/providers/google/cloud/operators/dataflow.py:
##########
@@ -669,18 +681,63 @@ def set_current_job(current_job):
environment=self.environment,
append_job_name=self.append_job_name,
)
+ job_id = self.job.get("id")
- return job
+ if job_id is None:
+ raise AirflowException(
+ "While reading job object after template execution error occurred. Job object has no id."
+ )
+
+ if not self.deferrable:
+ return job_id
+
+ context["ti"].xcom_push(key="job_id", value=job_id)
+
+ self.defer(
+ trigger=TemplateJobStartTrigger(
+ project_id=self.project_id,
+ job_id=job_id,
+ location=self.location,
+ gcp_conn_id=self.gcp_conn_id,
+ delegate_to=self.delegate_to,
+ poll_sleep=self.poll_sleep,
+ impersonation_chain=self.impersonation_chain,
+ cancel_timeout=self.cancel_timeout,
+ ),
+ method_name="execute_complete",
+ )
+
+ def execute_complete(self, context: Context, event: dict[str, Any]):
+ """Method which executes after trigger finishes its work."""
+ if event["status"] == "error" or event["status"] == "stopped":
+ self.log.info("status: %s, msg: %s", event["status"], event["message"])
+ raise AirflowException(event["message"])
+
+ self.log.info("Task %s completed with response %s", self.task_id, event["message"])
def on_kill(self) -> None:
self.log.info("On kill.")
- if self.job:
- self.hook.cancel_job(
+ if self.job is not None:
+ self.log.info("Cancel job %s", self.job_name)
Review Comment:
```suggestion
self.log.info("Cancelling job %s", self.job_name)
```
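The diff above hands the waiting over to a trigger and resumes in `execute_complete` with an event dict. A minimal sketch of that event handling, runnable without Airflow, is shown below. `JobFailed` is a hypothetical stand-in for `AirflowException`, the event is assumed to carry `status` and `message` keys as in the diff, and returning the message is an addition for illustration (the method in the diff only logs it).

```python
class JobFailed(Exception):
    """Hypothetical stand-in for AirflowException."""


def execute_complete(event: dict) -> str:
    # The trigger is assumed to report a dict with "status" and "message" keys.
    if event["status"] in ("error", "stopped"):
        # Surface the trigger's message so the task fails with a useful reason.
        raise JobFailed(event["message"])
    return event["message"]


ok_message = execute_complete({"status": "success", "message": "Job completed"})
```

Using a membership test on a tuple is equivalent to the two chained `==` comparisons in the diff and is slightly easier to extend with further failure states.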
--
[GitHub] [airflow] MrGeorgeOwl commented on a diff in pull request #27776: Add deferrable mode to dataflow operators
Posted by GitBox <gi...@apache.org>.
MrGeorgeOwl commented on code in PR #27776:
URL: https://github.com/apache/airflow/pull/27776#discussion_r1029407915
##########
airflow/providers/google/cloud/example_dags/example_dataflow.py:
##########
@@ -52,6 +52,7 @@
GCS_OUTPUT = os.environ.get("GCP_DATAFLOW_GCS_OUTPUT", "gs://INVALID BUCKET NAME/output")
Review Comment:
The system tests were migrated, but at the moment I couldn't make the required changes, even after rebasing the branch.
--
[GitHub] [airflow] uranusjr commented on a diff in pull request #27776: Add deferrable mode to dataflow operators
Posted by "uranusjr (via GitHub)" <gi...@apache.org>.
uranusjr commented on code in PR #27776:
URL: https://github.com/apache/airflow/pull/27776#discussion_r1090331638
##########
airflow/providers/google/cloud/hooks/dataflow.py:
##########
@@ -723,6 +702,40 @@ def start_template_dataflow(
jobs_controller.wait_for_done()
return response["job"]
+ def _update_environment(self, variables: dict, environment: dict | None = None) -> dict:
+ environment = environment or {}
+ # available keys for runtime environment are listed here:
+ # https://cloud.google.com/dataflow/docs/reference/rest/v1b3/RuntimeEnvironment
+ environment_keys = {
+ "numWorkers",
+ "maxWorkers",
+ "zone",
+ "serviceAccountEmail",
+ "tempLocation",
+ "bypassTempDirValidation",
+ "machineType",
+ "additionalExperiments",
+ "network",
+ "subnetwork",
+ "additionalUserLabels",
+ "kmsKeyName",
+ "ipConfiguration",
+ "workerRegion",
+ "workerZone",
+ }
+
+ def _check_one(key, val):
+ if key in environment:
+ self.log.warning(
+ "%r parameter in 'variables' will override of " "the same one passed in 'environment'!",
Review Comment:
```suggestion
"%r parameter in 'variables' will override the same one passed in 'environment'!",
```
--
[GitHub] [airflow] potiuk commented on a diff in pull request #27776: Add deferrable mode to dataflow operators
Posted by "potiuk (via GitHub)" <gi...@apache.org>.
potiuk commented on code in PR #27776:
URL: https://github.com/apache/airflow/pull/27776#discussion_r1090486887
##########
airflow/providers/google/provider.yaml:
##########
@@ -85,6 +85,7 @@ dependencies:
- google-cloud-build>=3.0.0
- google-cloud-compute>=0.1.0,<2.0.0
- google-cloud-container>=2.2.0,<3.0.0
+ - google-cloud-dataflow-client>=0.5.2,<0.5.5
Review Comment:
Can you please add a comment there explaining why? I know it's not there for the others, but we have generally adopted the approach that whenever we upper-bound a dependency we SHOULD add a comment specifying why the upper bound is there and what the condition for removing it is.
https://github.com/apache/airflow#approach-to-dependencies-of-airflow :
> Whenever we upper-bound such a dependency, we should always comment why we are doing it - i.e. we should have a good reason why dependency is upper-bound. And we should also mention what is the condition to remove the binding.
--
[GitHub] [airflow] bhirsz commented on a diff in pull request #27776: Add deferrable mode to dataflow operators
Posted by GitBox <gi...@apache.org>.
bhirsz commented on code in PR #27776:
URL: https://github.com/apache/airflow/pull/27776#discussion_r1026399790
##########
airflow/providers/google/cloud/example_dags/example_dataflow.py:
##########
@@ -52,6 +52,7 @@
GCS_OUTPUT = os.environ.get("GCP_DATAFLOW_GCS_OUTPUT", "gs://INVALID BUCKET NAME/output")
Review Comment:
Those files (in google/cloud/example_dags/) are meant to be migrated to the system tests and then removed. Can you verify whether the dataflow tests are already migrated and, if so, modify the migrated file and remove those ones?
--
[GitHub] [airflow] brucearctor commented on pull request #27776: Add deferrable mode to dataflow operators
Posted by GitBox <gi...@apache.org>.
brucearctor commented on PR #27776:
URL: https://github.com/apache/airflow/pull/27776#issuecomment-1382043117
@MrGeorgeOwl -- this is great and we are eagerly awaiting this functionality to save us some headaches! Do you want some help getting it over the finish line? Looks like the outstanding reviews are pretty straightforward :-)
Please advise.
--
[GitHub] [airflow] uranusjr commented on a diff in pull request #27776: Add deferrable mode to dataflow operators
Posted by GitBox <gi...@apache.org>.
uranusjr commented on code in PR #27776:
URL: https://github.com/apache/airflow/pull/27776#discussion_r1080875455
##########
airflow/providers/google/cloud/hooks/dataflow.py:
##########
@@ -1037,7 +1050,7 @@ def start_sql_job(
def get_job(
self,
job_id: str,
- project_id: str,
+ project_id: str = PROVIDE_PROJECT_ID,
Review Comment:
Is this related to other changes or just a drive-by improvement?
--
[GitHub] [airflow] MrGeorgeOwl commented on a diff in pull request #27776: Add deferrable mode to dataflow operators
Posted by "MrGeorgeOwl (via GitHub)" <gi...@apache.org>.
MrGeorgeOwl commented on code in PR #27776:
URL: https://github.com/apache/airflow/pull/27776#discussion_r1090556702
##########
airflow/providers/google/provider.yaml:
##########
@@ -85,6 +85,7 @@ dependencies:
- google-cloud-build>=3.0.0
- google-cloud-compute>=0.1.0,<2.0.0
- google-cloud-container>=2.2.0,<3.0.0
+ - google-cloud-dataflow-client>=0.5.2,<0.5.5
Review Comment:
@potiuk, will a comment in provider.yaml be enough, or does the project have another place for upper-bound comments?
--
[GitHub] [airflow] kaxil commented on a diff in pull request #27776: Add deferrable mode to dataflow operators
Posted by GitBox <gi...@apache.org>.
kaxil commented on code in PR #27776:
URL: https://github.com/apache/airflow/pull/27776#discussion_r1054901004
##########
airflow/providers/google/cloud/operators/dataflow.py:
##########
@@ -669,18 +681,63 @@ def set_current_job(current_job):
environment=self.environment,
append_job_name=self.append_job_name,
)
+ job_id = self.job.get("id")
- return job
+ if job_id is None:
+ raise AirflowException(
+ "While reading job object after template execution error occurred. Job object has no id."
+ )
+
+ if not self.deferrable:
+ return job_id
+
+ context["ti"].xcom_push(key="job_id", value=job_id)
+
+ self.defer(
+ trigger=TemplateJobStartTrigger(
+ project_id=self.project_id,
+ job_id=job_id,
+ location=self.location,
+ gcp_conn_id=self.gcp_conn_id,
+ delegate_to=self.delegate_to,
+ poll_sleep=self.poll_sleep,
+ impersonation_chain=self.impersonation_chain,
+ cancel_timeout=self.cancel_timeout,
+ ),
+ method_name="execute_complete",
+ )
+
+ def execute_complete(self, context: Context, event: dict[str, Any]):
+ """Method which executes after trigger finishes its work."""
+ if event["status"] == "error" or event["status"] == "stopped":
+ self.log.info("status: %s, msg: %s", event["status"], event["message"])
+ raise AirflowException(event["message"])
+
+ self.log.info("Task %s completed with response %s", self.task_id, event["message"])
def on_kill(self) -> None:
self.log.info("On kill.")
- if self.job:
- self.hook.cancel_job(
+ if self.job is not None:
+ self.log.info("Cancel job %s", self.job_name)
+ self._get_hook().cancel_job(
+ job_name=self.job_name,
job_id=self.job.get("id"),
project_id=self.job.get("projectId"),
location=self.job.get("location"),
)
+ def _get_hook(self) -> DataflowHook:
Review Comment:
instead you could just make `hook` a `@cached_property`
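A minimal sketch of that idea, using `functools.cached_property` from the standard library. `FakeHook` and `TemplateOperator` are hypothetical stand-ins for `DataflowHook` and the operator; only the caching behaviour is the point.

```python
from functools import cached_property


class FakeHook:
    """Hypothetical stand-in for DataflowHook; counts how often it is built."""

    instances = 0

    def __init__(self, gcp_conn_id: str) -> None:
        FakeHook.instances += 1
        self.gcp_conn_id = gcp_conn_id


class TemplateOperator:
    """Hypothetical, stripped-down operator."""

    def __init__(self, gcp_conn_id: str = "google_cloud_default") -> None:
        self.gcp_conn_id = gcp_conn_id

    @cached_property
    def hook(self) -> FakeHook:
        # Built on first access, then stored on the instance and reused.
        return FakeHook(gcp_conn_id=self.gcp_conn_id)


operator = TemplateOperator()
first = operator.hook
second = operator.hook
```

This keeps the public attribute name `hook`, so both `execute` and `on_kill` can use `self.hook` without a separate `_get_hook()` helper or a `None` check.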
--
[GitHub] [airflow] kaxil commented on a diff in pull request #27776: Add deferrable mode to dataflow operators
Posted by GitBox <gi...@apache.org>.
kaxil commented on code in PR #27776:
URL: https://github.com/apache/airflow/pull/27776#discussion_r1054898823
##########
airflow/providers/google/cloud/operators/dataflow.py:
##########
@@ -634,31 +639,38 @@ def __init__(
self.gcp_conn_id = gcp_conn_id
self.delegate_to = delegate_to
self.poll_sleep = poll_sleep
- self.job = None
- self.hook: DataflowHook | None = None
self.impersonation_chain = impersonation_chain
self.environment = environment
self.cancel_timeout = cancel_timeout
self.wait_until_finished = wait_until_finished
self.append_job_name = append_job_name
+ self.deferrable = deferrable
- def execute(self, context: Context) -> dict:
- self.hook = DataflowHook(
- gcp_conn_id=self.gcp_conn_id,
- delegate_to=self.delegate_to,
- poll_sleep=self.poll_sleep,
- impersonation_chain=self.impersonation_chain,
- cancel_timeout=self.cancel_timeout,
- wait_until_finished=self.wait_until_finished,
- )
+ self.job: dict | None = None
+ self._hook: DataflowHook | None = None
Review Comment:
If users are inheriting this operator and doing things with `self.hook` it will break their code. Please change it to `self.hook`
--
[GitHub] [airflow] uranusjr commented on a diff in pull request #27776: Add deferrable mode to dataflow operators
Posted by "uranusjr (via GitHub)" <gi...@apache.org>.
uranusjr commented on code in PR #27776:
URL: https://github.com/apache/airflow/pull/27776#discussion_r1090333370
##########
airflow/providers/google/provider.yaml:
##########
@@ -85,6 +85,7 @@ dependencies:
- google-cloud-build>=3.0.0
- google-cloud-compute>=0.1.0,<2.0.0
- google-cloud-container>=2.2.0,<3.0.0
+ - google-cloud-dataflow-client>=0.5.2,<0.5.5
Review Comment:
Any reason this version range is so specific? What happens in 0.5.5?
--
[GitHub] [airflow] brucearctor commented on pull request #27776: Add deferrable mode to dataflow operators
Posted by GitBox <gi...@apache.org>.
brucearctor commented on PR #27776:
URL: https://github.com/apache/airflow/pull/27776#issuecomment-1385689773
Awesome - thanks!
--
[GitHub] [airflow] MrGeorgeOwl commented on pull request #27776: Add deferrable mode to dataflow operators
Posted by GitBox <gi...@apache.org>.
MrGeorgeOwl commented on PR #27776:
URL: https://github.com/apache/airflow/pull/27776#issuecomment-1387282514
Sorry to everyone who was added to this PR as an approver. I am not sure why that happened after rebasing onto the main branch.
--
[GitHub] [airflow] MrGeorgeOwl commented on a diff in pull request #27776: Add deferrable mode to dataflow operators
Posted by "MrGeorgeOwl (via GitHub)" <gi...@apache.org>.
MrGeorgeOwl commented on code in PR #27776:
URL: https://github.com/apache/airflow/pull/27776#discussion_r1083733347
##########
airflow/providers/google/cloud/hooks/dataflow.py:
##########
@@ -1037,7 +1050,7 @@ def start_sql_job(
def get_job(
self,
job_id: str,
- project_id: str,
+ project_id: str = PROVIDE_PROJECT_ID,
Review Comment:
Drive-by improvement
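For context, a sentinel default like `PROVIDE_PROJECT_ID` lets a method keep the annotation `project_id: str` while still allowing callers to omit it. The sketch below shows one way such a pattern can work. Everything except the name `PROVIDE_PROJECT_ID` is hypothetical (the decorator, `FakeDataflowHook`, `default_project_id`), and Airflow's own implementation may differ.

```python
import functools
from typing import cast

# Assumption: a None sentinel typed as str, so signatures can stay `project_id: str`.
PROVIDE_PROJECT_ID: str = cast(str, None)


def fallback_to_default_project_id(func):
    """Hypothetical decorator: fill in project_id from the hook when it is missing."""

    @functools.wraps(func)
    def wrapper(self, *args, **kwargs):
        # This sketch expects project_id to be passed by keyword, if at all.
        if kwargs.get("project_id") is None:
            kwargs["project_id"] = self.default_project_id
        return func(self, *args, **kwargs)

    return wrapper


class FakeDataflowHook:
    """Hypothetical hook with a project taken from its connection."""

    default_project_id = "project-from-connection"

    @fallback_to_default_project_id
    def get_job(self, job_id: str, project_id: str = PROVIDE_PROJECT_ID) -> dict:
        return {"id": job_id, "projectId": project_id}


hook = FakeDataflowHook()
implicit = hook.get_job(job_id="job-1")
explicit = hook.get_job(job_id="job-1", project_id="other-project")
```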
--
[GitHub] [airflow] kaxil commented on a diff in pull request #27776: Add deferrable mode to dataflow operators
Posted by GitBox <gi...@apache.org>.
kaxil commented on code in PR #27776:
URL: https://github.com/apache/airflow/pull/27776#discussion_r1054902161
##########
airflow/providers/google/provider.yaml:
##########
@@ -83,6 +83,7 @@ dependencies:
- google-cloud-build>=3.0.0
- google-cloud-compute>=0.1.0,<2.0.0
- google-cloud-container>=2.2.0,<3.0.0
+ - google-cloud-dataflow-client==0.5.4
Review Comment:
Can you please not pin it to a specific version, and instead provide a range or, ideally, just a minimum version?
Airflow is a library as well as an app, so we want to keep deps as open as possible.
##########
generated/provider_dependencies.json:
##########
@@ -319,6 +319,7 @@
"google-cloud-compute>=0.1.0,<2.0.0",
"google-cloud-container>=2.2.0,<3.0.0",
"google-cloud-datacatalog>=3.0.0",
+ "google-cloud-dataflow-client==0.5.4",
Review Comment:
same as https://github.com/apache/airflow/pull/27776/files#r1054902161
--
[GitHub] [airflow] potiuk commented on pull request #27776: Add deferrable mode to dataflow operators
Posted by GitBox <gi...@apache.org>.
potiuk commented on PR #27776:
URL: https://github.com/apache/airflow/pull/27776#issuecomment-1357504953
I think those were intermittent/flaky tests. I re-ran them to be sure.
--
[GitHub] [airflow] kaxil commented on a diff in pull request #27776: Add deferrable mode to dataflow operators
Posted by GitBox <gi...@apache.org>.
kaxil commented on code in PR #27776:
URL: https://github.com/apache/airflow/pull/27776#discussion_r1054901185
##########
airflow/providers/google/cloud/operators/dataflow.py:
##########
@@ -755,13 +811,14 @@ def __init__(
self,
body: dict,
location: str,
- project_id: str | None = None,
+ project_id: str,
Review Comment:
Why make this a required argument?
--
[GitHub] [airflow] uranusjr commented on a diff in pull request #27776: Add deferrable mode to dataflow operators
Posted by GitBox <gi...@apache.org>.
uranusjr commented on code in PR #27776:
URL: https://github.com/apache/airflow/pull/27776#discussion_r1080875713
##########
airflow/providers/google/cloud/links/dataflow.py:
##########
@@ -48,5 +48,5 @@ def persist(
operator_instance.xcom_push(
context,
key=DataflowJobLink.key,
- value={"project_id": project_id, "location": region, "job_id": job_id},
+ value={"project_id": project_id, "region": region, "job_id": job_id},
Review Comment:
Same question
--
[GitHub] [airflow] uranusjr commented on a diff in pull request #27776: Add deferrable mode to dataflow operators
Posted by GitBox <gi...@apache.org>.
uranusjr commented on code in PR #27776:
URL: https://github.com/apache/airflow/pull/27776#discussion_r1080876447
##########
airflow/providers/google/cloud/hooks/dataflow.py:
##########
@@ -719,6 +698,40 @@ def start_template_dataflow(
jobs_controller.wait_for_done()
return response["job"]
+ def _update_environment(self, variables: dict, environment: dict | None = None) -> dict:
+ environment = environment or {}
+ # available keys for runtime environment are listed here:
+ # https://cloud.google.com/dataflow/docs/reference/rest/v1b3/RuntimeEnvironment
+ environment_keys = [
+ "numWorkers",
+ "maxWorkers",
+ "zone",
+ "serviceAccountEmail",
+ "tempLocation",
+ "bypassTempDirValidation",
+ "machineType",
+ "additionalExperiments",
+ "network",
+ "subnetwork",
+ "additionalUserLabels",
+ "kmsKeyName",
+ "ipConfiguration",
+ "workerRegion",
+ "workerZone",
+ ]
Review Comment:
```suggestion
        environment_keys = {
            "numWorkers",
            "maxWorkers",
            "zone",
            "serviceAccountEmail",
            "tempLocation",
            "bypassTempDirValidation",
            "machineType",
            "additionalExperiments",
            "network",
            "subnetwork",
            "additionalUserLabels",
            "kmsKeyName",
            "ipConfiguration",
            "workerRegion",
            "workerZone",
        }
```
Faster for `in` checks.
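A small sketch of the difference, using a shortened key list. A list answers `in` by scanning its elements, while a set uses a hash lookup; both give the same answers. With only a handful of keys the measured gap is likely to be small, so the timings are left for the reader to print rather than quoted here.

```python
import timeit

keys_list = ["numWorkers", "maxWorkers", "zone", "workerZone"]
keys_set = set(keys_list)

# Both containers answer membership identically; only the lookup cost differs.
same_answers = all((k in keys_list) == (k in keys_set) for k in keys_list + ["missing"])

# Time a lookup of the last element, the worst case for the list scan.
list_seconds = timeit.timeit(lambda: "workerZone" in keys_list, number=100_000)
set_seconds = timeit.timeit(lambda: "workerZone" in keys_set, number=100_000)
```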
--
[GitHub] [airflow] MrGeorgeOwl commented on a diff in pull request #27776: Add deferrable mode to dataflow operators
Posted by "MrGeorgeOwl (via GitHub)" <gi...@apache.org>.
MrGeorgeOwl commented on code in PR #27776:
URL: https://github.com/apache/airflow/pull/27776#discussion_r1083736807
##########
airflow/providers/google/cloud/links/dataflow.py:
##########
@@ -48,5 +48,5 @@ def persist(
operator_instance.xcom_push(
context,
key=DataflowJobLink.key,
- value={"project_id": project_id, "location": region, "job_id": job_id},
+ value={"project_id": project_id, "region": region, "job_id": job_id},
Review Comment:
It is not an improvement but a fix for the link: without this change the Dataflow link won't work because of a KeyError exception, if I remember correctly.
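The failure mode described here can be reproduced with plain `str.format`. The template below is hypothetical (it is not the provider's actual link constant); what matters is that it has a `{region}` placeholder while the old XCom payload supplied a `location` key instead.

```python
# Hypothetical link template; only the placeholder names matter.
JOB_LINK_TEMPLATE = (
    "https://console.example.test/dataflow/jobs/{region}/{job_id}?project={project_id}"
)

old_payload = {"project_id": "demo", "location": "us-central1", "job_id": "job-1"}
new_payload = {"project_id": "demo", "region": "us-central1", "job_id": "job-1"}


def build_link(payload: dict) -> str:
    # str.format raises KeyError when a named placeholder has no matching key.
    return JOB_LINK_TEMPLATE.format(**payload)


link = build_link(new_payload)
```

Calling `build_link(old_payload)` raises `KeyError: 'region'`, because the extra `location` key is ignored and the `{region}` placeholder is left without a value.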
--
[GitHub] [airflow] MrGeorgeOwl commented on pull request #27776: Add deferrable mode to dataflow operators
Posted by "MrGeorgeOwl (via GitHub)" <gi...@apache.org>.
MrGeorgeOwl commented on PR #27776:
URL: https://github.com/apache/airflow/pull/27776#issuecomment-1408658708
@potiuk, I've added a comment about the upper bound for the Google library.
--