Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/12/22 12:31:49 UTC

[GitHub] [airflow] FabioCoder opened a new issue, #28535: Dynamic Task Mapping: No tasks are mapped, although the list to be mapped is not empty

FabioCoder opened a new issue, #28535:
URL: https://github.com/apache/airflow/issues/28535

   ### Apache Airflow version
   
   Other Airflow 2 version (please specify below)
   
   ### What happened
   
   Airflow 2.4.2
   
   No tasks are mapped and no downstream tasks are executed, even though the list to be mapped is not empty. This happens only intermittently, not on every run.
   
   Excerpt from the log of the task that returns the list to be mapped:
   
   ```
   AIRFLOW_CTX_DAG_RUN_ID=dataset_triggered__2022-12-21T02:56:51.044765+00:00
   [2022-12-21, 05:31:11 CET] {python.py:177} INFO - Done. Returned value was: ['20e565f2-6492-4fe3-b4d7-f79f1646eaec',  ... '2d9dbd70-7ad7-4dab-a635-1248082b9a46']
   [2022-12-21, 05:31:11 CET] {taskinstance.py:1401} INFO - Marking task as SUCCESS. dag_id=qlik_reload_dag, task_id=qlik_app_tasks.get_tasks_by_tag, execution_date=20221221T025651, start_date=20221221T043110, end_date=20221221T043111
   [2022-12-21, 05:31:11 CET] {local_task_job.py:164} INFO - Task exited with return code 0
   [2022-12-21, 05:31:11 CET] {local_task_job.py:273} INFO - 0 downstream tasks scheduled from follow-on schedule check
   ```
   
   For this task, there is no XCom entry containing the returned list, even though the function did return a list.
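   
   A minimal check (not part of the original report, only a sketch) for whether the `return_value` XCom row exists, assuming direct access to the Airflow metadata database and using the dag_id, task_id and run_id from the log excerpt above:
   
   ```python
   # Sketch only: verify whether get_tasks_by_tag pushed its "return_value"
   # XCom for the run shown in the log. Assumes this runs on the Airflow
   # deployment itself, with access to the metadata database.
   from airflow.models import XCom
   from airflow.utils.session import create_session
   
   with create_session() as session:
       rows = (
           session.query(XCom)
           .filter(
               XCom.dag_id == 'qlik_reload_dag',
               XCom.task_id == 'qlik_app_tasks.get_tasks_by_tag',
               XCom.run_id == 'dataset_triggered__2022-12-21T02:56:51.044765+00:00',
               XCom.key == 'return_value',
           )
           .all()
       )
       print(f'{len(rows)} return_value XCom row(s) found')
   ```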
   
   ### What you think should happen instead
   
   The returned list has a length of 85, so 85 tasks should have been mapped.
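   
   As a sanity check (again only a sketch, not from the original report), the number of task instances actually created for the mapped task in this run can be counted from the metadata database; `qlik_app_tasks.start_task` is a guess at the full task_id (task group plus the `task_id='start_task'` used in dag.py below) and is only illustrative:
   
   ```python
   # Sketch only: count the task instances created for the mapped task in
   # this run. A single row usually means the task was never expanded;
   # 85 rows would match the expectation above.
   from airflow.models import TaskInstance
   from airflow.utils.session import create_session
   
   with create_session() as session:
       count = (
           session.query(TaskInstance)
           .filter(
               TaskInstance.dag_id == 'qlik_reload_dag',
               TaskInstance.task_id == 'qlik_app_tasks.start_task',  # guessed task_id
               TaskInstance.run_id == 'dataset_triggered__2022-12-21T02:56:51.044765+00:00',
           )
           .count()
       )
       print(f'{count} task instance(s) found; 85 expected')
   ```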
   
   ### How to reproduce
   
   **custom/operators.py:**
   
   ```python
   from airflow.models import BaseOperator
   from airflow.utils.decorators import apply_defaults  # deprecated in Airflow 2; defaults are applied automatically
   from custom.hooks import CustomHook
   
   
   class CustomOperator(BaseOperator):
   
       @apply_defaults
       def __init__(self, conn_id, app_task_id, **kwargs):
           super().__init__(**kwargs)
           self._conn_id = conn_id
           self._app_task_id = app_task_id
   
       def execute(self, context):
           hook = CustomHook(conn_id=self._conn_id)
   
           try:
               ...
           finally:
               hook.close()
   ```
   
   
   **custom/utils.py:**
   
   ```python
   import json
   
   from airflow.decorators import task
   from custom.hooks import CustomHook
   
   
   @task
   def get_tasks_by_tag(conn_id, tag_id):
       hook = CustomHook(conn_id=conn_id)
   
       try:
           tasks = json.loads(hook.get_tasks(tag_id))
       finally:
           hook.close()
   
       # The plain list of ids returned here is what .expand() maps over.
       return [t.get('id') for t in tasks]
   ```
   **dag.py:**
   
   ```python
   from airflow import DAG
   from custom.operators import CustomOperator
   from custom.utils import get_tasks_by_tag
   
   with DAG(
       ...
   ) as dag:
   
       t1 = CustomOperator.partial(
           task_id='start_task', conn_id='xxxx'
       ).expand(
           app_task_id=get_tasks_by_tag('xxxx', tag_id)
       )
   ```
   
   ### Operating System
   
   Debian GNU/Linux 11 (bullseye)
   
   ### Versions of Apache Airflow Providers
   
   _No response_
   
   ### Deployment
   
   Virtualenv installation
   
   ### Deployment details
   
   _No response_
   
   ### Anything else
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   




[GitHub] [airflow] boring-cyborg[bot] commented on issue #28535: Dynamic Task Mapping: No tasks are mapped, although the list to be mapped is not empty

boring-cyborg[bot] commented on issue #28535:
URL: https://github.com/apache/airflow/issues/28535#issuecomment-1362784628

   Thanks for opening your first issue here! Be sure to follow the issue template!
   




[GitHub] [airflow] uranusjr commented on issue #28535: Dynamic Task Mapping: No tasks are mapped, although the list to be mapped is not empty

uranusjr commented on issue #28535:
URL: https://github.com/apache/airflow/issues/28535#issuecomment-1362791947

   Code for CustomOperator, please?




[GitHub] [airflow] uranusjr commented on issue #28535: Dynamic Task Mapping: No tasks are mapped, although the list to be mapped is not empty

uranusjr commented on issue #28535:
URL: https://github.com/apache/airflow/issues/28535#issuecomment-1369456393

   I simplified the DAG to this bare minimum and it ran fine for me:
   
   ```python
   from datetime import datetime
   from airflow.decorators import task
   from airflow.models import DAG, BaseOperator
   
   class CustomOperator(BaseOperator):
       def __init__(self, task_id, app_task_id, **kwargs):
           super(CustomOperator, self).__init__(task_id=task_id, **kwargs)
           self._app_task_id = app_task_id
   
       def execute(self, context):
           pass
   
   @task
   def get_tasks_by_tag():
       return [f"task_{i}" for i in range(3)]
   
   with DAG(dag_id="custom_expand", schedule=None, start_date=datetime(2023, 1, 1)) as dag:
       CustomOperator.partial(task_id='start_task').expand(app_task_id=get_tasks_by_tag())
   ```
   
   It would be awesome if you could simplify the code a bit to help identify what part exactly is causing your DAG to malfunction. As far as I can tell, the expansion logic does not seem to be the issue. 




[GitHub] [airflow] potiuk closed issue #28535: Dynamic Task Mapping: No tasks are mapped, although the list to be mapped is not empty

potiuk closed issue #28535: Dynamic Task Mapping: No tasks are mapped, although the list to be mapped is not empty
URL: https://github.com/apache/airflow/issues/28535

