Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/01/05 03:12:52 UTC

[GitHub] [airflow] terekete opened a new pull request #13478: Google Dataflow Flex Operator to handle no Job Type

terekete opened a new pull request #13478:
URL: https://github.com/apache/airflow/pull/13478


   <!--
   Thank you for contributing! Please make sure that your code changes
   are covered with tests. And in case of new features or big changes
   remember to adjust the documentation.
   
   Feel free to ping committers for the review!
   
   In case of existing issue, reference it using one of the following:
   
   closes: #13262
   related: N/A
   
   -->
   Google Dataflow Flex Operator to handle no Job Type
   
   A Dataflow job that has just been started does not yet return job type details to `_check_dataflow_job_state`. This change handles that condition until the job type can be determined.
   
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] bitnahian commented on pull request #13478: Google Dataflow Flex Operator to handle no Job Type

Posted by GitBox <gi...@apache.org>.
bitnahian commented on pull request #13478:
URL: https://github.com/apache/airflow/pull/13478#issuecomment-799953291


   Hi @TobKed, has this issue been resolved? Is it possible to contribute to this fix myself? 





[GitHub] [airflow] github-actions[bot] commented on pull request #13478: Google Dataflow Flex Operator to handle no Job Type

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #13478:
URL: https://github.com/apache/airflow/pull/13478#issuecomment-830467817


   This pull request has been automatically marked as stale because it has not had recent activity. It will be closed in 5 days if no further activity occurs. Thank you for your contributions.





[GitHub] [airflow] terekete commented on a change in pull request #13478: Google Dataflow Flex Operator to handle no Job Type

Posted by GitBox <gi...@apache.org>.
terekete commented on a change in pull request #13478:
URL: https://github.com/apache/airflow/pull/13478#discussion_r552784989



##########
File path: airflow/providers/google/cloud/hooks/dataflow.py
##########
@@ -377,29 +377,32 @@ def _check_dataflow_job_state(self, job) -> bool:
         :rtype: bool
         :raise: Exception
         """
-        if self._wait_until_finished is None:
-            wait_for_running = job['type'] == DataflowJobType.JOB_TYPE_STREAMING

Review comment:
       Agreed, this is a better way. I have changed it for review.







[GitHub] [airflow] TobKed commented on pull request #13478: Google Dataflow Flex Operator to handle no Job Type

Posted by GitBox <gi...@apache.org>.
TobKed commented on pull request #13478:
URL: https://github.com/apache/airflow/pull/13478#issuecomment-783182933


   > Hi, is there a temporary fix for batch jobs to not fail until this PR gets merged?
   
   You can create Python files containing the corrected code inside your own code base and import them using the correct path. (Please take a look at the LEGB rule, [e.g. here](https://realpython.com/python-scope-legb-rule/).)
   
   
   > 
   > I also notice a mismatch between File "/usr/local/lib/airflow/airflow/providers/google/cloud/hooks/dataflow.py", line 321 and the actual code base where it's in line 381. Is this something to be worried about?
   
   Probably some changes were merged into master after your installed version was released. Some time ago Beam operators were introduced, and some logic is now handled by the Beam provider. In my opinion it is nothing to worry about, but if you have the possibility, please test your pipelines with the latest versions of the google and beam providers.
   





[GitHub] [airflow] TobKed commented on pull request #13478: Google Dataflow Flex Operator to handle no Job Type

Posted by GitBox <gi...@apache.org>.
TobKed commented on pull request #13478:
URL: https://github.com/apache/airflow/pull/13478#issuecomment-787473709


   @aarshayj it makes sense :) Would you like to add the fix? Remember to modify the existing tests and/or add a new one to avoid regressions in the future. 





[GitHub] [airflow] TobKed commented on a change in pull request #13478: Google Dataflow Flex Operator to handle no Job Type

Posted by GitBox <gi...@apache.org>.
TobKed commented on a change in pull request #13478:
URL: https://github.com/apache/airflow/pull/13478#discussion_r551897070



##########
File path: airflow/providers/google/cloud/hooks/dataflow.py
##########
@@ -377,29 +377,32 @@ def _check_dataflow_job_state(self, job) -> bool:
         :rtype: bool
         :raise: Exception
         """
-        if self._wait_until_finished is None:
-            wait_for_running = job['type'] == DataflowJobType.JOB_TYPE_STREAMING

Review comment:
       What do you think about just using `wait_for_running = job.get('type', '') == DataflowJobType.JOB_TYPE_STREAMING` here?
   
   I am worried that skipping the rest of the logic while waiting for the job type may make it hang forever.
   I can imagine a situation where a flex template job is cancelled before the state appears in the payload.
   
   Another case is when the job type is not needed in order to return True:
   ```
               elif job['currentState'] in DataflowJobStatus.AWAITING_STATES:
                   return self._wait_until_finished is False
   ```
   
   cc @mik-laj 
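
To make the difference between the two lookups concrete, here is a minimal sketch. The `JOB_TYPE_STREAMING` constant is a stand-in for `DataflowJobType.JOB_TYPE_STREAMING` with an assumed value, and the job payload is hypothetical; none of this is the provider's actual code.

```python
# Stand-in for DataflowJobType.JOB_TYPE_STREAMING (assumed value, for illustration only).
JOB_TYPE_STREAMING = "JOB_TYPE_STREAMING"

# Hypothetical payload of a freshly launched flex template job: no 'type' key yet.
queued_job = {"name": "example-job", "currentState": "JOB_STATE_QUEUED"}

# Strict lookup: raises KeyError while the type is still missing.
try:
    strict_result = queued_job["type"] == JOB_TYPE_STREAMING
except KeyError as err:
    strict_result = f"KeyError: {err}"

# Tolerant lookup: a missing type compares unequal to the streaming constant.
tolerant_result = queued_job.get("type", "") == JOB_TYPE_STREAMING

print(strict_result)
print(tolerant_result)
```

With the tolerant lookup, a job whose type is not yet known is treated like a non-streaming job, so the remaining state checks still run instead of the task failing outright.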







[GitHub] [airflow] aarshayj commented on pull request #13478: Google Dataflow Flex Operator to handle no Job Type

Posted by GitBox <gi...@apache.org>.
aarshayj commented on pull request #13478:
URL: https://github.com/apache/airflow/pull/13478#issuecomment-786585822


   @TobKed - I'm facing the same issue, but I'm curious why this line of code is being executed in @bitnahian's case, given that it is guarded by an if statement:
   ```
   if self._wait_until_finished is None:
       wait_for_running = job['type'] == DataflowJobType.JOB_TYPE_STREAMING
   else:
       wait_for_running = not self._wait_until_finished
   ```
   Given that the value for `wait_until_finished` is being passed as True, the error should not pop up.





[GitHub] [airflow] terekete commented on pull request #13478: Google Dataflow Flex Operator to handle no Job Type

Posted by GitBox <gi...@apache.org>.
terekete commented on pull request #13478:
URL: https://github.com/apache/airflow/pull/13478#issuecomment-769127221


   Will take a look at creating an additional test.





[GitHub] [airflow] TobKed commented on pull request #13478: Google Dataflow Flex Operator to handle no Job Type

Posted by GitBox <gi...@apache.org>.
TobKed commented on pull request #13478:
URL: https://github.com/apache/airflow/pull/13478#issuecomment-951095419


   Thanks @akshayapte 
   
   I see that the [fix](https://github.com/apache/airflow/pull/14914) for it was introduced in [apache-airflow-providers-google version 2.2.0](https://airflow.apache.org/docs/apache-airflow-providers-google/2.2.0/#bug-fixes). It is the only difference from 2.1.0.
   
   Questions:
   1. Which version of the google provider do you use, and is it possible to update it without breaking changes?
   2. If the answer to 1 is negative, you can copy-paste the hook code and apply the fix (copy-paste the operator code as well and modify it so that it uses the updated hook).
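
One way to answer the first question from inside the environment is sketched below. It assumes Python 3.8+ (for `importlib.metadata`); the helper names `parse_release`, `has_fix` and `installed_version` are made up for this illustration, and the `(2, 2, 0)` threshold is taken from the comment above. Pre-release suffixes such as `rc1` are ignored, which is a simplification.

```python
import re
from importlib import metadata
from typing import Optional, Tuple

# Release that contains the fix, according to the comment above.
FIXED_IN: Tuple[int, int, int] = (2, 2, 0)


def parse_release(version: str) -> Tuple[int, int, int]:
    """Extract (major, minor, patch); missing parts count as 0, suffixes are ignored."""
    match = re.match(r"(\d+)(?:\.(\d+))?(?:\.(\d+))?", version)
    if match is None:
        raise ValueError(f"unrecognised version string: {version!r}")
    major, minor, patch = (int(part) if part else 0 for part in match.groups())
    return (major, minor, patch)


def has_fix(version: str) -> bool:
    """Return True if the given provider version is at least FIXED_IN."""
    return parse_release(version) >= FIXED_IN


def installed_version(dist: str = "apache-airflow-providers-google") -> Optional[str]:
    """Return the installed version of the distribution, or None if it is absent."""
    try:
        return metadata.version(dist)
    except metadata.PackageNotFoundError:
        return None


current = installed_version()
if current is None:
    print("google provider distribution not found in this environment")
else:
    print(current, "contains the fix" if has_fix(current) else "predates the fix")
```

If the backport package is in use instead, the distribution name passed to `installed_version` would need to be adjusted accordingly.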
   
   


To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org



[GitHub] [airflow] akshayapte commented on pull request #13478: Google Dataflow Flex Operator to handle no Job Type

Posted by GitBox <gi...@apache.org>.
akshayapte commented on pull request #13478:
URL: https://github.com/apache/airflow/pull/13478#issuecomment-947585560


   Hello. I am still facing this issue. Is this merged yet?





[GitHub] [airflow] TobKed commented on pull request #13478: Google Dataflow Flex Operator to handle no Job Type

Posted by GitBox <gi...@apache.org>.
TobKed commented on pull request #13478:
URL: https://github.com/apache/airflow/pull/13478#issuecomment-757851351


   @terekete related tests: 
   * `tests.providers.google.cloud.hooks.test_dataflow.TestDataflowJob.test_check_dataflow_job_state_wait_until_finished`
   * `tests.providers.google.cloud.hooks.test_dataflow.TestDataflowJob.test_check_dataflow_job_state_terminal_state`
   
   It is worth considering whether these tests should be updated or a new one should be created.
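
As a rough idea of what such a regression test could look like, the sketch below uses `unittest` against a hypothetical stand-in function, `resolve_wait_for_running`, rather than the real hook. The constant value and the job payloads are assumptions made for illustration; a real test would exercise `_check_dataflow_job_state` through the existing `TestDataflowJob` fixtures.

```python
import unittest
from typing import Optional

# Stand-in for DataflowJobType.JOB_TYPE_STREAMING (assumed value, for illustration only).
JOB_TYPE_STREAMING = "JOB_TYPE_STREAMING"


def resolve_wait_for_running(job: dict, wait_until_finished: Optional[bool]) -> bool:
    """Hypothetical stand-in for the branch at the top of _check_dataflow_job_state."""
    if wait_until_finished is None:
        return job.get("type", "") == JOB_TYPE_STREAMING
    return not wait_until_finished


class TestJobWithoutType(unittest.TestCase):
    def test_missing_type_does_not_raise(self):
        # Payload of a job that has no 'type' key yet.
        job = {"id": "job-1", "currentState": "JOB_STATE_QUEUED"}
        cases = [(None, False), (True, False), (False, True)]
        for wait_until_finished, expected in cases:
            with self.subTest(wait_until_finished=wait_until_finished):
                self.assertEqual(resolve_wait_for_running(job, wait_until_finished), expected)

    def test_streaming_type_is_detected(self):
        job = {"id": "job-2", "type": JOB_TYPE_STREAMING}
        self.assertTrue(resolve_wait_for_running(job, None))
```

Saved as a module, this can be run with `python -m unittest`.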





[GitHub] [airflow] aarshayj edited a comment on pull request #13478: Google Dataflow Flex Operator to handle no Job Type

Posted by GitBox <gi...@apache.org>.
aarshayj edited a comment on pull request #13478:
URL: https://github.com/apache/airflow/pull/13478#issuecomment-787398700


   @TobKed 
   After following the code flow, I think I see what the major source of the issue is:
   - `DataflowStartFlexTemplateOperator`'s init has `wait_until_finished: Optional[bool] = None,` as an argument
   ```
       @apply_defaults
       def __init__(
           self,
           body: Dict,
           location: str,
           project_id: Optional[str] = None,
           gcp_conn_id: str = "google_cloud_default",
           delegate_to: Optional[str] = None,
           drain_pipeline: bool = False,
           cancel_timeout: Optional[int] = 10 * 60,
           wait_until_finished: Optional[bool] = None,
           *args,
           **kwargs,
       ) -> None:
           super().__init__(*args, **kwargs)
           self.body = body
           self.location = location
           self.project_id = project_id
           self.gcp_conn_id = gcp_conn_id
           self.delegate_to = delegate_to
           self.drain_pipeline = drain_pipeline
           self.cancel_timeout = cancel_timeout
           self.wait_until_finished = wait_until_finished
           self.job_id = None
           self.hook: Optional[DataflowHook] = None
   ```
   
   - The execute method of `DataflowStartFlexTemplateOperator` initializes `DataflowHook` 
   ```
       def execute(self, context):
           self.hook = DataflowHook(
               gcp_conn_id=self.gcp_conn_id,
               delegate_to=self.delegate_to,
               drain_pipeline=self.drain_pipeline,
               cancel_timeout=self.cancel_timeout,
               wait_until_finished=self.wait_until_finished,
           )
   ```
   - `DataflowHook`'s init sets the same property:
   ```
       def __init__(
           self,
           gcp_conn_id: str = "google_cloud_default",
           delegate_to: Optional[str] = None,
           poll_sleep: int = 10,
           impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
           drain_pipeline: bool = False,
           cancel_timeout: Optional[int] = 5 * 60,
           wait_until_finished: Optional[bool] = None,
       ) -> None:
           self.poll_sleep = poll_sleep
           self.drain_pipeline = drain_pipeline
           self.cancel_timeout = cancel_timeout
           self.wait_until_finished = wait_until_finished
           super().__init__(
               gcp_conn_id=gcp_conn_id,
               delegate_to=delegate_to,
               impersonation_chain=impersonation_chain,
           )
   ```
   - `DataflowHook` has methods: `def start_template_dataflow(..)` and `def start_flex_template(..)`, both of which initialize `_DataflowJobsController` as
   ```
       jobs_controller = _DataflowJobsController(
           dataflow=self.get_conn(),
           project_number=project_id,
           job_id=job_id,
           location=location,
           poll_sleep=self.poll_sleep,
           num_retries=self.num_retries,
           cancel_timeout=self.cancel_timeout,
       )
   ```
   The issue is that the value of `wait_until_finished` is not passed from `DataflowHook` to `_DataflowJobsController`. As a result, `self._wait_until_finished` is still `None` inside the controller, so the condition of this `if` block is True:
   ```
   if self._wait_until_finished is None:
         wait_for_running = job.get('type', '') == DataflowJobType.JOB_TYPE_STREAMING
   ```
   
   If we fix just this, I think we should be able to solve the problem by passing the right value for `wait_until_finished`.
   
   Does this make sense?
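
The effect described in this comment can be reproduced with two small stand-in classes. `HookSketch` and `JobsControllerSketch` are hypothetical simplifications invented for this sketch; they keep only the `wait_until_finished` plumbing and are not the provider's real classes.

```python
from typing import Optional


class JobsControllerSketch:
    """Hypothetical, heavily simplified stand-in for _DataflowJobsController."""

    def __init__(self, wait_until_finished: Optional[bool] = None) -> None:
        self._wait_until_finished = wait_until_finished

    def needs_job_type(self) -> bool:
        # The job type is only consulted when no explicit preference was given.
        return self._wait_until_finished is None


class HookSketch:
    """Hypothetical stand-in for DataflowHook; only the relevant attribute is kept."""

    def __init__(self, wait_until_finished: Optional[bool] = None) -> None:
        self.wait_until_finished = wait_until_finished

    def controller_without_forwarding(self) -> JobsControllerSketch:
        # Mirrors the call quoted above: wait_until_finished is not passed on.
        return JobsControllerSketch()

    def controller_with_forwarding(self) -> JobsControllerSketch:
        # Proposed change: hand the hook's setting down to the controller.
        return JobsControllerSketch(wait_until_finished=self.wait_until_finished)


hook = HookSketch(wait_until_finished=True)
print(hook.controller_without_forwarding().needs_job_type())
print(hook.controller_with_forwarding().needs_job_type())
```

Without forwarding, the controller still falls back to inspecting the job type even though the operator was configured with `wait_until_finished=True`; with forwarding, that branch is skipped.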





[GitHub] [airflow] akshayapte commented on pull request #13478: Google Dataflow Flex Operator to handle no Job Type

Posted by GitBox <gi...@apache.org>.
akshayapte commented on pull request #13478:
URL: https://github.com/apache/airflow/pull/13478#issuecomment-950399576


   Hello @TobKed I have the same issue when I use the following code:
   ```python
   import datetime
   
   from airflow import models
   from airflow.providers.google.cloud.operators.dataflow import DataflowStartFlexTemplateOperator
   from airflow.utils.dates import days_ago
   
   # Define a DAG (directed acyclic graph) of tasks.
   # Any task you create within the context manager is automatically added to the
   # DAG object.
   with models.DAG(
       # The id you will see in the DAG airflow page
       "event_consolidator",
       # The interval with which to schedule the DAG
       schedule_interval=datetime.timedelta(days=1),  # Override to match your needs
   ) as dag:
   
       start_template_job = DataflowStartFlexTemplateOperator(
           wait_until_finished=True,
           do_xcom_push=True,
           # The task id of your job
           task_id="dataflow_operator_event_consolidation",
           location="europe-west4",
           body={
               "launchParameter": {
                   "containerSpecGcsPath": "gs://event-consol-templates-edge-default/event_consolidator/python_command_spec.json",
                   "jobName": "discover-event-consolidation-from-airflow",
                   "parameters": {
                       "matched_event": '*********',
                       "consolidated_event": 'gs://****/',
                       "setup_file": '/dataflow/template/setup.py',
                   },
                   "environment": {
                       "subnetwork": 'https://www.googleapis.com/compute/v1/projects/***/regions/europe-west4/subnetworks/**********',
                       "machineType": "n1-standard-1",
                       "numWorkers": "1",
                       "maxWorkers": "1",
                       "tempLocation": "*******" + "/tmp/",
                   },
               }
           },
           project_id="trv-hs-src-disco-consol-edge",
       )
   ```
   
   ```
     File "/usr/local/lib/airflow/airflow/providers/google/cloud/hooks/dataflow.py", line 348, in wait_for_done
       while self._jobs and not all(self._check_dataflow_job_state(job) for job in self._jobs):
     File "/usr/local/lib/airflow/airflow/providers/google/cloud/hooks/dataflow.py", line 348, in <genexpr>
       while self._jobs and not all(self._check_dataflow_job_state(job) for job in self._jobs):
     File "/usr/local/lib/airflow/airflow/providers/google/cloud/hooks/dataflow.py", line 321, in _check_dataflow_job_state
       wait_for_running = job['type'] == DataflowJobType.JOB_TYPE_STREAMING
   KeyError: 'type'
   ```
       
       





[GitHub] [airflow] boring-cyborg[bot] commented on pull request #13478: Google Dataflow Flex Operator to handle no Job Type

Posted by GitBox <gi...@apache.org>.
boring-cyborg[bot] commented on pull request #13478:
URL: https://github.com/apache/airflow/pull/13478#issuecomment-754361637


   Congratulations on your first Pull Request and welcome to the Apache Airflow community! If you have any issues or are unsure about anything, please check our Contribution Guide (https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst).
   Here are some useful points:
   - Pay attention to the quality of your code (flake8, pylint and type annotations). Our [pre-commits]( https://github.com/apache/airflow/blob/master/STATIC_CODE_CHECKS.rst#prerequisites-for-pre-commit-hooks) will help you with that.
   - In case of a new feature add useful documentation (in docstrings or in `docs/` directory). Adding a new operator? Check this short [guide](https://github.com/apache/airflow/blob/master/docs/apache-airflow/howto/custom-operator.rst). Consider adding an example DAG that shows how users should use it.
   - Consider using [Breeze environment](https://github.com/apache/airflow/blob/master/BREEZE.rst) for testing locally, it's a heavy docker but it ships with a working Airflow and a lot of integrations.
   - Be patient and persistent. It might take some time to get a review or get the final approval from Committers.
   - Please follow [ASF Code of Conduct](https://www.apache.org/foundation/policies/conduct) for all communication including (but not limited to) comments on Pull Requests, Mailing list and Slack.
   - Be sure to read the [Airflow Coding style]( https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#coding-style-and-best-practices).
   Apache Airflow is a community-driven project and together we are making it better 🚀.
   In case of doubts contact the developers at:
   Mailing List: dev@airflow.apache.org
   Slack: https://s.apache.org/airflow-slack
   





[GitHub] [airflow] TobKed commented on pull request #13478: Google Dataflow Flex Operator to handle no Job Type

Posted by GitBox <gi...@apache.org>.
TobKed commented on pull request #13478:
URL: https://github.com/apache/airflow/pull/13478#issuecomment-787176418


   @aarshayj I think the problem may lie in `DataflowHook.start_template_dataflow`. If I am not mistaken, the parameters passed to `_DataflowJobsController` should contain `wait_until_finished=self.wait_until_finished`. Would you mind checking whether I am right? 





[GitHub] [airflow] bitnahian commented on pull request #13478: Google Dataflow Flex Operator to handle no Job Type

Posted by GitBox <gi...@apache.org>.
bitnahian commented on pull request #13478:
URL: https://github.com/apache/airflow/pull/13478#issuecomment-780254830


   Hi, is there a temporary fix for batch jobs to not fail until this PR gets merged? I have the following DataflowFlexTemplate operator:
   
   ```python
   gcs_to_solr_transfer_body = {
       "launchParameter": {
           "containerSpecGcsPath": GCS_FLEX_TEMPLATE_TEMPLATE_PATH,
           "jobName": DATAFLOW_FLEX_TEMPLATE_JOB_NAME,
           "parameters": {
               "inputFile": FLEX_INPUT_FILE,
               "solrURL": FLEX_SOLR_URL
           },
           "environment": {
               "workerRegion": FLEX_TEMPLATE_LOCATION,
               "serviceAccountEmail": COMMON_SA,
               "maxWorkers": FLEX_MAX_NUM_WORKERS
           }
       }
   }
   
   start_flex_template = DataflowStartFlexTemplateOperator(
       task_id="start_gcs_to_solr",
       body=gcs_to_solr_transfer_body,
       do_xcom_push=True,
       location=FLEX_TEMPLATE_LOCATION,
       wait_until_finished=True
   )
   
   ```
   
   I'm receiving the following logs from my cloud composer dag:
   
   ```
   [2021-02-17 02:14:19,736] {dataflow.py:306} INFO - Google Cloud DataFlow job gcs-to-solr-20210217-021335 is state: JOB_STATE_QUEUED
   [2021-02-17 02:14:19,736] {taskinstance.py:1152} ERROR - 'type'
   Traceback (most recent call last):
     File "/usr/local/lib/airflow/airflow/models/taskinstance.py", line 985, in _run_raw_task
       result = task_copy.execute(context=context)
     File "/usr/local/lib/airflow/airflow/providers/google/cloud/operators/dataflow.py", line 647, in execute
       on_new_job_id_callback=set_current_job_id
     File "/usr/local/lib/airflow/airflow/providers/google/common/hooks/base_google.py", line 383, in inner_wrapper
       return func(self, *args, **kwargs)
     File "/usr/local/lib/airflow/airflow/providers/google/cloud/hooks/dataflow.py", line 804, in start_flex_template
       jobs_controller.wait_for_done()
     File "/usr/local/lib/airflow/airflow/providers/google/cloud/hooks/dataflow.py", line 348, in wait_for_done
       while self._jobs and not all(self._check_dataflow_job_state(job) for job in self._jobs):
     File "/usr/local/lib/airflow/airflow/providers/google/cloud/hooks/dataflow.py", line 348, in <genexpr>
       while self._jobs and not all(self._check_dataflow_job_state(job) for job in self._jobs):
     File "/usr/local/lib/airflow/airflow/providers/google/cloud/hooks/dataflow.py", line 321, in _check_dataflow_job_state
       wait_for_running = job['type'] == DataflowJobType.JOB_TYPE_STREAMING
   KeyError: 'type'
   ```
   
   I also notice a mismatch between File "/usr/local/lib/airflow/airflow/providers/google/cloud/hooks/dataflow.py", line 321 and the actual code base where it's in line 381. Is this something to be worried about? 





[GitHub] [airflow] akshayapte edited a comment on pull request #13478: Google Dataflow Flex Operator to handle no Job Type

Posted by GitBox <gi...@apache.org>.
akshayapte edited a comment on pull request #13478:
URL: https://github.com/apache/airflow/pull/13478#issuecomment-950399576


   Hello @TobKed I have the same issue when I use the following code:
   ```python
   import datetime
   
   from airflow import models
   from airflow.providers.google.cloud.operators.dataflow import DataflowStartFlexTemplateOperator
   
   with models.DAG(
       # The id you will see in the DAG airflow page
       "event_consolidator",
       # The interval with which to schedule the DAG
       schedule_interval=datetime.timedelta(days=1),  # Override to match your needs
   ) as dag:
   
       start_template_job = DataflowStartFlexTemplateOperator(
           wait_until_finished=True,
           do_xcom_push=True,
           # The task id of your job
           task_id="dataflow_operator_event_consolidation",
           location="europe-west4",
           body={
               "launchParameter": {
                   "containerSpecGcsPath": "gs://***/python_command_spec.json",
                   "jobName": "discover-event-consolidation-from-airflow",
                   "parameters": {
                       "matched_event": '*********',
                       "consolidated_event": 'gs://****/',
                       "setup_file": '/dataflow/template/setup.py',
                   },
                   "environment": {
                       "subnetwork": 'https://www.googleapis.com/compute/v1/projects/***/regions/europe-west4/subnetworks/**********',
                       "machineType": "n1-standard-1",
                       "numWorkers": "1",
                       "maxWorkers": "1",
                       "tempLocation": "*******" + "/tmp/",
                   },
               }
           },
           project_id="*******",
       )
   ```
   
   ```
     File "/usr/local/lib/airflow/airflow/providers/google/cloud/hooks/dataflow.py", line 321, in _check_dataflow_job_state
       wait_for_running = job['type'] == DataflowJobType.JOB_TYPE_STREAMING
   KeyError: 'type'
   ```
       
       





[GitHub] [airflow] TobKed commented on pull request #13478: Google Dataflow Flex Operator to handle no Job Type

Posted by GitBox <gi...@apache.org>.
TobKed commented on pull request #13478:
URL: https://github.com/apache/airflow/pull/13478#issuecomment-755738885


   > @TobKed - once the PR is merged, will this change appear in library: apache-airflow-backport-providers-google ?
   
   Not immediately; it has to be released first. @potiuk is our Airflow providers specialist. @potiuk, could you tell us how often providers are released, please?





[GitHub] [airflow] mik-laj commented on pull request #13478: Google Dataflow Flex Operator to handle no Job Type

Posted by GitBox <gi...@apache.org>.
mik-laj commented on pull request #13478:
URL: https://github.com/apache/airflow/pull/13478#issuecomment-756672885


   Can you add tests to avoid regression?
   
   >  will this change appear in library: apache-airflow-backport-providers-google ?
   
   Yes, but the release date is not known yet. We will likely release the first RC version after January 15th, and a formal vote will then have to take place before a stable release. These are not official dates, only my predictions, and they could change because the next release will be a bit bigger. See: https://github.com/apache/airflow/issues/13230





[GitHub] [airflow] TobKed commented on pull request #13478: Google Dataflow Flex Operator to handle no Job Type

Posted by GitBox <gi...@apache.org>.
TobKed commented on pull request #13478:
URL: https://github.com/apache/airflow/pull/13478#issuecomment-800233634


   The fix looks good; only tests are missing. 
   @terekete do you plan to add tests? If not, @bitnahian would you like to do it? I can review it.





[GitHub] [airflow] aarshayj commented on pull request #13478: Google Dataflow Flex Operator to handle no Job Type

Posted by GitBox <gi...@apache.org>.
aarshayj commented on pull request #13478:
URL: https://github.com/apache/airflow/pull/13478#issuecomment-787398700


   @TobKed 
   After following the code flow, I think I see what the major source of the issue is:
   - `DataflowStartFlexTemplateOperator`'s init has `wait_until_finished: Optional[bool] = None,` as an argument
   ```
       @apply_defaults
       def __init__(
           self,
           body: Dict,
           location: str,
           project_id: Optional[str] = None,
           gcp_conn_id: str = "google_cloud_default",
           delegate_to: Optional[str] = None,
           drain_pipeline: bool = False,
           cancel_timeout: Optional[int] = 10 * 60,
           wait_until_finished: Optional[bool] = None,
           *args,
           **kwargs,
       ) -> None:
           super().__init__(*args, **kwargs)
           self.body = body
           self.location = location
           self.project_id = project_id
           self.gcp_conn_id = gcp_conn_id
           self.delegate_to = delegate_to
           self.drain_pipeline = drain_pipeline
           self.cancel_timeout = cancel_timeout
           self.wait_until_finished = wait_until_finished
           self.job_id = None
           self.hook: Optional[DataflowHook] = None
   ```
   
   - The execute method of `DataflowStartFlexTemplateOperator` initializes `DataflowHook` 
   ```
       def execute(self, context):
           self.hook = DataflowHook(
               gcp_conn_id=self.gcp_conn_id,
               delegate_to=self.delegate_to,
               drain_pipeline=self.drain_pipeline,
               cancel_timeout=self.cancel_timeout,
               wait_until_finished=self.wait_until_finished,
           )
   ```
   - `DataflowHook`'s init sets the same property:
   ```
       def __init__(
           self,
           gcp_conn_id: str = "google_cloud_default",
           delegate_to: Optional[str] = None,
           poll_sleep: int = 10,
           impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
           drain_pipeline: bool = False,
           cancel_timeout: Optional[int] = 5 * 60,
           wait_until_finished: Optional[bool] = None,
       ) -> None:
           self.poll_sleep = poll_sleep
           self.drain_pipeline = drain_pipeline
           self.cancel_timeout = cancel_timeout
           self.wait_until_finished = wait_until_finished
           super().__init__(
               gcp_conn_id=gcp_conn_id,
               delegate_to=delegate_to,
               impersonation_chain=impersonation_chain,
           )
   ```
   - Both `def start_template_dataflow(..)` and `def start_flex_template(..)` initialize `_DataflowJobsController` as
   ```
   jobs_controller = _DataflowJobsController(
       dataflow=self.get_conn(),
       project_number=project_id,
       job_id=job_id,
       location=location,
       poll_sleep=self.poll_sleep,
       num_retries=self.num_retries,
       cancel_timeout=self.cancel_timeout,
   )
   ```
   The issue is that the value of `wait_until_finished` is not passed on to `_DataflowJobsController`. Because the value never reaches the controller, `self._wait_until_finished` is always `None` there, so this `if` branch in the controller code is always taken:
   ```
   if self._wait_until_finished is None:
       wait_for_running = job.get('type', '') == DataflowJobType.JOB_TYPE_STREAMING
   ```
   
   If we fix just this, I think we should be able to solve the problem by passing the right value for `wait_until_finished`.
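
A minimal sketch of the idea above, runnable without Airflow. `HookSketch` and `JobsControllerSketch` are hypothetical stand-ins for `DataflowHook` and `_DataflowJobsController`, reduced to the single decision discussed here. The `else` branch (`not wait_until_finished`) is an assumption about how an explicit value would be interpreted, not a quote from the provider code.

```python
from typing import Optional

# Mirrors the value of DataflowJobType.JOB_TYPE_STREAMING (assumed string constant).
JOB_TYPE_STREAMING = "JOB_TYPE_STREAMING"


class JobsControllerSketch:
    """Hypothetical, stripped-down stand-in for _DataflowJobsController."""

    def __init__(self, wait_until_finished: Optional[bool] = None) -> None:
        self._wait_until_finished = wait_until_finished

    def wait_for_running(self, job: dict) -> bool:
        """Return True if polling may stop as soon as the job is running."""
        if self._wait_until_finished is None:
            # No explicit choice: fall back to the job type, tolerating a
            # job dict that does not carry a 'type' key yet.
            return job.get("type", "") == JOB_TYPE_STREAMING
        # Explicit choice from the operator: the job type is not consulted.
        return not self._wait_until_finished


class HookSketch:
    """Hypothetical stand-in for DataflowHook."""

    def __init__(self, wait_until_finished: Optional[bool] = None) -> None:
        self.wait_until_finished = wait_until_finished

    def make_controller(self) -> JobsControllerSketch:
        # The point of the suggestion: forward the value instead of dropping it.
        return JobsControllerSketch(wait_until_finished=self.wait_until_finished)


job_without_type = {"id": "hypothetical-job-id", "currentState": "JOB_STATE_RUNNING"}
controller = HookSketch(wait_until_finished=True).make_controller()
print(controller.wait_for_running(job_without_type))  # prints False: keep waiting until finished
```

With the value forwarded, the job type is only inspected when the caller left `wait_until_finished` as `None`.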
   
   Does this make sense?



[GitHub] [airflow] akshayapte edited a comment on pull request #13478: Google Dataflow Flex Operator to handle no Job Type

Posted by GitBox <gi...@apache.org>.
akshayapte edited a comment on pull request #13478:
URL: https://github.com/apache/airflow/pull/13478#issuecomment-950399576


   Hello @TobKed, I have the same issue when I use the following code:
   ```
   import datetime

   from airflow import models
   from airflow.providers.google.cloud.operators.dataflow import DataflowStartFlexTemplateOperator

   with models.DAG(
       # The id you will see in the DAG airflow page
       "event_consolidator",
       # The interval with which to schedule the DAG
       schedule_interval=datetime.timedelta(days=1),  # Override to match your needs
   ) as dag:

       start_template_job = DataflowStartFlexTemplateOperator(
           wait_until_finished=True,
           do_xcom_push=True,
           # The task id of your job
           task_id="dataflow_operator_event_consolidation",
           location="europe-west4",
           body={
               "launchParameter": {
                   "containerSpecGcsPath": "gs://event-consol-templates-edge-default/event_consolidator/python_command_spec.json",
                   "jobName": "discover-event-consolidation-from-airflow",
                   "parameters": {
                       "matched_event": "*********",
                       "consolidated_event": "gs://****/",
                       "setup_file": "/dataflow/template/setup.py",
                   },
                   "environment": {
                       "subnetwork": "https://www.googleapis.com/compute/v1/projects/***/regions/europe-west4/subnetworks/**********",
                       "machineType": "n1-standard-1",
                       "numWorkers": "1",
                       "maxWorkers": "1",
                       "tempLocation": "*******" + "/tmp/",
                   },
               }
           },
           project_id="trv-hs-src-disco-consol-edge",
       )
   ```
   It fails with:
   ```
     File "/usr/local/lib/airflow/airflow/providers/google/cloud/hooks/dataflow.py", line 321, in _check_dataflow_job_state
       wait_for_running = job['type'] == DataflowJobType.JOB_TYPE_STREAMING
   KeyError: 'type'
   ```
       
       



[GitHub] [airflow] turbaszek commented on pull request #13478: Google Dataflow Flex Operator to handle no Job Type

Posted by GitBox <gi...@apache.org>.
turbaszek commented on pull request #13478:
URL: https://github.com/apache/airflow/pull/13478#issuecomment-754474512


   CC @TobKed @mik-laj 




[GitHub] [airflow] TobKed commented on pull request #13478: Google Dataflow Flex Operator to handle no Job Type

Posted by GitBox <gi...@apache.org>.
TobKed commented on pull request #13478:
URL: https://github.com/apache/airflow/pull/13478#issuecomment-948442617


   Hi @akshayapte, this was merged in March. Could you provide more info about your issue?
   Which versions of Airflow and the Google providers do you use?
   A snippet showing how you use the operator/hook would be very helpful as well.



[GitHub] [airflow] terekete commented on pull request #13478: Google Dataflow Flex Operator to handle no Job Type

Posted by GitBox <gi...@apache.org>.
terekete commented on pull request #13478:
URL: https://github.com/apache/airflow/pull/13478#issuecomment-755442720


   @TobKed - once the PR is merged, will this change appear in the apache-airflow-backport-providers-google library?



[GitHub] [airflow] github-actions[bot] closed pull request #13478: Google Dataflow Flex Operator to handle no Job Type

Posted by GitBox <gi...@apache.org>.
github-actions[bot] closed pull request #13478:
URL: https://github.com/apache/airflow/pull/13478


   



[GitHub] [airflow] matthieucham commented on pull request #13478: Google Dataflow Flex Operator to handle no Job Type

Posted by GitBox <gi...@apache.org>.
matthieucham commented on pull request #13478:
URL: https://github.com/apache/airflow/pull/13478#issuecomment-800040334


   Hi, I see the fix is still not merged, and even once it is, it will take quite some time before it is backported to Cloud Composer on GCP.
   This is why I chose a different approach to launching Dataflow jobs. I wrote a blog post about it: https://dev.to/stack-labs/orchestrate-dataflow-pipelines-easily-with-gcp-workflows-1i8k
   I know it's not a real fix for Airflow, but it fits my needs, so maybe it will be of interest to some of you.



[GitHub] [airflow] akshayapte edited a comment on pull request #13478: Google Dataflow Flex Operator to handle no Job Type

Posted by GitBox <gi...@apache.org>.
akshayapte edited a comment on pull request #13478:
URL: https://github.com/apache/airflow/pull/13478#issuecomment-950399576


   Hello @TobKed, I have the same issue when I use the following code:
   ```
   import datetime

   from airflow import models
   from airflow.providers.google.cloud.operators.dataflow import DataflowStartFlexTemplateOperator

   with models.DAG(
       # The id you will see in the DAG airflow page
       "event_consolidator",
       # The interval with which to schedule the DAG
       schedule_interval=datetime.timedelta(days=1),  # Override to match your needs
   ) as dag:

       start_template_job = DataflowStartFlexTemplateOperator(
           wait_until_finished=True,
           do_xcom_push=True,
           # The task id of your job
           task_id="dataflow_operator_event_consolidation",
           location="europe-west4",
           body={
               "launchParameter": {
                   "containerSpecGcsPath": "gs://event-consol-templates-edge-default/event_consolidator/python_command_spec.json",
                   "jobName": "discover-event-consolidation-from-airflow",
                   "parameters": {
                       "matched_event": "*********",
                       "consolidated_event": "gs://****/",
                       "setup_file": "/dataflow/template/setup.py",
                   },
                   "environment": {
                       "subnetwork": "https://www.googleapis.com/compute/v1/projects/***/regions/europe-west4/subnetworks/**********",
                       "machineType": "n1-standard-1",
                       "numWorkers": "1",
                       "maxWorkers": "1",
                       "tempLocation": "*******" + "/tmp/",
                   },
               }
           },
           project_id="trv-hs-src-disco-consol-edge",
       )
   ```
   It fails with:
   ```
     File "/usr/local/lib/airflow/airflow/providers/google/cloud/hooks/dataflow.py", line 348, in wait_for_done
       while self._jobs and not all(self._check_dataflow_job_state(job) for job in self._jobs):
     File "/usr/local/lib/airflow/airflow/providers/google/cloud/hooks/dataflow.py", line 348, in <genexpr>
       while self._jobs and not all(self._check_dataflow_job_state(job) for job in self._jobs):
     File "/usr/local/lib/airflow/airflow/providers/google/cloud/hooks/dataflow.py", line 321, in _check_dataflow_job_state
       wait_for_running = job['type'] == DataflowJobType.JOB_TYPE_STREAMING
   KeyError: 'type'
   ```
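
The `KeyError` in the traceback above can be reproduced without Airflow. This is a minimal sketch, assuming (as the PR description suggests) that a freshly launched job is returned without a `type` key. The job dict, its id, and the two function names are illustrative only and are not part of the provider.

```python
# Mirrors the value of DataflowJobType.JOB_TYPE_STREAMING (assumed string constant).
JOB_TYPE_STREAMING = "JOB_TYPE_STREAMING"


def wait_for_running_strict(job: dict) -> bool:
    # Same shape as the failing line in the traceback: direct key access.
    return job["type"] == JOB_TYPE_STREAMING


def wait_for_running_tolerant(job: dict) -> bool:
    # dict.get returns None when the key is absent, so no exception is raised.
    return job.get("type") == JOB_TYPE_STREAMING


# Illustrative job payload with no 'type' key yet.
fresh_job = {"id": "hypothetical-job-id", "currentState": "JOB_STATE_QUEUED"}

try:
    wait_for_running_strict(fresh_job)
    strict_outcome = "ok"
except KeyError as err:
    strict_outcome = f"KeyError: {err}"

tolerant_outcome = wait_for_running_tolerant(fresh_job)

print(strict_outcome)    # prints: KeyError: 'type'
print(tolerant_outcome)  # prints: False
```

The tolerant lookup treats a job with no known type as non-streaming for that polling round, so the check can simply be repeated on the next poll.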
       
       

