Posted to commits@airflow.apache.org by "Paymahn Moghadasian (JIRA)" <ji...@apache.org> on 2018/03/13 20:37:00 UTC
[jira] [Commented] (AIRFLOW-2214) Import error when interpreting
DAG with customer operator
[ https://issues.apache.org/jira/browse/AIRFLOW-2214?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16397612#comment-16397612 ]
Paymahn Moghadasian commented on AIRFLOW-2214:
----------------------------------------------
I've also tried naming my plugin "test", i.e.:
{code:python}
class MordorPlugin(AirflowPlugin):
    name = "test"
    operators = [MordorOperator]
{code}
and using the new Airflow 2.0-style imports:
{code:python}
from airflow import DAG
from airflow.operators.test import MordorOperator
from datetime import datetime
dag = DAG('mordor_dag', description='DAG with a single task', start_date=datetime.today(), catchup=False)
mordor = MordorOperator(job="testing", task_id='run_single_task', dag=dag)
{code}
But that still fails:
{code:bash}
(venv) 2:35PM /Users/paymahn/solvvy/scheduler ✘ 1 mordor.operator ✱
❯❯❯ AIRFLOW_USE_NEW_IMPORTS=1 python3 -m unittest
section/key [core/airflow-home] not found in config
section/key [core/airflow-home] not found in config
section/key [core/airflow-home] not found in config
section/key [core/airflow-home] not found in config
Traceback (most recent call last):
  File "/Users/paymahn/solvvy/scheduler/dags/mordor_test.py", line 2, in <module>
    from airflow.operators.test import MordorOperator
ModuleNotFoundError: No module named 'airflow.operators.test'
........
======================================================================
FAIL: test_dags_parse (tests.test_dags.TestDagsAreSane) (dag='mordor_test.py')
----------------------------------------------------------------------
Traceback (most recent call last):
  File "/Users/paymahn/solvvy/scheduler/tests/test_dags.py", line 18, in test_dags_parse
    self.assertEqual(0, subprocess.run(['python3', os.path.join(os.getcwd(), 'dags', dag)]).returncode)
AssertionError: 0 != 1
----------------------------------------------------------------------
Ran 9 tests in 3.551s
FAILED (failures=1)
{code}
However, these problems don't occur when running the webserver, scheduler or worker.
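For context, the failing check in the output above runs each DAG file in its own interpreter and compares the exit code to 0. A minimal sketch of that kind of check, reconstructed only from the traceback and using the standard library alone, might look like the following. The helper name `dag_exit_codes` and the two throwaway files are hypothetical and are not taken from the actual `tests/test_dags.py`.

```python
import os
import subprocess
import sys
import tempfile


def dag_exit_codes(dags_dir):
    """Run every .py file in dags_dir in a fresh interpreter.

    Returns a dict mapping file name -> process exit code.
    (Hypothetical helper; not part of Airflow.)
    """
    codes = {}
    for name in sorted(os.listdir(dags_dir)):
        if not name.endswith(".py"):
            continue
        path = os.path.join(dags_dir, name)
        # A fresh interpreter sees only what is importable at startup,
        # which is why a missing module makes the file exit non-zero.
        result = subprocess.run(
            [sys.executable, path],
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
        )
        codes[name] = result.returncode
    return codes


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as dags_dir:
        # One file that parses cleanly and one with an unresolvable import.
        with open(os.path.join(dags_dir, "ok_dag.py"), "w") as f:
            f.write("x = 1\n")
        with open(os.path.join(dags_dir, "broken_dag.py"), "w") as f:
            f.write("from no_such_package_for_demo import Anything\n")
        print(dag_exit_codes(dags_dir))
```

A check like this only reports that a file failed to run standalone; it cannot tell a genuinely broken DAG apart from one whose imports are only resolvable inside a running Airflow process.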
> Import error when interpreting DAG with customer operator
> ---------------------------------------------------------
>
> Key: AIRFLOW-2214
> URL: https://issues.apache.org/jira/browse/AIRFLOW-2214
> Project: Apache Airflow
> Issue Type: Bug
> Components: DAG, operators, plugins
> Affects Versions: 1.9.0
> Reporter: Paymahn Moghadasian
> Priority: Minor
>
> The [airflow docs|https://airflow.apache.org/tutorial.html#testing] suggest that a basic sanity check for a DAG file is to interpret it, i.e.:
> {code:bash}
> $ python ~/path/to/my/dag.py
> {code}
> I've found this to be useful. However, now I've created a plugin, `MordorOperator` under `$AIRFLOW_HOME/plugins`:
> {code:python}
> from airflow.plugins_manager import AirflowPlugin
> from airflow.utils.decorators import apply_defaults
> from airflow.operators import BaseOperator
> from airflow.exceptions import AirflowException
> class MordorOperator(BaseOperator):
>     JOB_QUEUE_MAPPING = {"testing": "testing"}
>     @apply_defaults
>     def __init__(self, job, *args, **kwargs):
>         super().__init__(*args, **kwargs)
>     def execute(self, context):
>         print("Executing mordoroperator")
> class MordorPlugin(AirflowPlugin):
>     name = "MordorPlugin"
>     operators = [MordorOperator]
> {code}
> I can import the plugin and see it work in a sample DAG:
> {code:python}
> from airflow import DAG
> from airflow.operators import MordorOperator
> from datetime import datetime
> dag = DAG('mordor_dag', description='DAG with a single task', start_date=datetime.today(), catchup=False)
> mordor = MordorOperator(job="testing", task_id='run_single_task', dag=dag)
> {code}
> However, when I try to interpret this file directly, it fails, which I don't expect since the plugin runs successfully. I suspect this is because some dynamic code generation happens at runtime that isn't available when a DAG file is interpreted by itself. I also find that PyCharm can't perform any autocompletion when importing the plugin.
> {code:bash}
> (venv) 3:54PM /Users/paymahn/solvvy/scheduler mordor.operator ✱
> ❮❮❮ python dags/mordor_test.py
> section/key [core/airflow-home] not found in config
> Traceback (most recent call last):
>   File "dags/mordor_test.py", line 2, in <module>
>     from airflow.operators import MordorOperator
> ImportError: cannot import name 'MordorOperator'
> {code}
> How can a DAG using a plugin be sanity tested? Is there another mechanism to import the custom operator which will allow interpreting the DAG from the command line?
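The suspicion in the description, that a plugin's import path only exists after some runtime registration step, can be illustrated without Airflow. The sketch below is not Airflow's plugin manager; it is a standard-library-only illustration of how a module created at runtime and placed in `sys.modules` becomes importable only after the registration call has run. The names `demo_operators`, `register_plugin` and `can_import` are hypothetical, and the `MordorOperator` here is a bare stand-in class.

```python
import importlib
import sys
import types


class MordorOperator:
    """Stand-in for the custom operator; not a real Airflow class."""


def register_plugin(package_name, plugin_name, objects):
    """Create `package_name.plugin_name` at runtime and expose `objects` on it.

    Hypothetical helper that mimics the *idea* of dynamic plugin modules.
    """
    package = sys.modules.get(package_name)
    if package is None:
        package = types.ModuleType(package_name)
        package.__path__ = []  # mark it as a package
        sys.modules[package_name] = package

    full_name = f"{package_name}.{plugin_name}"
    module = types.ModuleType(full_name)
    for obj in objects:
        setattr(module, obj.__name__, obj)

    # Once the module is in sys.modules, ordinary import statements find it.
    sys.modules[full_name] = module
    setattr(package, plugin_name, module)
    return module


def can_import(module_name):
    """Return True if `module_name` can be imported right now."""
    try:
        importlib.import_module(module_name)
    except ImportError:
        return False
    return True


if __name__ == "__main__":
    # Before registration nothing on disk or in memory provides the module.
    print("before:", can_import("demo_operators.test"))
    register_plugin("demo_operators", "test", [MordorOperator])
    # After registration the same import succeeds.
    print("after:", can_import("demo_operators.test"))
```

If Airflow behaves along these lines, it would be consistent with what is reported here: the import works inside the webserver, scheduler and worker, but not for a standalone interpreter or for an IDE's static analysis, which never execute the registration step.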
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)