Posted to commits@airflow.apache.org by "Alex Chu (JIRA)" <ji...@apache.org> on 2017/09/27 12:22:00 UTC

[jira] [Created] (AIRFLOW-1648) Subtask failed to import a DAG - Process timed out

Alex Chu created AIRFLOW-1648:
---------------------------------

             Summary: Subtask failed to import a DAG - Process timed out
                 Key: AIRFLOW-1648
                 URL: https://issues.apache.org/jira/browse/AIRFLOW-1648
             Project: Apache Airflow
          Issue Type: Bug
          Components: DAG, DagRun
    Affects Versions: Airflow 1.8
         Environment: OS: Ubuntu 16.04
Airflow 1.8.1

            Reporter: Alex Chu
            Priority: Minor


For the past few days our DAG has been failing with the same error, "Process timed out". The subtask cannot even import the DAG file from /opt/airflow/dags/ before the *dagbag_import_timeout* is reached (default 30 seconds, defined in airflow.cfg).
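
For anyone hitting the same limit: the timeout can be raised in airflow.cfg under the [core] section. The value below is only an example, not a recommendation:

{code}
[core]
# default is 30 seconds; a larger value gives slow DAG imports more time (example value)
dagbag_import_timeout = 120
{code}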

We have one DAG with "a waiter" (a sensor operator) followed by a subdag containing the tasks (PythonOperator); this one never fails.
The other DAG has "a waiter" that is not a sensor, just a PythonOperator task configured with many retries, followed by a subdag containing the tasks (PythonOperator); this one fails all the time. A rough sketch of this layout is included below.
Workaround: force a re-run of the failed tasks and they succeed.
I could not reproduce the problem in the integration environment, only in production, probably because of the much larger volume of data there.
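
For context, here is a minimal sketch of the failing layout, written against Airflow 1.8 style imports. All ids, dates and callables are placeholders for illustration; the real DAG is much bigger:

{code}
# Minimal sketch of the layout described above (Airflow 1.8 style imports).
# All ids, dates and callables are placeholders, not the real DAG.
from datetime import datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.subdag_operator import SubDagOperator

default_args = {
    'owner': 'airflow',
    'start_date': datetime(2017, 9, 20),
    'retries': 10,  # the "waiter" relies on many retries instead of a sensor
}


def build_subdag(parent_dag_id, child_id, args):
    # Sub-DAG holding the actual transfer tasks (PythonOperator).
    subdag = DAG(dag_id='%s.%s' % (parent_dag_id, child_id),
                 default_args=args, schedule_interval='@daily')
    PythonOperator(task_id='transfer_profilematch',
                   python_callable=lambda: None,  # placeholder for the real work
                   dag=subdag)
    return subdag


dag = DAG('example_parent_dag', default_args=default_args,
          schedule_interval='@daily')

# "Waiter": just a PythonOperator that retries until the data is ready.
waiter = PythonOperator(task_id='waiter',
                        python_callable=lambda: None,  # placeholder readiness check
                        dag=dag)

account = SubDagOperator(
    task_id='account_000036',
    subdag=build_subdag(dag.dag_id, 'account_000036', default_args),
    dag=dag)

account.set_upstream(waiter)
{code}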

Here is the backtrace:

{quote}
[2017-09-21 06:32:58,636] {models.py:168} INFO - Filling up the DagBag from /opt/airflow/dags/dmp-das-bq-shore/dmp_das_bq_shore.py
[2017-09-21 06:33:23,530] {base_task_runner.py:112} INFO - Running: ['bash', '-c', u'airflow run dmp_das_bq_shore.account_000036 transfer_profilematch_000036 2017-09-20T06:30:00 --job_id 308549 --raw -sd DAGS_FOLDER/dmp-das-bq-shore/dmp_das_bq_shore.py']
[2017-09-21 06:33:40,836] {base_task_runner.py:95} INFO - Subtask: [2017-09-21 06:33:40,836] {__init__.py:57} INFO - Using executor CeleryExecutor
[2017-09-21 06:33:42,263] {base_task_runner.py:95} INFO - Subtask: [2017-09-21 06:33:42,262] {driver.py:120} INFO - Generating grammar tables from /usr/lib/python2.7/lib2to3/Grammar.txt
[2017-09-21 06:33:42,757] {base_task_runner.py:95} INFO - Subtask: [2017-09-21 06:33:42,757] {driver.py:120} INFO - Generating grammar tables from /usr/lib/python2.7/lib2to3/PatternGrammar.txt
[2017-09-21 06:33:47,356] {base_task_runner.py:95} INFO - Subtask: [2017-09-21 06:33:47,356] {models.py:168} INFO - Filling up the DagBag from /opt/airflow/dags/dmp-das-bq-shore/dmp_das_bq_shore.py
[2017-09-21 06:33:53,427] {base_task_runner.py:95} INFO - Subtask: /opt/airflow/src/airflow/airflow/utils/helpers.py:406: DeprecationWarning: Importing PythonOperator directly from <module 'airflow.operators' from '/opt/airflow/src/airflow/airflow/operators/__init__.pyc'> has been deprecated. Please import from '<module 'airflow.operators' from '/opt/airflow/src/airflow/airflow/operators/__init__.pyc'>.[operator_module]' instead. Support for direct imports will be dropped entirely in Airflow 2.0.
[2017-09-21 06:33:53,427] {base_task_runner.py:95} INFO - Subtask:   DeprecationWarning)
[2017-09-21 06:33:53,428] {base_task_runner.py:95} INFO - Subtask: /opt/airflow/src/airflow/airflow/utils/helpers.py:406: DeprecationWarning: Importing SubDagOperator directly from <module 'airflow.operators' from '/opt/airflow/src/airflow/airflow/operators/__init__.pyc'> has been deprecated. Please import from '<module 'airflow.operators' from '/opt/airflow/src/airflow/airflow/operators/__init__.pyc'>.[operator_module]' instead. Support for direct imports will be dropped entirely in Airflow 2.0.
[2017-09-21 06:33:53,428] {base_task_runner.py:95} INFO - Subtask:   DeprecationWarning)
[2017-09-21 06:34:17,408] {base_task_runner.py:95} INFO - Subtask: [2017-09-21 06:34:17,408] {timeout.py:37} ERROR - Process timed out
[2017-09-21 06:34:17,410] {base_task_runner.py:95} INFO - Subtask: [2017-09-21 06:34:17,409] {models.py:267} ERROR - Failed to import: /opt/airflow/dags/dmp-das-bq-shore/dmp_das_bq_shore.py
[2017-09-21 06:34:17,410] {base_task_runner.py:95} INFO - Subtask: Traceback (most recent call last):
[2017-09-21 06:34:17,411] {base_task_runner.py:95} INFO - Subtask:   File "/opt/airflow/src/airflow/airflow/models.py", line 264, in process_file
[2017-09-21 06:34:17,411] {base_task_runner.py:95} INFO - Subtask:     m = imp.load_source(mod_name, filepath)
...
[2017-09-21 06:34:17,412] {base_task_runner.py:95} INFO - Subtask:   File "/usr/lib/python2.7/inspect.py", line 1060, in stack
[2017-09-21 06:34:17,413] {base_task_runner.py:95} INFO - Subtask:     return getouterframes(sys._getframe(1), context)
[2017-09-21 06:34:17,413] {base_task_runner.py:95} INFO - Subtask:   File "/usr/lib/python2.7/inspect.py", line 1038, in getouterframes
[2017-09-21 06:34:17,413] {base_task_runner.py:95} INFO - Subtask:     framelist.append((frame,) + getframeinfo(frame, context))
[2017-09-21 06:34:17,413] {base_task_runner.py:95} INFO - Subtask:   File "/usr/lib/python2.7/inspect.py", line 1013, in getframeinfo
[2017-09-21 06:34:17,413] {base_task_runner.py:95} INFO - Subtask:     lines, lnum = findsource(frame)
[2017-09-21 06:34:17,413] {base_task_runner.py:95} INFO - Subtask:   File "/usr/lib/python2.7/inspect.py", line 532, in findsource
[2017-09-21 06:34:17,414] {base_task_runner.py:95} INFO - Subtask:     module = getmodule(object, file)
[2017-09-21 06:34:17,414] {base_task_runner.py:95} INFO - Subtask:   File "/usr/lib/python2.7/inspect.py", line 493, in getmodule
[2017-09-21 06:34:17,414] {base_task_runner.py:95} INFO - Subtask:     if f == _filesbymodname.get(modname, None):
[2017-09-21 06:34:17,414] {base_task_runner.py:95} INFO - Subtask:   File "/opt/airflow/src/airflow/airflow/utils/timeout.py", line 38, in handle_timeout
[2017-09-21 06:34:17,414] {base_task_runner.py:95} INFO - Subtask:     raise AirflowTaskTimeout(self.error_message)
[2017-09-21 06:34:17,414] {base_task_runner.py:95} INFO - Subtask: AirflowTaskTimeout: Timeout
[2017-09-21 06:34:17,416] {base_task_runner.py:95} INFO - Subtask: Traceback (most recent call last):
[2017-09-21 06:34:17,439] {base_task_runner.py:95} INFO - Subtask:   File "/usr/local/bin/airflow", line 6, in <module>
[2017-09-21 06:34:17,439] {base_task_runner.py:95} INFO - Subtask:     exec(compile(open(__file__).read(), __file__, 'exec'))
[2017-09-21 06:34:17,439] {base_task_runner.py:95} INFO - Subtask:   File "/opt/airflow/src/airflow/airflow/bin/airflow", line 28, in <module>
[2017-09-21 06:34:17,440] {base_task_runner.py:95} INFO - Subtask:     args.func(args)
[2017-09-21 06:34:17,440] {base_task_runner.py:95} INFO - Subtask:   File "/opt/airflow/src/airflow/airflow/bin/cli.py", line 388, in run
[2017-09-21 06:34:17,440] {base_task_runner.py:95} INFO - Subtask:     dag = get_dag(args)
[2017-09-21 06:34:17,441] {base_task_runner.py:95} INFO - Subtask:   File "/opt/airflow/src/airflow/airflow/bin/cli.py", line 126, in get_dag
[2017-09-21 06:34:17,441] {base_task_runner.py:95} INFO - Subtask:     'parse.'.format(args.dag_id))
[2017-09-21 06:34:17,444] {base_task_runner.py:95} INFO - Subtask: airflow.exceptions.AirflowException: dag_id could not be found: dmp_das_bq_shore.account_000036. Either the dag did not exist or it failed to parse.
[2017-09-21 06:34:23,649] {jobs.py:2107} INFO - Task exited with return code 1
[2017-09-21 08:16:17,825] {models.py:168} INFO - Filling up the DagBag from /opt/airflow/dags/dmp-das-bq-shore/dmp_das_bq_shore.py
{quote}






--
This message was sent by Atlassian JIRA
(v6.4.14#64029)