Posted to commits@airflow.apache.org by "Ayush Chauhan (Jira)" <ji...@apache.org> on 2019/12/05 12:02:00 UTC

[jira] [Updated] (AIRFLOW-6179) sqlalchemy.exc.IntegrityError: (psycopg2.IntegrityError) null value in column "pool" violates not-null constraint

     [ https://issues.apache.org/jira/browse/AIRFLOW-6179?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ayush Chauhan updated AIRFLOW-6179:
-----------------------------------
    Description: 
I am trying to run Apache Airflow in ECS using the {{v1-10-stable}} branch of [apache/airflow|https://github.com/apache/airflow/tree/v1-10-stable] through my fork [airflow|https://github.com/ayush-san/airflow/tree/zmaster]. I am using *CeleryExecutor*. I have updated the Python version to 3.7.5. When I try to run my sample Sqoop import DAG, I get this error in the scheduler container:

 
{code:java}
[SQL: UPDATE task_instance SET state=%(state)s, pool=%(pool)s WHERE task_instance.task_id = %(task_instance_task_id)s AND task_instance.dag_id = %(task_instance_dag_id)s AND task_instance.execution_date = %(task_instance_execution_date)s]
[parameters: {'state': 'scheduled', 'pool': None, 'task_instance_task_id': 'zomatogolddb.red_subscriptions_log.table_cleanup', 'task_instance_dag_id': 'mysql_import_adhoc', 'task_instance_execution_date': datetime.datetime(2019, 12, 5, 11, 34, 33, 758857, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>)}]
(Background on this error at: http://sqlalche.me/e/gkpj)
Process DagFileProcessor18571-Process:
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1246, in _execute_context
    cursor, statement, parameters, context
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/engine/default.py", line 581, in do_execute
    cursor.execute(statement, parameters)
psycopg2.IntegrityError: null value in column "pool" violates not-null constraint
DETAIL:  Failing row contains (zomatogolddb.red_subscriptions_log.table_cleanup, mysql_import_adhoc, 2019-12-05 11:34:33.758857+00, null, null, null, scheduled, 0, , airflow, null, null, default, 9, null, null, null, 1, \x80047d942e).


The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/usr/local/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
    self.run()
  File "/usr/local/lib/python3.7/multiprocessing/process.py", line 99, in run
    self._target(*self._args, **self._kwargs)
  File "/opt/airflow/airflow/jobs/scheduler_job.py", line 148, in _run_file_processor
    result = scheduler_job.process_file(file_path, pickle_dags)
  File "/opt/airflow/airflow/utils/db.py", line 74, in wrapper
    return func(*args, **kwargs)
  File "/opt/airflow/airflow/jobs/scheduler_job.py", line 1599, in process_file
    session.commit()
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/session.py", line 1027, in commit
    self.transaction.commit()
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/session.py", line 494, in commit
    self._prepare_impl()
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/session.py", line 473, in _prepare_impl
    self.session.flush()
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/session.py", line 2470, in flush
    self._flush(objects)
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/session.py", line 2608, in _flush
    transaction.rollback(_capture_exception=True)
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/util/langhelpers.py", line 68, in __exit__
    compat.reraise(exc_type, exc_value, exc_tb)
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/util/compat.py", line 153, in reraise
    raise value
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/session.py", line 2568, in _flush
    flush_context.execute()
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/unitofwork.py", line 422, in execute
    rec.execute(self)
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/unitofwork.py", line 589, in execute
    uow,
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/persistence.py", line 236, in save_obj
    update,
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/persistence.py", line 996, in _emit_update_statements
    statement, multiparams
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 982, in execute
    return meth(self, multiparams, params)
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/sql/elements.py", line 287, in _execute_on_connection
    return connection._execute_clauseelement(self, multiparams, params)
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1101, in _execute_clauseelement
    distilled_params,
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1250, in _execute_context
    e, statement, parameters, cursor, context
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1476, in _handle_dbapi_exception
    util.raise_from_cause(sqlalchemy_exception, exc_info)
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/util/compat.py", line 398, in raise_from_cause
    reraise(type(exception), exception, tb=exc_tb, cause=cause)
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/util/compat.py", line 152, in reraise
    raise value.with_traceback(tb)
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1246, in _execute_context
    cursor, statement, parameters, context
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/engine/default.py", line 581, in do_execute
    cursor.execute(statement, parameters)
sqlalchemy.exc.IntegrityError: (psycopg2.IntegrityError) null value in column "pool" violates not-null constraint
DETAIL:  Failing row contains (zomatogolddb.red_subscriptions_log.table_cleanup, mysql_import_adhoc, 2019-12-05 11:34:33.758857+00, null, null, null, scheduled, 0, , airflow, null, null, default, 9, null, null, null, 1, \x80047d942e).

{code}
 

This happens even though I have passed the pool name for this task, and the pool is visible in the webserver UI as well. I have attached a webserver UI screenshot for reference.
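The constraint violation in the traceback above can be reproduced in isolation, which may help separate the database behaviour from the scheduler logic. The following is only a minimal sketch: it uses Python's built-in sqlite3 module instead of Postgres, and the cut-down {{task_instance}} table and the {{schedule_task}} helper are hypothetical stand-ins, not Airflow's actual schema or code.

```python
import sqlite3

# Hypothetical, cut-down stand-in for the task_instance table; only the
# columns needed to show the NOT NULL constraint are included.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE task_instance ("
    " task_id TEXT NOT NULL,"
    " state TEXT,"
    " pool TEXT NOT NULL)"
)
conn.execute(
    "INSERT INTO task_instance (task_id, state, pool) VALUES (?, ?, ?)",
    ("table_cleanup", None, "default_pool"),
)


def schedule_task(connection, task_id, pool):
    """Mimic the failing UPDATE; return the error message, or None on success."""
    try:
        connection.execute(
            "UPDATE task_instance SET state = ?, pool = ? WHERE task_id = ?",
            ("scheduled", pool, task_id),
        )
    except sqlite3.IntegrityError as exc:
        return str(exc)
    return None


# Binding None for pool is sent as NULL and is rejected by the constraint.
error_with_none = schedule_task(conn, "table_cleanup", None)
# Binding a real pool name goes through.
error_with_name = schedule_task(conn, "table_cleanup", "default_pool")
print(error_with_none)
print(error_with_name)
```

In this sketch the UPDATE fails only when the bound {{pool}} parameter is None, which matches the {{'pool': None}} entry in the logged parameters.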

 

Previously, we ran Airflow v1.10.3 in *_LocalExecutor_* mode with MySQL as the metastore, without explicitly passing a pool name to our operator:
{code:python}
table_cleanup_task: TableCleanupOperator = TableCleanupOperator(
    dag=self.dag,
    pool=None,
    task_id=table_cleanup_task_id,
    trigger_rule=TriggerRule.ALL_DONE,
    task_config=task_config,
    dag_config=self.dag_config,
)
{code}
 
However, v1.10.5 raised an error saying that the pool name can't be None, so I changed it to the following code. I am also using Postgres as the metastore in this new setup.
{code:python}
table_cleanup_task: TableCleanupOperator = TableCleanupOperator(
    dag=self.dag,
    pool='default_pool',
    task_id=table_cleanup_task_id,
    trigger_rule=TriggerRule.ALL_DONE,
    task_config=task_config,
    dag_config=self.dag_config,
)
{code}
 
I am still getting the above-mentioned error in the scheduler container.
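One way to rule out a None pool reaching the operator from the DAG code is to normalize the value before the task is constructed. This is a minimal sketch, not a confirmed fix: {{resolve_pool}} and {{operator_kwargs}} are hypothetical names, and the default of 'default_pool' is simply the name used in the snippet above. Since the failing UPDATE still shows {{'pool': None}} after the operator was given 'default_pool', the None may originate elsewhere, so this guard covers only one possible source.

```python
from typing import Optional

# Assumption: "default_pool" is the pool name used in the snippet above.
DEFAULT_POOL_NAME = "default_pool"


def resolve_pool(pool: Optional[str], default: str = DEFAULT_POOL_NAME) -> str:
    """Return a non-empty pool name, falling back to the default."""
    if pool is None or not pool.strip():
        return default
    return pool


# Hypothetical keyword arguments that would be passed to the operator.
operator_kwargs = {
    "task_id": "table_cleanup",
    "pool": resolve_pool(None),
}
print(operator_kwargs)
```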

  was:
I am trying to run Apache Airflow in ECS using the {{v1-10-stable}} branch of [apache/airflow|https://github.com/apache/airflow/tree/v1-10-stable] through my fork [airflow|https://github.com/ayush-san/airflow/tree/zmaster]. I have updated the Python version to 3.7.5. When I try to run my sample Sqoop import DAG, I get this error in the scheduler container:

 
{code:java}
[SQL: UPDATE task_instance SET state=%(state)s, pool=%(pool)s WHERE task_instance.task_id = %(task_instance_task_id)s AND task_instance.dag_id = %(task_instance_dag_id)s AND task_instance.execution_date = %(task_instance_execution_date)s]
[parameters: {'state': 'scheduled', 'pool': None, 'task_instance_task_id': 'zomatogolddb.red_subscriptions_log.table_cleanup', 'task_instance_dag_id': 'mysql_import_adhoc', 'task_instance_execution_date': datetime.datetime(2019, 12, 5, 11, 34, 33, 758857, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>)}]
(Background on this error at: http://sqlalche.me/e/gkpj)
Process DagFileProcessor18571-Process:
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1246, in _execute_context
    cursor, statement, parameters, context
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/engine/default.py", line 581, in do_execute
    cursor.execute(statement, parameters)
psycopg2.IntegrityError: null value in column "pool" violates not-null constraint
DETAIL:  Failing row contains (zomatogolddb.red_subscriptions_log.table_cleanup, mysql_import_adhoc, 2019-12-05 11:34:33.758857+00, null, null, null, scheduled, 0, , airflow, null, null, default, 9, null, null, null, 1, \x80047d942e).


The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/usr/local/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
    self.run()
  File "/usr/local/lib/python3.7/multiprocessing/process.py", line 99, in run
    self._target(*self._args, **self._kwargs)
  File "/opt/airflow/airflow/jobs/scheduler_job.py", line 148, in _run_file_processor
    result = scheduler_job.process_file(file_path, pickle_dags)
  File "/opt/airflow/airflow/utils/db.py", line 74, in wrapper
    return func(*args, **kwargs)
  File "/opt/airflow/airflow/jobs/scheduler_job.py", line 1599, in process_file
    session.commit()
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/session.py", line 1027, in commit
    self.transaction.commit()
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/session.py", line 494, in commit
    self._prepare_impl()
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/session.py", line 473, in _prepare_impl
    self.session.flush()
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/session.py", line 2470, in flush
    self._flush(objects)
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/session.py", line 2608, in _flush
    transaction.rollback(_capture_exception=True)
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/util/langhelpers.py", line 68, in __exit__
    compat.reraise(exc_type, exc_value, exc_tb)
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/util/compat.py", line 153, in reraise
    raise value
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/session.py", line 2568, in _flush
    flush_context.execute()
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/unitofwork.py", line 422, in execute
    rec.execute(self)
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/unitofwork.py", line 589, in execute
    uow,
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/persistence.py", line 236, in save_obj
    update,
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/persistence.py", line 996, in _emit_update_statements
    statement, multiparams
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 982, in execute
    return meth(self, multiparams, params)
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/sql/elements.py", line 287, in _execute_on_connection
    return connection._execute_clauseelement(self, multiparams, params)
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1101, in _execute_clauseelement
    distilled_params,
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1250, in _execute_context
    e, statement, parameters, cursor, context
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1476, in _handle_dbapi_exception
    util.raise_from_cause(sqlalchemy_exception, exc_info)
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/util/compat.py", line 398, in raise_from_cause
    reraise(type(exception), exception, tb=exc_tb, cause=cause)
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/util/compat.py", line 152, in reraise
    raise value.with_traceback(tb)
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1246, in _execute_context
    cursor, statement, parameters, context
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/engine/default.py", line 581, in do_execute
    cursor.execute(statement, parameters)
sqlalchemy.exc.IntegrityError: (psycopg2.IntegrityError) null value in column "pool" violates not-null constraint
DETAIL:  Failing row contains (zomatogolddb.red_subscriptions_log.table_cleanup, mysql_import_adhoc, 2019-12-05 11:34:33.758857+00, null, null, null, scheduled, 0, , airflow, null, null, default, 9, null, null, null, 1, \x80047d942e).

{code}
 

This happens even though I have passed the pool name for this task, and the pool is visible in the webserver UI as well. I have attached a webserver UI screenshot for reference.

 

Previously, we ran Airflow v1.10.3 in *_LocalExecutor_* mode without explicitly passing a pool name to our operator:
{code:python}
table_cleanup_task: TableCleanupOperator = TableCleanupOperator(
    dag=self.dag,
    pool=None,
    task_id=table_cleanup_task_id,
    trigger_rule=TriggerRule.ALL_DONE,
    task_config=task_config,
    dag_config=self.dag_config,
)
{code}
 
However, v1.10.5 raised an error saying that the pool name can't be None, so I changed it to:
  
{code:python}
table_cleanup_task: TableCleanupOperator = TableCleanupOperator(
    dag=self.dag,
    pool='default_pool',
    task_id=table_cleanup_task_id,
    trigger_rule=TriggerRule.ALL_DONE,
    task_config=task_config,
    dag_config=self.dag_config,
)
{code}
 
I am still getting the above-mentioned error in the scheduler container.


> sqlalchemy.exc.IntegrityError: (psycopg2.IntegrityError) null value in column "pool" violates not-null constraint
> -----------------------------------------------------------------------------------------------------------------
>
>                 Key: AIRFLOW-6179
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-6179
>             Project: Apache Airflow
>          Issue Type: Bug
>          Components: operators, scheduler
>    Affects Versions: 1.10.5
>            Reporter: Ayush Chauhan
>            Priority: Major
>         Attachments: task instance detail.png
>
>



--
This message was sent by Atlassian Jira
(v8.3.4#803005)