Posted to commits@airflow.apache.org by "Ash Berlin-Taylor (Jira)" <ji...@apache.org> on 2019/12/05 12:37:01 UTC
[jira] [Commented] (AIRFLOW-6179) sqlalchemy.exc.IntegrityError:
(psycopg2.IntegrityError) null value in column "pool" violates not-null
constraint
[ https://issues.apache.org/jira/browse/AIRFLOW-6179?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16988751#comment-16988751 ]
Ash Berlin-Taylor commented on AIRFLOW-6179:
--------------------------------------------
This was one of the changes in 1.10.6 -- Pool is now required and there is an explicit "default_pool" from 1.10.6 onwards. It sounds like your DB version is out of sync with the code you are running.
> sqlalchemy.exc.IntegrityError: (psycopg2.IntegrityError) null value in column "pool" violates not-null constraint
> -----------------------------------------------------------------------------------------------------------------
>
> Key: AIRFLOW-6179
> URL: https://issues.apache.org/jira/browse/AIRFLOW-6179
> Project: Apache Airflow
> Issue Type: Bug
> Components: operators, scheduler
> Affects Versions: 1.10.5
> Reporter: Ayush Chauhan
> Priority: Major
> Attachments: task instance detail.png
>
>
> I am trying to run Apache Airflow in ECS using the {{v1-10-stable}} branch of [apache/airflow|https://github.com/apache/airflow/tree/v1-10-stable] via my fork [airflow|https://github.com/ayush-san/airflow/tree/zmaster]. I am using *CeleryExecutor* and have updated the Python version to 3.7.5. When I try to run my sample Sqoop import DAG, I get this error on the scheduler container:
>
> {code:java}
> [SQL: UPDATE task_instance SET state=%(state)s, pool=%(pool)s WHERE task_instance.task_id = %(task_instance_task_id)s AND task_instance.dag_id = %(task_instance_dag_id)s AND task_instance.execution_date = %(task_instance_execution_date)s]
> [parameters: {'state': 'scheduled', 'pool': None, 'task_instance_task_id': 'zomatogolddb.red_subscriptions_log.table_cleanup', 'task_instance_dag_id': 'mysql_import_adhoc', 'task_instance_execution_date': datetime.datetime(2019, 12, 5, 11, 34, 33, 758857, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>)}]
> (Background on this error at: http://sqlalche.me/e/gkpj)
> Process DagFileProcessor18571-Process:
> Traceback (most recent call last):
> File "/usr/local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1246, in _execute_context
> cursor, statement, parameters, context
> File "/usr/local/lib/python3.7/site-packages/sqlalchemy/engine/default.py", line 581, in do_execute
> cursor.execute(statement, parameters)
> psycopg2.IntegrityError: null value in column "pool" violates not-null constraint
> DETAIL: Failing row contains (zomatogolddb.red_subscriptions_log.table_cleanup, mysql_import_adhoc, 2019-12-05 11:34:33.758857+00, null, null, null, scheduled, 0, , airflow, null, null, default, 9, null, null, null, 1, \x80047d942e).
> The above exception was the direct cause of the following exception:
> Traceback (most recent call last):
> File "/usr/local/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
> self.run()
> File "/usr/local/lib/python3.7/multiprocessing/process.py", line 99, in run
> self._target(*self._args, **self._kwargs)
> File "/opt/airflow/airflow/jobs/scheduler_job.py", line 148, in _run_file_processor
> result = scheduler_job.process_file(file_path, pickle_dags)
> File "/opt/airflow/airflow/utils/db.py", line 74, in wrapper
> return func(*args, **kwargs)
> File "/opt/airflow/airflow/jobs/scheduler_job.py", line 1599, in process_file
> session.commit()
> File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/session.py", line 1027, in commit
> self.transaction.commit()
> File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/session.py", line 494, in commit
> self._prepare_impl()
> File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/session.py", line 473, in _prepare_impl
> self.session.flush()
> File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/session.py", line 2470, in flush
> self._flush(objects)
> File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/session.py", line 2608, in _flush
> transaction.rollback(_capture_exception=True)
> File "/usr/local/lib/python3.7/site-packages/sqlalchemy/util/langhelpers.py", line 68, in __exit__
> compat.reraise(exc_type, exc_value, exc_tb)
> File "/usr/local/lib/python3.7/site-packages/sqlalchemy/util/compat.py", line 153, in reraise
> raise value
> File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/session.py", line 2568, in _flush
> flush_context.execute()
> File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/unitofwork.py", line 422, in execute
> rec.execute(self)
> File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/unitofwork.py", line 589, in execute
> uow,
> File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/persistence.py", line 236, in save_obj
> update,
> File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/persistence.py", line 996, in _emit_update_statements
> statement, multiparams
> File "/usr/local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 982, in execute
> return meth(self, multiparams, params)
> File "/usr/local/lib/python3.7/site-packages/sqlalchemy/sql/elements.py", line 287, in _execute_on_connection
> return connection._execute_clauseelement(self, multiparams, params)
> File "/usr/local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1101, in _execute_clauseelement
> distilled_params,
> File "/usr/local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1250, in _execute_context
> e, statement, parameters, cursor, context
> File "/usr/local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1476, in _handle_dbapi_exception
> util.raise_from_cause(sqlalchemy_exception, exc_info)
> File "/usr/local/lib/python3.7/site-packages/sqlalchemy/util/compat.py", line 398, in raise_from_cause
> reraise(type(exception), exception, tb=exc_tb, cause=cause)
> File "/usr/local/lib/python3.7/site-packages/sqlalchemy/util/compat.py", line 152, in reraise
> raise value.with_traceback(tb)
> File "/usr/local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1246, in _execute_context
> cursor, statement, parameters, context
> File "/usr/local/lib/python3.7/site-packages/sqlalchemy/engine/default.py", line 581, in do_execute
> cursor.execute(statement, parameters)
> sqlalchemy.exc.IntegrityError: (psycopg2.IntegrityError) null value in column "pool" violates not-null constraint
> DETAIL: Failing row contains (zomatogolddb.red_subscriptions_log.table_cleanup, mysql_import_adhoc, 2019-12-05 11:34:33.758857+00, null, null, null, scheduled, 0, , airflow, null, null, default, 9, null, null, null, 1, \x80047d942e).
> {code}
>
> This happens even though I have passed the pool name for this task, and the pool can be seen in the webserver UI as well. I have attached a screenshot of the webserver UI for reference.
>
> Previously we were using Airflow v1.10.3 in *_LocalExecutor_* mode, with MySQL as the metastore, and did not pass an explicit pool name to our operator:
> {code:python}
> table_cleanup_task: TableCleanupOperator = TableCleanupOperator(
>     dag=self.dag,
>     pool=None,
>     task_id=table_cleanup_task_id,
>     trigger_rule=TriggerRule.ALL_DONE,
>     task_config=task_config,
>     dag_config=self.dag_config)
> {code}
>
> But v1.10.5 gave me an error saying that the pool name can't be None, so I changed the code to the following. I am also using Postgres as the metastore in this new setup.
> {code:python}
> table_cleanup_task: TableCleanupOperator = TableCleanupOperator(
>     dag=self.dag,
>     pool='default_pool',
>     task_id=table_cleanup_task_id,
>     trigger_rule=TriggerRule.ALL_DONE,
>     task_config=task_config,
>     dag_config=self.dag_config)
> {code}
>
> But I am still getting the above-mentioned error in the scheduler container.
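
As a hedged sketch of the change described in the report (replacing pool=None with a named pool), DAG code could normalise the value before handing it to an operator. resolve_pool and DEFAULT_POOL_NAME are hypothetical names introduced here for illustration and are not part of Airflow; the only value taken from the thread is the pool name "default_pool".

```python
from typing import Optional

# Pool name taken from the report above; the constant itself is hypothetical.
DEFAULT_POOL_NAME = "default_pool"


def resolve_pool(pool: Optional[str], default: str = DEFAULT_POOL_NAME) -> str:
    """Return a usable pool name, substituting the default for None or blank."""
    if pool is None or not pool.strip():
        return default
    return pool
```

An operator call could then pass pool=resolve_pool(configured_pool), where configured_pool is whatever the DAG configuration supplies. This only guards the value given to the operator; it does not address a metastore schema that is out of sync with the running code.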