Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/02/17 02:33:19 UTC

[GitHub] [airflow] bitnahian commented on pull request #13478: Google Dataflow Flex Operator to handle no Job Type

bitnahian commented on pull request #13478:
URL: https://github.com/apache/airflow/pull/13478#issuecomment-780254830


   Hi, is there a temporary workaround to keep batch jobs from failing until this PR gets merged? I have the following Dataflow Flex Template operator:
   
   ```python
   gcs_to_solr_transfer_body = {
       "launchParameter": {
           "containerSpecGcsPath": GCS_FLEX_TEMPLATE_TEMPLATE_PATH,
           "jobName": DATAFLOW_FLEX_TEMPLATE_JOB_NAME,
           "parameters": {
               "inputFile": FLEX_INPUT_FILE,
               "solrURL": FLEX_SOLR_URL
           },
           "environment": {
               "workerRegion": FLEX_TEMPLATE_LOCATION,
               "serviceAccountEmail": COMMON_SA,
               "maxWorkers": FLEX_MAX_NUM_WORKERS
           }
       }
   }
   
   start_flex_template = DataflowStartFlexTemplateOperator(
       task_id="start_gcs_to_solr",
       body=gcs_to_solr_transfer_body,
       do_xcom_push=True,
       location=FLEX_TEMPLATE_LOCATION,
       wait_until_finished=True
   )
   
   ```
   
   I'm receiving the following logs from my Cloud Composer DAG:
   
   ```
   [2021-02-17 02:14:19,736] {dataflow.py:306} INFO - Google Cloud DataFlow job gcs-to-solr-20210217-021335 is state: JOB_STATE_QUEUED
   [2021-02-17 02:14:19,736] {taskinstance.py:1152} ERROR - 'type'
    Traceback (most recent call last):
      File "/usr/local/lib/airflow/airflow/models/taskinstance.py", line 985, in _run_raw_task
        result = task_copy.execute(context=context)
      File "/usr/local/lib/airflow/airflow/providers/google/cloud/operators/dataflow.py", line 647, in execute
        on_new_job_id_callback=set_current_job_id
      File "/usr/local/lib/airflow/airflow/providers/google/common/hooks/base_google.py", line 383, in inner_wrapper
        return func(self, *args, **kwargs)
      File "/usr/local/lib/airflow/airflow/providers/google/cloud/hooks/dataflow.py", line 804, in start_flex_template
        jobs_controller.wait_for_done()
      File "/usr/local/lib/airflow/airflow/providers/google/cloud/hooks/dataflow.py", line 348, in wait_for_done
        while self._jobs and not all(self._check_dataflow_job_state(job) for job in self._jobs):
      File "/usr/local/lib/airflow/airflow/providers/google/cloud/hooks/dataflow.py", line 348, in <genexpr>
        while self._jobs and not all(self._check_dataflow_job_state(job) for job in self._jobs):
      File "/usr/local/lib/airflow/airflow/providers/google/cloud/hooks/dataflow.py", line 321, in _check_dataflow_job_state
        wait_for_running = job['type'] == DataflowJobType.JOB_TYPE_STREAMING
    KeyError: 'type'
   ```
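   
   For what it's worth, here is a minimal sketch (with hypothetical job payloads, not the real Dataflow API response) of what the traceback suggests is happening: while a Flex Template job is still in `JOB_STATE_QUEUED`, the job dict apparently has no `"type"` key yet, so the unconditional `job['type']` lookup raises `KeyError`, whereas a defensive `job.get('type')` would not:
   
   ```python
   # Hypothetical reproduction of the failure mode; the dicts below are
   # stand-ins for the Dataflow API job payload, not actual API output.
   JOB_TYPE_STREAMING = "JOB_TYPE_STREAMING"
   
   def check_job(job: dict) -> bool:
       # Mirrors the failing line: assumes "type" is always present.
       return job["type"] == JOB_TYPE_STREAMING
   
   def check_job_defensive(job: dict) -> bool:
       # Sketch of a workaround: treat a missing "type" as not-streaming.
       return job.get("type") == JOB_TYPE_STREAMING
   
   # A queued job with no "type" key yet, as the traceback implies.
   queued_job = {"currentState": "JOB_STATE_QUEUED"}
   
   try:
       check_job(queued_job)
   except KeyError as exc:
       print(f"KeyError: {exc}")          # the error seen in the logs
   
   print(check_job_defensive(queued_job))  # False, no exception
   ```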
   
   I also notice a mismatch between the traceback's `File "/usr/local/lib/airflow/airflow/providers/google/cloud/hooks/dataflow.py", line 321` and the actual codebase, where that code is on line 381. Is this something to be worried about?


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org