Posted to commits@airflow.apache.org by "Tomas G. (Jira)" <ji...@apache.org> on 2019/10/25 15:59:00 UTC
[jira] [Comment Edited] (AIRFLOW-2327) Cannot pickle PythonOperator dags using Mesos Executor
[ https://issues.apache.org/jira/browse/AIRFLOW-2327?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16959880#comment-16959880 ]
Tomas G. edited comment on AIRFLOW-2327 at 10/25/19 3:58 PM:
-------------------------------------------------------------
I am getting the same error with CeleryExecutor on Airflow 1.10.5 when a DAG uses PythonOperator and the airflow scheduler is run with the pickling option.
The error also occurs with the example PythonOperator DAG provided here: [https://github.com/apache/airflow/blob/master/airflow/example_dags/example_python_operator.py]
was (Author: tomggg):
I am getting the same error with CeleryExecutor + Airflow 1.10.5 when using PythonOperator in a dag and pickling option on airflow scheduler.
> Cannot pickle PythonOperator dags using Mesos Executor
> ------------------------------------------------------
>
> Key: AIRFLOW-2327
> URL: https://issues.apache.org/jira/browse/AIRFLOW-2327
> Project: Apache Airflow
> Issue Type: Bug
> Components: contrib
> Affects Versions: 1.9.0
> Environment: prod
> Reporter: niraja b
> Priority: Major
>
> We are using the MesosExecutor of Airflow.
>
> BashOperator and SimpleHttpOperator work for us.
> The scheduler is started with -p to pickle the DAGs.
>
>
> The issue we have is with the following sample code. We tried PythonOperator and PythonVirtualenvOperator, both with and without use_dill, and could not get either working on the agent.
>
> from __future__ import print_function
> from airflow.models import DAG
> from datetime import timedelta, datetime
> from airflow.operators.python_operator import PythonOperator, PythonVirtualenvOperator
> DAG_ID = "testdag"
> DEFAULT_ARGS = {
>     "start_date": datetime(2018, 4, 16, 1, 50, 16),
>     "schedule_interval": None,
>     "dagrun_timeout": timedelta(minutes=60),
>     "email": ['test@test.com'],
>     "email_on_failure": True,
>     "email_on_retry": False,
>     "retries": 3,
>     "retry_delay": timedelta(seconds=5),
> }
> def _testlambda(**kwargs):
>     print("hello world")
> with DAG(dag_id=DAG_ID, default_args=DEFAULT_ARGS) as dag:
>     (
>         PythonVirtualenvOperator(
>             task_id='python_1',
>             python_callable=_testlambda,
>             use_dill=True,
>             requirements=['dill']
>         )
>     )
>
> Error
>
> Traceback (most recent call last):
>   File "/usr/bin/airflow", line 27, in <module>
>     args.func(args)
>   File "/usr/lib/python2.7/site-packages/airflow/bin/cli.py", line 358, in run
>     DagPickle).filter(DagPickle.id == args.pickle).first()
>   File "/usr/lib64/python2.7/site-packages/sqlalchemy/orm/query.py", line 2789, in first
>     ret = list(self[0:1])
>   File "/usr/lib64/python2.7/site-packages/sqlalchemy/orm/query.py", line 2581, in __getitem__
>     return list(res)
>   File "/usr/lib64/python2.7/site-packages/sqlalchemy/orm/loading.py", line 137, in instances
>     util.raise_from_cause(err)
>   File "/usr/lib64/python2.7/site-packages/sqlalchemy/util/compat.py", line 203, in raise_from_cause
>     reraise(type(exception), exception, tb=exc_tb, cause=cause)
>   File "/usr/lib64/python2.7/site-packages/sqlalchemy/orm/loading.py", line 102, in instances
>     logging.debug(str(fetch[0]))
>   File "/usr/lib64/python2.7/site-packages/sqlalchemy/engine/result.py", line 156, in __repr__
>     return repr(sql_util._repr_row(self))
>   File "/usr/lib64/python2.7/site-packages/sqlalchemy/sql/util.py", line 329, in __repr__
>     ", ".join(trunc(value) for value in self.row),
>   File "/usr/lib64/python2.7/site-packages/sqlalchemy/sql/sqltypes.py", line 1588, in process
>     return loads(value)
>   File "/usr/lib/python2.7/site-packages/dill/dill.py", line 299, in loads
>     return load(file)
>   File "/usr/lib/python2.7/site-packages/dill/dill.py", line 288, in load
>     obj = pik.load()
>   File "/usr/lib64/python2.7/pickle.py", line 858, in load
>     dispatch[key](self)
>   File "/usr/lib64/python2.7/pickle.py", line 1090, in load_global
>     klass = self.find_class(module, name)
>   File "/usr/lib/python2.7/site-packages/dill/dill.py", line 445, in find_class
>     return StockUnpickler.find_class(self, module, name)
>   File "/usr/lib64/python2.7/pickle.py", line 1124, in find_class
>     __import__(module)
> ImportError: No module named unusual_prefix_ac646764c974ff68b827793414d8eabcdca720cf_dmitrydag
> I0416 11:22:34.367975 47476 executor.cpp:938] Command exited with status 1 (pid: 47482)
> I0416 11:22:35.371712 47481 process.cpp:887] Failed to accept socket: future discarded
>
>
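The ImportError at the end of the traceback is consistent with how pickle handles functions: a function is stored as a reference to its module and name, not as code, so the unpickling process must be able to import that module. Below is a minimal, Airflow-free sketch of that failure mode. The module name and the helper names (pickle_callable, simulate_agent_unpickle) are hypothetical stand-ins for illustration, and the sketch assumes the DAG file was loaded on the scheduler under a generated module name that the agent cannot import.

```python
import pickle
import sys
import types

# Hypothetical name imitating the generated module name seen in the traceback.
MODULE_NAME = "unusual_prefix_0123abcd_testdag"


def pickle_callable():
    """Scheduler side: define a function in a synthetic module and pickle it."""
    module = types.ModuleType(MODULE_NAME)
    # Functions created here get __module__ == MODULE_NAME.
    exec("def _testlambda(**kwargs):\n    print('hello world')\n", module.__dict__)
    # pickle checks that the function can be found via its module when dumping.
    sys.modules[MODULE_NAME] = module
    return pickle.dumps(module._testlambda)


def simulate_agent_unpickle(payload):
    """Agent side: unpickle where the synthetic module is not importable."""
    sys.modules.pop(MODULE_NAME, None)
    try:
        pickle.loads(payload)
    except ImportError as exc:
        # Same class of failure as in the traceback above.
        return str(exc)
    return None


if __name__ == "__main__":
    payload = pickle_callable()
    # The payload carries only the module and function names, not the code.
    print(MODULE_NAME.encode() in payload)
    print(simulate_agent_unpickle(payload))
```

In this sketch, simulate_agent_unpickle returns the ImportError message instead of the function, because the payload only names the module and the "agent" has no module by that name to import.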
--
This message was sent by Atlassian Jira
(v8.3.4#803005)