Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2020/04/23 18:01:42 UTC

[GitHub] [airflow] mik-laj opened a new pull request #8531: Support all RuntimeEnvironment parameters in DataflowTemplatedJobStartOperator

mik-laj opened a new pull request #8531:
URL: https://github.com/apache/airflow/pull/8531


   Fix: https://github.com/apache/airflow/issues/8300
   
   ---
   Make sure to mark the boxes below before creating PR: [x]
   
   - [X] Description above provides context of the change
   - [X] Unit tests coverage for changes (not needed for documentation changes)
   - [X] Target Github ISSUE in description if exists
   - [X] Commits follow "[How to write a good git commit message](http://chris.beams.io/posts/git-commit/)"
   - [X] Relevant documentation is updated including usage instructions.
   - [X] I will engage committers as explained in [Contribution Workflow Example](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#contribution-workflow-example).
   
   ---
   In case of fundamental code change, Airflow Improvement Proposal ([AIP](https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+Improvements+Proposals)) is needed.
   In case of a new dependency, check compliance with the [ASF 3rd Party License Policy](https://www.apache.org/legal/resolved.html#category-x).
   In case of backwards incompatible changes please leave a note in [UPDATING.md](https://github.com/apache/airflow/blob/master/UPDATING.md).
   Read the [Pull Request Guidelines](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#pull-request-guidelines) for more information.
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] mik-laj commented on pull request #8531: Support all RuntimeEnvironment parameters in DataflowTemplatedJobStartOperator

Posted by GitBox <gi...@apache.org>.
mik-laj commented on pull request #8531:
URL: https://github.com/apache/airflow/pull/8531#issuecomment-619477858


   @jaketf May I ask you for a review? I know that you are also interested in the Dataflow integration.





[GitHub] [airflow] aaltay commented on a change in pull request #8531: Support all RuntimeEnvironment parameters in DataflowTemplatedJobStartOperator

Posted by GitBox <gi...@apache.org>.
aaltay commented on a change in pull request #8531:
URL: https://github.com/apache/airflow/pull/8531#discussion_r416140436



##########
File path: airflow/providers/google/cloud/hooks/dataflow.py
##########
@@ -548,23 +554,17 @@ def start_template_dataflow(
         :type location: str
         """
         name = self._build_dataflow_job_name(job_name, append_job_name)
-        # Builds RuntimeEnvironment from variables dictionary
-        # https://cloud.google.com/dataflow/docs/reference/rest/v1b3/RuntimeEnvironment
-        environment = {}
-        for key in ['numWorkers', 'maxWorkers', 'zone', 'serviceAccountEmail',
-                    'tempLocation', 'bypassTempDirValidation', 'machineType',
-                    'additionalExperiments', 'network', 'subnetwork', 'additionalUserLabels']:
-            if key in variables:
-                environment.update({key: variables[key]})
-        body = {"jobName": name,
-                "parameters": parameters,
-                "environment": environment}
+
         service = self.get_conn()
         request = service.projects().locations().templates().launch(  # pylint: disable=no-member
             projectId=project_id,
             location=location,
             gcsPath=dataflow_template,
-            body=body
+            body={
+                "jobName": name,
+                "parameters": parameters,
+                "environment": variables

Review comment:
       What is the difference between `parameters` and `variables`?

##########
File path: airflow/providers/google/cloud/operators/dataflow.py
##########
@@ -377,6 +394,7 @@ def __init__(
         self.poll_sleep = poll_sleep
         self.job_id = None
         self.hook: Optional[DataflowHook] = None
+        self.options.update(dataflow_default_options)

Review comment:
       Wouldn't this override user-provided options with the default options?

##########
File path: airflow/providers/google/cloud/hooks/dataflow.py
##########
@@ -532,7 +532,13 @@ def start_template_dataflow(
 
         :param job_name: The name of the job.

Review comment:
       Does this bug (#8300) affect only templates?

##########
File path: tests/providers/google/cloud/hooks/test_dataflow.py
##########
@@ -52,32 +54,32 @@
     'stagingLocation': 'gs://test/staging',
     'labels': {'foo': 'bar'}
 }
-DATAFLOW_VARIABLES_TEMPLATE = {
-    'project': 'test',
-    'tempLocation': 'gs://test/temp',
-    'zone': 'us-central1-f'
-}
 RUNTIME_ENV = {
-    'tempLocation': 'gs://test/temp',
-    'zone': 'us-central1-f',
-    'numWorkers': 2,
-    'maxWorkers': 10,
-    'serviceAccountEmail': 'test@apache.airflow',
-    'machineType': 'n1-standard-1',
     'additionalExperiments': ['exp_flag1', 'exp_flag2'],
-    'network': 'default',
-    'subnetwork': 'regions/REGION/subnetworks/SUBNETWORK',
     'additionalUserLabels': {
         'name': 'wrench',
         'mass': '1.3kg',
         'count': '3'
-    }
+    },
+    'bypassTempDirValidation': {},
+    'ipConfiguration': 'WORKER_IP_PRIVATE',
+    'kmsKeyName': (
+        'projects/TEST_PROJECT_ID/locations/TEST_LOCATIONS/keyRings/TEST_KEYRING/cryptoKeys/TEST_CRYPTOKEYS'
+    ),
+    'maxWorkers': 10,
+    'network': 'default',
+    'numWorkers': 2,
+    'serviceAccountEmail': 'test@apache.airflow',
+    'subnetwork': 'regions/REGION/subnetworks/SUBNETWORK',
+    'tempLocation': 'gs://test/temp',
+    'workerRegion': "test-region",
+    'workerZone': 'test-zone',
+    'zone': 'us-central1-f',
+    'machineType': 'n1-standard-1',

Review comment:
       Maybe test the private IP flag, since it was specifically mentioned in the issue.







[GitHub] [airflow] aaltay commented on a change in pull request #8531: Support all RuntimeEnvironment parameters in DataflowTemplatedJobStartOperator

Posted by GitBox <gi...@apache.org>.
aaltay commented on a change in pull request #8531:
URL: https://github.com/apache/airflow/pull/8531#discussion_r416178253



##########
File path: airflow/providers/google/cloud/hooks/dataflow.py
##########
@@ -548,23 +554,17 @@ def start_template_dataflow(
         :type location: str
         """
         name = self._build_dataflow_job_name(job_name, append_job_name)
-        # Builds RuntimeEnvironment from variables dictionary
-        # https://cloud.google.com/dataflow/docs/reference/rest/v1b3/RuntimeEnvironment
-        environment = {}
-        for key in ['numWorkers', 'maxWorkers', 'zone', 'serviceAccountEmail',
-                    'tempLocation', 'bypassTempDirValidation', 'machineType',
-                    'additionalExperiments', 'network', 'subnetwork', 'additionalUserLabels']:
-            if key in variables:
-                environment.update({key: variables[key]})
-        body = {"jobName": name,
-                "parameters": parameters,
-                "environment": environment}
+
         service = self.get_conn()
         request = service.projects().locations().templates().launch(  # pylint: disable=no-member
             projectId=project_id,
             location=location,
             gcsPath=dataflow_template,
-            body=body
+            body={
+                "jobName": name,
+                "parameters": parameters,
+                "environment": variables

Review comment:
       Got it. Thank you for cleaning things up and keeping them backward compatible.
   
   How about adding a new, clean flag and marking the old one as deprecated?







[GitHub] [airflow] mik-laj commented on a change in pull request #8531: Support all RuntimeEnvironment parameters in DataflowTemplatedJobStartOperator

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #8531:
URL: https://github.com/apache/airflow/pull/8531#discussion_r416171580



##########
File path: airflow/providers/google/cloud/hooks/dataflow.py
##########
@@ -532,7 +532,13 @@ def start_template_dataflow(
 
         :param job_name: The name of the job.

Review comment:
       Yes, it only affects templates. Native pipelines were not affected by this issue.







[GitHub] [airflow] mik-laj commented on pull request #8531: Support all RuntimeEnvironment parameters in DataflowTemplatedJobStartOperator

Posted by GitBox <gi...@apache.org>.
mik-laj commented on pull request #8531:
URL: https://github.com/apache/airflow/pull/8531#issuecomment-619477450


   @sgringwe May I ask you for a review?







[GitHub] [airflow] mik-laj commented on a change in pull request #8531: Support all RuntimeEnvironment parameters in DataflowTemplatedJobStartOperator

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #8531:
URL: https://github.com/apache/airflow/pull/8531#discussion_r416195651



##########
File path: airflow/providers/google/cloud/hooks/dataflow.py
##########
@@ -548,23 +554,17 @@ def start_template_dataflow(
         :type location: str
         """
         name = self._build_dataflow_job_name(job_name, append_job_name)
-        # Builds RuntimeEnvironment from variables dictionary
-        # https://cloud.google.com/dataflow/docs/reference/rest/v1b3/RuntimeEnvironment
-        environment = {}
-        for key in ['numWorkers', 'maxWorkers', 'zone', 'serviceAccountEmail',
-                    'tempLocation', 'bypassTempDirValidation', 'machineType',
-                    'additionalExperiments', 'network', 'subnetwork', 'additionalUserLabels']:
-            if key in variables:
-                environment.update({key: variables[key]})
-        body = {"jobName": name,
-                "parameters": parameters,
-                "environment": environment}
+
         service = self.get_conn()
         request = service.projects().locations().templates().launch(  # pylint: disable=no-member
             projectId=project_id,
             location=location,
             gcsPath=dataflow_template,
-            body=body
+            body={
+                "jobName": name,
+                "parameters": parameters,
+                "environment": variables

Review comment:
       I want to change the names of more parameters in many places. I will probably do it with a special decorator that lets me do this cleanly.
    ```python
       @rename_parameter({'variables': 'environment'})
       def start_template_dataflow(
           self,
           job_name: str,
           variables: Dict,
           parameters: Dict,
           dataflow_template: str,
           project_id: str,
           append_job_name: bool = True,
           on_new_job_id_callback: Optional[Callable[[str], None]] = None,
           location: str = DEFAULT_DATAFLOW_LOCATION
       ) -> Dict:
   ```
   That way, we won't have so much outdated code in the repository.
   
   This change will be much more problematic from the user's perspective, and I would like to test it in various IDEs, etc.
   ![Screenshot 2020-04-28 at 00 31 57](https://user-images.githubusercontent.com/12058428/80427302-bee93980-88e7-11ea-97bb-a8b0bdbec6f2.png)
   I would like to get similar messages when the user updates the operator.
   
   It looks simple, but it still needs testing, so I will do it in a separate PR.
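   A minimal sketch of what such a `rename_parameter` decorator could look like (a hypothetical implementation of the idea above, not the code that was ultimately merged):

```python
import functools
import warnings


def rename_parameter(renames):
    """Hypothetical sketch: map deprecated keyword argument names to their
    new names and emit a DeprecationWarning when an old name is used."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            for old_name, new_name in renames.items():
                if old_name in kwargs:
                    warnings.warn(
                        f"Parameter {old_name!r} is deprecated; "
                        f"use {new_name!r} instead.",
                        DeprecationWarning,
                        stacklevel=2,
                    )
                    kwargs[new_name] = kwargs.pop(old_name)
            return func(*args, **kwargs)
        return wrapper
    return decorator


@rename_parameter({"variables": "environment"})
def start_template_dataflow(job_name, environment=None):
    # Illustrative stand-in for the hook method; it just echoes the environment.
    return environment


# Old callers keep working, but now receive a DeprecationWarning.
result = start_template_dataflow("job", variables={"zone": "us-central1-f"})
assert result == {"zone": "us-central1-f"}
```

   The `stacklevel=2` makes the warning point at the caller's code rather than the decorator, which is what IDEs and linters typically need to surface deprecation hints.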
   







[GitHub] [airflow] mik-laj commented on a change in pull request #8531: Support all RuntimeEnvironment parameters in DataflowTemplatedJobStartOperator

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #8531:
URL: https://github.com/apache/airflow/pull/8531#discussion_r416175888



##########
File path: tests/providers/google/cloud/hooks/test_dataflow.py
##########
@@ -52,32 +54,32 @@
     'stagingLocation': 'gs://test/staging',
     'labels': {'foo': 'bar'}
 }
-DATAFLOW_VARIABLES_TEMPLATE = {
-    'project': 'test',
-    'tempLocation': 'gs://test/temp',
-    'zone': 'us-central1-f'
-}
 RUNTIME_ENV = {
-    'tempLocation': 'gs://test/temp',
-    'zone': 'us-central1-f',
-    'numWorkers': 2,
-    'maxWorkers': 10,
-    'serviceAccountEmail': 'test@apache.airflow',
-    'machineType': 'n1-standard-1',
     'additionalExperiments': ['exp_flag1', 'exp_flag2'],
-    'network': 'default',
-    'subnetwork': 'regions/REGION/subnetworks/SUBNETWORK',
     'additionalUserLabels': {
         'name': 'wrench',
         'mass': '1.3kg',
         'count': '3'
-    }
+    },
+    'bypassTempDirValidation': {},
+    'ipConfiguration': 'WORKER_IP_PRIVATE',
+    'kmsKeyName': (
+        'projects/TEST_PROJECT_ID/locations/TEST_LOCATIONS/keyRings/TEST_KEYRING/cryptoKeys/TEST_CRYPTOKEYS'
+    ),
+    'maxWorkers': 10,
+    'network': 'default',
+    'numWorkers': 2,
+    'serviceAccountEmail': 'test@apache.airflow',
+    'subnetwork': 'regions/REGION/subnetworks/SUBNETWORK',
+    'tempLocation': 'gs://test/temp',
+    'workerRegion': "test-region",
+    'workerZone': 'test-zone',
+    'zone': 'us-central1-f',
+    'machineType': 'n1-standard-1',

Review comment:
       The IP configuration is also tested on line 65.







[GitHub] [airflow] mik-laj commented on a change in pull request #8531: Support all RuntimeEnvironment parameters in DataflowTemplatedJobStartOperator

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #8531:
URL: https://github.com/apache/airflow/pull/8531#discussion_r416176087



##########
File path: airflow/providers/google/cloud/operators/dataflow.py
##########
@@ -377,6 +394,7 @@ def __init__(
         self.poll_sleep = poll_sleep
         self.job_id = None
         self.hook: Optional[DataflowHook] = None
+        self.options.update(dataflow_default_options)

Review comment:
       Good point. I will fix it. 
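   For context, a minimal sketch (with illustrative option dicts) of the merge order being requested above: defaults applied first, so that user-provided options override them rather than the other way around.

```python
# Illustrative dicts; the real values come from the operator's arguments.
dataflow_default_options = {"zone": "us-central1-f", "numWorkers": 2}
options = {"numWorkers": 10}  # user-provided, should win over the defaults

# Defaults first, user options second: later keys override earlier ones.
merged = {**dataflow_default_options, **options}
assert merged == {"zone": "us-central1-f", "numWorkers": 10}
```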









[GitHub] [airflow] mik-laj commented on a change in pull request #8531: Support all RuntimeEnvironment parameters in DataflowTemplatedJobStartOperator

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #8531:
URL: https://github.com/apache/airflow/pull/8531#discussion_r416172704



##########
File path: airflow/providers/google/cloud/hooks/dataflow.py
##########
@@ -548,23 +554,17 @@ def start_template_dataflow(
         :type location: str
         """
         name = self._build_dataflow_job_name(job_name, append_job_name)
-        # Builds RuntimeEnvironment from variables dictionary
-        # https://cloud.google.com/dataflow/docs/reference/rest/v1b3/RuntimeEnvironment
-        environment = {}
-        for key in ['numWorkers', 'maxWorkers', 'zone', 'serviceAccountEmail',
-                    'tempLocation', 'bypassTempDirValidation', 'machineType',
-                    'additionalExperiments', 'network', 'subnetwork', 'additionalUserLabels']:
-            if key in variables:
-                environment.update({key: variables[key]})
-        body = {"jobName": name,
-                "parameters": parameters,
-                "environment": environment}
+
         service = self.get_conn()
         request = service.projects().locations().templates().launch(  # pylint: disable=no-member
             projectId=project_id,
             location=location,
             gcsPath=dataflow_template,
-            body=body
+            body={
+                "jobName": name,
+                "parameters": parameters,
+                "environment": variables

Review comment:
       This is problematic, but someone once created a parameter and called it `variables`, then used the same name elsewhere. Now I am trying to maintain backward compatibility. In the next PR I will try to change the names of the arguments, but this will require more work to maintain backward compatibility.
   
   Formerly, the `variables` parameter contained the environment settings plus the project id and region. This was similar to the native pipeline, where the region was also passed as an argument: `python pipeline.py --region=europe-west-1`. However, this was changed when I introduced the changes required by the [GCP guidelines](https://docs.google.com/document/d/1_rTdJSLCt0eyrAylmmgYc3yZr-_h51fVlnvMmWqhCkY/edit). Now it contains only environment parameters.
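
   A sketch of the legacy layout being described, assuming `variables` mixed job-level fields with RuntimeEnvironment entries (the helper and key names here are illustrative, not the merged implementation):

```python
# Hypothetical helper: split a legacy `variables` dict into job-level
# fields and RuntimeEnvironment entries, as described above.
JOB_LEVEL_KEYS = frozenset({"project", "region"})


def split_legacy_variables(variables):
    job_level = {k: v for k, v in variables.items() if k in JOB_LEVEL_KEYS}
    environment = {k: v for k, v in variables.items() if k not in JOB_LEVEL_KEYS}
    return job_level, environment


job_level, environment = split_legacy_variables({
    "project": "test",
    "region": "europe-west-1",
    "tempLocation": "gs://test/temp",
})
assert job_level == {"project": "test", "region": "europe-west-1"}
assert environment == {"tempLocation": "gs://test/temp"}
```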



