Posted to commits@airflow.apache.org by "Ash Berlin-Taylor (JIRA)" <ji...@apache.org> on 2019/06/21 09:29:00 UTC

[jira] [Closed] (AIRFLOW-1284) DetachedInstanceError while using dependencies

     [ https://issues.apache.org/jira/browse/AIRFLOW-1284?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ash Berlin-Taylor closed AIRFLOW-1284.
--------------------------------------
    Resolution: Abandoned

Probably not an issue any more either

> DetachedInstanceError while using dependencies
> ----------------------------------------------
>
>                 Key: AIRFLOW-1284
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-1284
>             Project: Apache Airflow
>          Issue Type: Bug
>         Environment: Airflow 1.8.1 on Ubuntu 14.04 and PostgreSQL database.
>            Reporter: Jakub Powierza
>            Priority: Major
>
> *What happened?*
> I've encountered an error that seems to be related to Airflow's task instance dependencies (BaseTIDep). I enabled them recently and they failed in my environment.
> *What am I using?*
> I'm using Airflow 1.8.1 on Ubuntu 14.04 with a PostgreSQL database.
> *My usage:*
> {noformat}
> from typing import Iterable, Mapping, MutableSet
>
> from sqlalchemy.orm import scoped_session
>
> from airflow.models import BaseOperator, TaskInstance
> from airflow.ti_deps.deps.base_ti_dep import BaseTIDep, TIDepStatus
>
> class CustomDependency(BaseTIDep):
>     ...
>     def _get_dep_statuses(self, ti: TaskInstance, session: scoped_session, dep_context: Mapping = None) -> Iterable[TIDepStatus]:
>         # self.remote_system_name is set in the (elided) __init__
>         my_remote_system = get_remote_system_by_name(self.remote_system_name)
>         if my_remote_system.is_up:
>             yield self._passing_status(reason='Remote system {} is up.'.format(my_remote_system))
>         else:
>             yield self._failing_status(reason='Remote system {} appears to be down.'.format(my_remote_system))
>
> class MyOperator(BaseOperator):
>     ...
>     @property
>     def deps(self) -> MutableSet[BaseTIDep]:
>         deps = super().deps
>         deps.add(CustomDependency(self.remote_system_name))
>         return deps
> {noformat}
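> For reference, a dependency like this can also be exercised outside the scheduler, which makes debugging easier. The sketch below is only an illustration: the DAG id, task id, execution date and the single-argument CustomDependency constructor are placeholders, not part of my actual setup.
> {noformat}
> from datetime import datetime
>
> from airflow import settings
> from airflow.models import DagBag, TaskInstance
> from airflow.ti_deps.dep_context import DepContext
>
> session = settings.Session()
> # Hypothetical DAG and task ids; substitute real ones.
> task = DagBag().get_dag('my_dag_id').get_task('my_task_id')
> ti = TaskInstance(task=task, execution_date=datetime(2017, 6, 1))
>
> # get_dep_statuses() wraps _get_dep_statuses() and yields TIDepStatus tuples.
> for status in CustomDependency('my_remote_system').get_dep_statuses(ti, session, DepContext()):
>     print(status.dep_name, status.passed, status.reason)
>
> session.close()
> {noformat}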
> *Exception:*
> {noformat}
> Traceback (most recent call last):
>   File "/opt/GTA/workflow/venv/src/airflow/airflow/jobs.py", line 324, in helper
>     pickle_dags)
>   File "/opt/GTA/workflow/venv/src/airflow/airflow/utils/db.py", line 54, in wrapper
>     result = func(*args, **kwargs)
>   File "/opt/GTA/workflow/venv/src/airflow/airflow/jobs.py", line 1581, in process_file
>     self._process_dags(dagbag, dags, ti_keys_to_schedule)
>   File "/opt/GTA/workflow/venv/src/airflow/airflow/jobs.py", line 1152, in _process_dags
>     self._process_task_instances(dag, tis_out)
>   File "/opt/GTA/workflow/venv/src/airflow/airflow/jobs.py", line 838, in _process_task_instances
>     self.logger.info("Examining DAG run {}".format(run))
>   File "/opt/GTA/workflow/venv/src/airflow/airflow/models.py", line 3803, in __repr__
>     dag_id=self.dag_id,
>   File "/opt/GTA/workflow/venv/lib/python3.5/site-packages/sqlalchemy/orm/attributes.py", line 237, in __get__
>     return self.impl.get(instance_state(instance), dict_)
>   File "/opt/GTA/workflow/venv/lib/python3.5/site-packages/sqlalchemy/orm/attributes.py", line 579, in get
>     value = state._load_expired(state, passive)
>   File "/opt/GTA/workflow/venv/lib/python3.5/site-packages/sqlalchemy/orm/state.py", line 592, in _load_expired
>     self.manager.deferred_scalar_loader(self, toload)
>   File "/opt/GTA/workflow/venv/lib/python3.5/site-packages/sqlalchemy/orm/loading.py", line 644, in load_scalar_attributes
>     (state_str(state)))
> sqlalchemy.orm.exc.DetachedInstanceError: Instance <DagRun at 0x7fa12bcb9f98> is not bound to a Session; attribute refresh operation cannot proceed
> {noformat}
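> To show where this class of error comes from, here is a minimal, self-contained SQLAlchemy sketch (plain SQLAlchemy, not Airflow code; the DagRunLike model is made up). Once an instance's attributes have been expired by a commit and the instance is detached from its session, the next attribute access raises DetachedInstanceError, which is exactly what DagRun.__repr__ triggers when it reads self.dag_id.
> {noformat}
> from sqlalchemy import Column, Integer, String, create_engine
> from sqlalchemy.ext.declarative import declarative_base
> from sqlalchemy.orm import sessionmaker
>
> Base = declarative_base()
>
> class DagRunLike(Base):
>     __tablename__ = 'dag_run_like'
>     id = Column(Integer, primary_key=True)
>     dag_id = Column(String(250))
>
> engine = create_engine('sqlite://')
> Base.metadata.create_all(engine)
> Session = sessionmaker(bind=engine)  # expire_on_commit=True by default
>
> session = Session()
> run = DagRunLike(dag_id='example_dag')
> session.add(run)
> session.commit()   # commit expires the instance's attributes
> session.close()    # the instance is now detached from any session
>
> run.dag_id         # raises sqlalchemy.orm.exc.DetachedInstanceError
> {noformat}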
> PS. This feature sounds great but it seems to cause too many problems right now. Maybe I'm using it wrong? It would be nice to document it better :)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)