Posted to commits@airflow.apache.org by "Paymahn Moghadasian (JIRA)" <ji...@apache.org> on 2018/03/13 20:37:00 UTC

[jira] [Commented] (AIRFLOW-2214) Import error when interpreting DAG with customer operator

    [ https://issues.apache.org/jira/browse/AIRFLOW-2214?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16397612#comment-16397612 ] 

Paymahn Moghadasian commented on AIRFLOW-2214:
----------------------------------------------

I've also tried naming my plugin "test", i.e.:

{code:python}
class MordorPlugin(AirflowPlugin):
    name = "test"
    operators = [MordorOperator]
{code}

and using the new Airflow 2.0-style imports:

{code:python}
from airflow import DAG
from airflow.operators.test import MordorOperator
from datetime import datetime


dag = DAG('mordor_dag', description='DAG with a single task', start_date=datetime.today(), catchup=False)

mordor = MordorOperator(job="testing", task_id='run_single_task', dag=dag)
{code}

But that still fails:

{code:bash}
(venv)  2:35PM /Users/paymahn/solvvy/scheduler  ✘ 1 mordor.operator ✱
 ❯❯❯ AIRFLOW_USE_NEW_IMPORTS=1 python3 -m unittest
section/key [core/airflow-home] not found in config
section/key [core/airflow-home] not found in config
section/key [core/airflow-home] not found in config
section/key [core/airflow-home] not found in config
Traceback (most recent call last):
  File "/Users/paymahn/solvvy/scheduler/dags/mordor_test.py", line 2, in <module>
    from airflow.operators.test import MordorOperator
ModuleNotFoundError: No module named 'airflow.operators.test'
........
======================================================================
FAIL: test_dags_parse (tests.test_dags.TestDagsAreSane) (dag='mordor_test.py')
----------------------------------------------------------------------
Traceback (most recent call last):
  File "/Users/paymahn/solvvy/scheduler/tests/test_dags.py", line 18, in test_dags_parse
    self.assertEqual(0, subprocess.run(['python3', os.path.join(os.getcwd(), 'dags', dag)]).returncode)
AssertionError: 0 != 1

----------------------------------------------------------------------
Ran 9 tests in 3.551s

FAILED (failures=1)
{code}

However, these problems don't occur when running the webserver, scheduler or worker.
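
For reference, a parse test like the `test_dags_parse` in the traceback above could be sketched roughly as follows. Only the class name, the test name and the `subprocess.run` assertion come from the traceback; the helper `dag_exit_codes`, the use of `sys.executable` instead of `python3`, and the directory handling are assumptions made for illustration, not the actual test file.

```python
import os
import subprocess
import sys
import unittest


def dag_exit_codes(dag_dir):
    """Run every .py file in dag_dir with the current interpreter.

    Returns a dict mapping file name -> process exit code.
    (Hypothetical helper, not part of Airflow.)
    """
    codes = {}
    for name in sorted(os.listdir(dag_dir)):
        if not name.endswith(".py"):
            continue
        result = subprocess.run([sys.executable, os.path.join(dag_dir, name)])
        codes[name] = result.returncode
    return codes


class TestDagsAreSane(unittest.TestCase):
    # Assumed layout: a "dags" directory under the current working directory.
    DAG_DIR = os.path.join(os.getcwd(), "dags")

    def test_dags_parse(self):
        if not os.path.isdir(self.DAG_DIR):
            self.skipTest("no dags directory found")
        for dag, code in dag_exit_codes(self.DAG_DIR).items():
            # One sub-test per DAG file, so a failure names the file.
            with self.subTest(dag=dag):
                self.assertEqual(0, code)
```

Because each DAG file runs in a fresh interpreter, any import that relies on state set up by a running Airflow process fails here with a non-zero exit code, which matches the `0 != 1` assertion error above.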


> Import error when interpreting DAG with customer operator
> ---------------------------------------------------------
>
>                 Key: AIRFLOW-2214
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-2214
>             Project: Apache Airflow
>          Issue Type: Bug
>          Components: DAG, operators, plugins
>    Affects Versions: 1.9.0
>            Reporter: Paymahn Moghadasian
>            Priority: Minor
>
> The [airflow docs|https://airflow.apache.org/tutorial.html#testing] suggest that a basic sanity check for a DAG file is to interpret it, i.e.:
> {code:bash}
> $ python ~/path/to/my/dag.py
> {code}
> I've found this to be useful. However, now I've created a plugin, `MordorOperator` under `$AIRFLOW_HOME/plugins`:
> {code:python}
> from airflow.plugins_manager import AirflowPlugin
> from airflow.utils.decorators import apply_defaults
> from airflow.operators import BaseOperator
> from airflow.exceptions import AirflowException
>
>
> class MordorOperator(BaseOperator):
>     JOB_QUEUE_MAPPING = {"testing": "testing"}
>
>     @apply_defaults
>     def __init__(self, job, *args, **kwargs):
>         super().__init__(*args, **kwargs)
>
>     def execute(self, context):
>         print("Executing mordoroperator")
>
>
> class MordorPlugin(AirflowPlugin):
>     name = "MordorPlugin"
>     operators = [MordorOperator]
> {code}
> I can import the plugin and see it work in a sample DAG:
> {code:python}
> from airflow import DAG
> from airflow.operators import MordorOperator
> from datetime import datetime
>
>
> dag = DAG('mordor_dag', description='DAG with a single task', start_date=datetime.today(), catchup=False)
>
> mordor = MordorOperator(job="testing", task_id='run_single_task', dag=dag)
> {code}
> However, when I try to interpret this file I get failures, which I suspect I shouldn't get since the plugin runs successfully. My suspicion is that some dynamic code generation happens at runtime which isn't available when a DAG is interpreted by itself. I also find that PyCharm can't perform any autocompletion when importing the plugin.
> {code:bash}
> (venv)  3:54PM /Users/paymahn/solvvy/scheduler mordor.operator ✱
>  ❮❮❮ python dags/mordor_test.py
> section/key [core/airflow-home] not found in config
> Traceback (most recent call last):
>   File "dags/mordor_test.py", line 2, in <module>
>     from airflow.operators import MordorOperator
> ImportError: cannot import name 'MordorOperator'
> {code}
> How can a DAG using a plugin be sanity tested? Is there another mechanism to import the custom operator that will allow interpreting the DAG from the command line?
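
The suspicion in the description above, that plugin operators only become importable after some registration step at runtime, can be illustrated with plain Python. This is a minimal sketch of the general mechanism and not Airflow's actual code: the package name `demo_operators`, the class `DemoOperator` and the helpers `register_plugin_module` and `try_import` are all hypothetical.

```python
import importlib
import sys
import types


class DemoOperator:
    """Stand-in for an operator supplied by a plugin (hypothetical)."""


# A hypothetical parent package that exists only in memory.
_parent = types.ModuleType("demo_operators")
_parent.__path__ = []  # mark it as a package with no files on disk
sys.modules.setdefault("demo_operators", _parent)


def register_plugin_module(package_name, plugin_name, objects):
    """Create package_name.plugin_name at runtime and attach objects to it."""
    full_name = package_name + "." + plugin_name
    module = types.ModuleType(full_name)
    for obj in objects:
        setattr(module, obj.__name__, obj)
    # Once the module is in sys.modules, normal import statements find it.
    sys.modules[full_name] = module
    return module


def try_import(name):
    """Return the imported module, or None if the import fails."""
    try:
        return importlib.import_module(name)
    except ImportError:
        return None
```

In this sketch, `try_import("demo_operators.test")` returns `None` until `register_plugin_module("demo_operators", "test", [DemoOperator])` has been called in the same process. A static tool such as an IDE never runs that call, which would be consistent with the missing autocompletion.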


