Posted to commits@airflow.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2019/05/06 09:47:00 UTC

[jira] [Commented] (AIRFLOW-4462) MSSQL backend broken - Airflow manual trigger/scheduled trigger doesn't always work on mssql(azure mssql, mssql 2017, other versions)

    [ https://issues.apache.org/jira/browse/AIRFLOW-4462?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16833658#comment-16833658 ] 

ASF GitHub Bot commented on AIRFLOW-4462:
-----------------------------------------

deodeveloper commented on pull request #5241: [AIRFLOW-4462] MSSQL backend broken 
URL: https://github.com/apache/airflow/pull/5241
 
 
   - Airflow manual trigger/scheduler…trigger doesn't always work on mssql(azure mssql, mssql 2017, other versions)
   
   * Fix MSSQL Server as the Airflow backend. Applies only to SQL Server 2005 and later versions, and to Azure SQL DB. The issue is with execution_date, which is used as a primary-key column in task_instance and as an equality-comparison column in dag_run. When a Python datetime with microsecond precision is compared for equality against SQL Server, the comparison returns false whenever the millisecond part is non-zero, e.g. 2019-04-20T18:51:35.033. This change converts execution_date from DATETIME to DATETIME2(precision=6), matching MySQL's microsecond precision, so the date equality comparison works as desired. Microsoft also recommends using DATETIME2.
   
   Without this fix, Airflow is not usable on SQL Server or Azure SQL DB. A minimal sketch of the column-type change follows.
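   
   The sketch below assumes SQLAlchemy's documented `mssql` dialect types; the actual migration lives in this PR's diff, and the column objects here are illustrative only:
   
   ```python
   # Illustrative sketch, not the PR's exact diff: SQL Server's legacy DATETIME
   # is accurate only to ~3.33 ms, so a Python datetime with microseconds
   # (e.g. 18:51:35.033000) need not compare equal to the stored value.
   # DATETIME2(6) keeps microsecond precision, like MySQL's DATETIME(6).
   import sqlalchemy as sa
   from sqlalchemy.dialects import mssql

   # Before: rounded to ~3.33 ms ticks; equality with microsecond params fails.
   execution_date_before = sa.Column('execution_date', mssql.DATETIME,
                                     primary_key=True)

   # After: microsecond precision matches the values pyodbc binds.
   execution_date_after = sa.Column('execution_date',
                                    mssql.DATETIME2(precision=6),
                                    primary_key=True)
   ```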
   
   Make sure you have checked _all_ steps below.
   
   ### Jira
   
   - [ ] My PR addresses the following [Airflow Jira](https://issues.apache.org/jira/browse/AIRFLOW/) issues and references them in the PR title. For example, "\[AIRFLOW-XXX\] My Airflow PR"
     - https://issues.apache.org/jira/browse/AIRFLOW-XXX
     - In case you are fixing a typo in the documentation you can prepend your commit with \[AIRFLOW-XXX\], code changes always need a Jira issue.
     - In case you are proposing a fundamental code change, you need to create an Airflow Improvement Proposal ([AIP](https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+Improvements+Proposals)).
     - In case you are adding a dependency, check if the license complies with the [ASF 3rd Party License Policy](https://www.apache.org/legal/resolved.html#category-x).
   
   ### Description
   
   - [ ] Here are some details about my PR, including screenshots of any UI changes:
   
   ### Tests
   
   - [ ] My PR adds the following unit tests __OR__ does not need testing for this extremely good reason:
   
   ### Commits
   
   - [ ] My commits all reference Jira issues in their subject lines, and I have squashed multiple commits if they address the same issue. In addition, my commits follow the guidelines from "[How to write a good git commit message](http://chris.beams.io/posts/git-commit/)":
     1. Subject is separated from body by a blank line
     1. Subject is limited to 50 characters (not including Jira issue reference)
     1. Subject does not end with a period
     1. Subject uses the imperative mood ("add", not "adding")
     1. Body wraps at 72 characters
     1. Body explains "what" and "why", not "how"
   
   ### Documentation
   
   - [ ] In case of new functionality, my PR adds documentation that describes how to use it.
     - All the public functions and the classes in the PR contain docstrings that explain what they do
     - If you implement backwards incompatible changes, please leave a note in the [Updating.md](https://github.com/apache/airflow/blob/master/UPDATING.md) so we can assign it to an appropriate release
   
   ### Code Quality
   
   - [ ] Passes `flake8`
   
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> MSSQL backend broken - Airflow manual trigger/scheduled trigger doesn't always work on mssql(azure mssql, mssql 2017, other versions)
> -------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: AIRFLOW-4462
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-4462
>             Project: Apache Airflow
>          Issue Type: Bug
>          Components: DagRun, database, db
>    Affects Versions: 1.9.0, 1.10.1, 1.10.3
>            Reporter: Bijay Deo
>            Assignee: Bijay Deo
>            Priority: Major
>              Labels: patch
>
> Airflow DAG triggers don't work on Azure MSSQL or MSSQL 2017 (tested); other versions of MSSQL must have the same issue. Basically, Airflow can't be used on MSSQL without fixing this issue. Just click manual trigger a number of times and it will fail.
> A DAG trigger works only when the execution_date has no milliseconds part; when the millisecond part is non-zero down to the last digit, the trigger fails.
> The problem is that the execution_date input from pyodbc carries microsecond precision when it is compared for equality on the task_instance table. Since execution_date has only millisecond precision in the DB (SQL Server also rounds this value), the equality check fails on certain values, e.g. 2019-04-28 00:34:26.517 and 2019-04-20T18:51:35.033. A small illustration follows.
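> A pure-Python illustration of the failing equality (no database needed; the rounding mimics SQL Server's documented DATETIME behaviour of storing time in 1/300-second ticks, and is simplified to ignore carries at second boundaries):
>
>     from datetime import datetime
>
>     def as_stored_datetime(dt):
>         # SQL Server DATETIME keeps time in 1/300 s ticks, so stored values
>         # land on ~3.33 ms steps (.000, .003, .007, ...); simplified sketch.
>         ticks = round(dt.microsecond * 300 / 1_000_000)
>         return dt.replace(microsecond=min(round(ticks * 1_000_000 / 300), 999999))
>
>     sent = datetime(2019, 4, 20, 18, 51, 35, 33000)  # value bound by pyodbc
>     stored = as_stored_datetime(sent)                # what the column holds
>     print(sent == stored)                            # False -> the lookup misses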
> Stack trace below:
> Connected to pydev debugger (build 191.6605.12)
> [2019-04-23 21:49:13,361] {settings.py:182} INFO - settings.configure_orm(): Using pool settings. pool_size=5, pool_recycle=1800, pid=78750
> [2019-04-23 21:49:13,523] {__init__.py:51} INFO - Using executor SequentialExecutor
> 2019-04-23 21:49:17,318 INFO sqlalchemy.engine.base.Engine SELECT CAST(SERVERPROPERTY('ProductVersion') AS VARCHAR)
> [2019-04-23 21:49:17,318] {log.py:110} INFO - SELECT CAST(SERVERPROPERTY('ProductVersion') AS VARCHAR)
> 2019-04-23 21:49:17,319 INFO sqlalchemy.engine.base.Engine ()
> [2019-04-23 21:49:17,319] {log.py:110} INFO - ()
> 2019-04-23 21:49:17,324 INFO sqlalchemy.engine.base.Engine SELECT schema_name()
> [2019-04-23 21:49:17,324] {log.py:110} INFO - SELECT schema_name()
> 2019-04-23 21:49:17,324 INFO sqlalchemy.engine.base.Engine ()
> [2019-04-23 21:49:17,324] {log.py:110} INFO - ()
> 2019-04-23 21:49:17,329 INFO sqlalchemy.engine.base.Engine SELECT CAST('test plain returns' AS VARCHAR(60)) AS anon_1
> [2019-04-23 21:49:17,329] {log.py:110} INFO - SELECT CAST('test plain returns' AS VARCHAR(60)) AS anon_1
> 2019-04-23 21:49:17,329 INFO sqlalchemy.engine.base.Engine ()
> [2019-04-23 21:49:17,329] {log.py:110} INFO - ()
> 2019-04-23 21:49:17,332 INFO sqlalchemy.engine.base.Engine SELECT CAST('test unicode returns' AS NVARCHAR(60)) AS anon_1
> [2019-04-23 21:49:17,332] {log.py:110} INFO - SELECT CAST('test unicode returns' AS NVARCHAR(60)) AS anon_1
> 2019-04-23 21:49:17,333 INFO sqlalchemy.engine.base.Engine ()
> [2019-04-23 21:49:17,333] {log.py:110} INFO - ()
> 2019-04-23 21:49:17,337 INFO sqlalchemy.engine.base.Engine SELECT 1
> [2019-04-23 21:49:17,337] {log.py:110} INFO - SELECT 1
> 2019-04-23 21:49:17,337 INFO sqlalchemy.engine.base.Engine ()
> [2019-04-23 21:49:17,337] {log.py:110} INFO - ()
> 2019-04-23 21:49:17,340 INFO sqlalchemy.engine.base.Engine BEGIN (implicit)
> [2019-04-23 21:49:17,340] {log.py:110} INFO - BEGIN (implicit)
> 2019-04-23 21:49:17,344 INFO sqlalchemy.engine.base.Engine INSERT INTO log (dttm, dag_id, task_id, event, execution_date, owner, extra) OUTPUT inserted.id VALUES (?, ?, ?, ?, ?, ?, ?)
> [2019-04-23 21:49:17,344] {log.py:110} INFO - INSERT INTO log (dttm, dag_id, task_id, event, execution_date, owner, extra) OUTPUT inserted.id VALUES (?, ?, ?, ?, ?, ?, ?)
> 2019-04-23 21:49:17,344 INFO sqlalchemy.engine.base.Engine (datetime.datetime(2019, 4, 24, 4, 49, 17, 42277, tzinfo=<Timezone [UTC]>), 'tutorial', 'print_date', 'cli_run', <Pendulum [2019-04-20T18:51:35.033000+00:00]>, 'admin', '{"host_name": "Bijays-MBP.hsd1.ca.comcast.net", "full_command": "[\'/Users/admin/Documents/Development/Java/airflow/airflow/bin/airflow\', \'run\', \ ... (18 characters truncated) ... dmin/Documents/Development/Java/airflow/airflow/example_dags/tutorial.py\', \'--local\', \'tutorial\', \'print_date\', \'2019-04-20 18:51:35.033\']"}')
> [2019-04-23 21:49:17,344] {log.py:110} INFO - (datetime.datetime(2019, 4, 24, 4, 49, 17, 42277, tzinfo=<Timezone [UTC]>), 'tutorial', 'print_date', 'cli_run', <Pendulum [2019-04-20T18:51:35.033000+00:00]>, 'admin', '{"host_name": "Bijays-MBP.hsd1.ca.comcast.net", "full_command": "[\'/Users/admin/Documents/Development/Java/airflow/airflow/bin/airflow\', \'run\', \ ... (18 characters truncated) ... dmin/Documents/Development/Java/airflow/airflow/example_dags/tutorial.py\', \'--local\', \'tutorial\', \'print_date\', \'2019-04-20 18:51:35.033\']"}')
> 2019-04-23 21:49:17,351 INFO sqlalchemy.engine.base.Engine COMMIT
> [2019-04-23 21:49:17,351] {log.py:110} INFO - COMMIT
> [2019-04-23 21:49:17,366] {__init__.py:305} INFO - Filling up the DagBag from /Users/admin/Documents/Development/Java/airflow/airflow/example_dags/tutorial.py
> 2019-04-23 21:49:17,508 INFO sqlalchemy.engine.base.Engine SELECT CAST(SERVERPROPERTY('ProductVersion') AS VARCHAR)
> [2019-04-23 21:49:17,508] {log.py:110} INFO - SELECT CAST(SERVERPROPERTY('ProductVersion') AS VARCHAR)
> 2019-04-23 21:49:17,508 INFO sqlalchemy.engine.base.Engine ()
> [2019-04-23 21:49:17,508] {log.py:110} INFO - ()
> 2019-04-23 21:49:17,510 INFO sqlalchemy.engine.base.Engine SELECT schema_name()
> [2019-04-23 21:49:17,510] {log.py:110} INFO - SELECT schema_name()
> 2019-04-23 21:49:17,511 INFO sqlalchemy.engine.base.Engine ()
> [2019-04-23 21:49:17,511] {log.py:110} INFO - ()
> 2019-04-23 21:49:17,515 INFO sqlalchemy.engine.base.Engine SELECT CAST('test plain returns' AS VARCHAR(60)) AS anon_1
> [2019-04-23 21:49:17,515] {log.py:110} INFO - SELECT CAST('test plain returns' AS VARCHAR(60)) AS anon_1
> 2019-04-23 21:49:17,515 INFO sqlalchemy.engine.base.Engine ()
> [2019-04-23 21:49:17,515] {log.py:110} INFO - ()
> 2019-04-23 21:49:17,518 INFO sqlalchemy.engine.base.Engine SELECT CAST('test unicode returns' AS NVARCHAR(60)) AS anon_1
> [2019-04-23 21:49:17,518] {log.py:110} INFO - SELECT CAST('test unicode returns' AS NVARCHAR(60)) AS anon_1
> 2019-04-23 21:49:17,518 INFO sqlalchemy.engine.base.Engine ()
> [2019-04-23 21:49:17,518] {log.py:110} INFO - ()
> 2019-04-23 21:49:17,523 INFO sqlalchemy.engine.base.Engine SELECT 1
> [2019-04-23 21:49:17,523] {log.py:110} INFO - SELECT 1
> 2019-04-23 21:49:17,523 INFO sqlalchemy.engine.base.Engine ()
> [2019-04-23 21:49:17,523] {log.py:110} INFO - ()
> 2019-04-23 21:49:17,526 INFO sqlalchemy.engine.base.Engine BEGIN (implicit)
> [2019-04-23 21:49:17,526] {log.py:110} INFO - BEGIN (implicit)
> 2019-04-23 21:49:17,534 INFO sqlalchemy.engine.base.Engine SELECT TOP 1 task_instance.try_number AS task_instance_try_number, task_instance.task_id AS task_instance_task_id, task_instance.dag_id AS task_instance_dag_id, task_instance.execution_date AS task_instance_execution_date, task_instance.start_date AS task_instance_start_date, task_instance.end_date AS task_instance_end_date, task_instance.duration AS task_instance_duration, task_instance.state AS task_instance_state, task_instance.max_tries AS task_instance_max_tries, task_instance.hostname AS task_instance_hostname, task_instance.unixname AS task_instance_unixname, task_instance.job_id AS task_instance_job_id, task_instance.pool AS task_instance_pool, task_instance.queue AS task_instance_queue, task_instance.priority_weight AS task_instance_priority_weight, task_instance.operator AS task_instance_operator, task_instance.queued_dttm AS task_instance_queued_dttm, task_instance.pid AS task_instance_pid, task_instance.executor_config AS task_instance_executor_config 
> FROM task_instance 
> WHERE task_instance.dag_id = ? AND task_instance.task_id = ? AND task_instance.execution_date = ?
> [2019-04-23 21:49:17,534] {log.py:110} INFO - SELECT TOP 1 task_instance.try_number AS task_instance_try_number, task_instance.task_id AS task_instance_task_id, task_instance.dag_id AS task_instance_dag_id, task_instance.execution_date AS task_instance_execution_date, task_instance.start_date AS task_instance_start_date, task_instance.end_date AS task_instance_end_date, task_instance.duration AS task_instance_duration, task_instance.state AS task_instance_state, task_instance.max_tries AS task_instance_max_tries, task_instance.hostname AS task_instance_hostname, task_instance.unixname AS task_instance_unixname, task_instance.job_id AS task_instance_job_id, task_instance.pool AS task_instance_pool, task_instance.queue AS task_instance_queue, task_instance.priority_weight AS task_instance_priority_weight, task_instance.operator AS task_instance_operator, task_instance.queued_dttm AS task_instance_queued_dttm, task_instance.pid AS task_instance_pid, task_instance.executor_config AS task_instance_executor_config 
> FROM task_instance 
> WHERE task_instance.dag_id = ? AND task_instance.task_id = ? AND task_instance.execution_date = ?
> 2019-04-23 21:49:17,536 INFO sqlalchemy.engine.base.Engine ('tutorial', 'print_date', <Pendulum [2019-04-20T18:51:35.033000+00:00]>)
> [2019-04-23 21:49:17,536] {log.py:110} INFO - ('tutorial', 'print_date', <Pendulum [2019-04-20T18:51:35.033000+00:00]>)
> 2019-04-23 21:49:17,547 INFO sqlalchemy.engine.base.Engine COMMIT
> [2019-04-23 21:49:17,547] {log.py:110} INFO - COMMIT
> 2019-04-23 21:49:17,681 INFO sqlalchemy.engine.base.Engine SELECT 1
> [2019-04-23 21:49:17,681] {log.py:110} INFO - SELECT 1
> 2019-04-23 21:49:17,681 INFO sqlalchemy.engine.base.Engine ()
> [2019-04-23 21:49:17,681] {log.py:110} INFO - ()
> 2019-04-23 21:49:17,684 INFO sqlalchemy.engine.base.Engine BEGIN (implicit)
> [2019-04-23 21:49:17,684] {log.py:110} INFO - BEGIN (implicit)
> 2019-04-23 21:49:17,686 INFO sqlalchemy.engine.base.Engine SELECT TOP 1 dag_run.state AS dag_run_state, dag_run.id AS dag_run_id, dag_run.dag_id AS dag_run_dag_id, dag_run.execution_date AS dag_run_execution_date, dag_run.start_date AS dag_run_start_date, dag_run.end_date AS dag_run_end_date, dag_run.run_id AS dag_run_run_id, dag_run.external_trigger AS dag_run_external_trigger, dag_run.conf AS dag_run_conf 
> FROM dag_run 
> WHERE dag_run.dag_id = ? AND dag_run.execution_date = ?
> [2019-04-23 21:49:17,686] {log.py:110} INFO - SELECT TOP 1 dag_run.state AS dag_run_state, dag_run.id AS dag_run_id, dag_run.dag_id AS dag_run_dag_id, dag_run.execution_date AS dag_run_execution_date, dag_run.start_date AS dag_run_start_date, dag_run.end_date AS dag_run_end_date, dag_run.run_id AS dag_run_run_id, dag_run.external_trigger AS dag_run_external_trigger, dag_run.conf AS dag_run_conf 
> FROM dag_run 
> WHERE dag_run.dag_id = ? AND dag_run.execution_date = ?
> 2019-04-23 21:49:17,686 INFO sqlalchemy.engine.base.Engine ('tutorial', <Pendulum [2019-04-20T18:51:35.033000+00:00]>)
> [2019-04-23 21:49:17,686] {log.py:110} INFO - ('tutorial', <Pendulum [2019-04-20T18:51:35.033000+00:00]>)
> 2019-04-23 21:49:17,691 INFO sqlalchemy.engine.base.Engine COMMIT
> [2019-04-23 21:49:17,691] {log.py:110} INFO - COMMIT
> [2019-04-23 21:49:17,701] {cli.py:517} INFO - Running <TaskInstance: tutorial.print_date 2019-04-20T18:51:35.033000+00:00 [None]> on host bijays-mbp.hsd1.ca.comcast.net
> 2019-04-23 21:49:36,110 INFO sqlalchemy.engine.base.Engine SELECT 1
> 2019-04-23 21:49:36,112 INFO sqlalchemy.engine.base.Engine ()
> 2019-04-23 21:49:36,119 INFO sqlalchemy.engine.base.Engine BEGIN (implicit)
> 2019-04-23 21:49:36,127 INFO sqlalchemy.engine.base.Engine INSERT INTO job (dag_id, state, job_type, start_date, end_date, latest_heartbeat, executor_class, hostname, unixname) OUTPUT inserted.id VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
> 2019-04-23 21:49:36,127 INFO sqlalchemy.engine.base.Engine ('tutorial', 'running', 'LocalTaskJob', datetime.datetime(2019, 4, 24, 4, 49, 32, 942901, tzinfo=<Timezone [UTC]>), None, datetime.datetime(2019, 4, 24, 4, 49, 32, 943012, tzinfo=<Timezone [UTC]>), 'SequentialExecutor', 'bijays-mbp.hsd1.ca.comcast.net', 'admin')
> 2019-04-23 21:49:36,142 INFO sqlalchemy.engine.base.Engine COMMIT
> 2019-04-23 21:49:59,069 INFO sqlalchemy.engine.base.Engine SELECT 1
> 2019-04-23 21:49:59,070 INFO sqlalchemy.engine.base.Engine ()
> 2019-04-23 21:49:59,072 INFO sqlalchemy.engine.base.Engine BEGIN (implicit)
> 2019-04-23 21:49:59,074 INFO sqlalchemy.engine.base.Engine SELECT TOP 1 dag_run.state AS dag_run_state, dag_run.id AS dag_run_id, dag_run.dag_id AS dag_run_dag_id, dag_run.execution_date AS dag_run_execution_date, dag_run.start_date AS dag_run_start_date, dag_run.end_date AS dag_run_end_date, dag_run.run_id AS dag_run_run_id, dag_run.external_trigger AS dag_run_external_trigger, dag_run.conf AS dag_run_conf 
> FROM dag_run 
> WHERE dag_run.dag_id = ? AND dag_run.execution_date = ?
> 2019-04-23 21:49:59,074 INFO sqlalchemy.engine.base.Engine ('tutorial', <Pendulum [2019-04-20T18:51:35.033000+00:00]>)
> 2019-04-23 21:49:59,078 INFO sqlalchemy.engine.base.Engine COMMIT
> 2019-04-23 21:50:41,906 INFO sqlalchemy.engine.base.Engine SELECT 1
> 2019-04-23 21:50:41,908 INFO sqlalchemy.engine.base.Engine ()
> 2019-04-23 21:50:41,910 INFO sqlalchemy.engine.base.Engine BEGIN (implicit)
> 2019-04-23 21:50:41,912 INFO sqlalchemy.engine.base.Engine SELECT TOP 1 task_instance.try_number AS task_instance_try_number, task_instance.task_id AS task_instance_task_id, task_instance.dag_id AS task_instance_dag_id, task_instance.execution_date AS task_instance_execution_date, task_instance.start_date AS task_instance_start_date, task_instance.end_date AS task_instance_end_date, task_instance.duration AS task_instance_duration, task_instance.state AS task_instance_state, task_instance.max_tries AS task_instance_max_tries, task_instance.hostname AS task_instance_hostname, task_instance.unixname AS task_instance_unixname, task_instance.job_id AS task_instance_job_id, task_instance.pool AS task_instance_pool, task_instance.queue AS task_instance_queue, task_instance.priority_weight AS task_instance_priority_weight, task_instance.operator AS task_instance_operator, task_instance.queued_dttm AS task_instance_queued_dttm, task_instance.pid AS task_instance_pid, task_instance.executor_config AS task_instance_executor_config 
> FROM task_instance 
> WHERE task_instance.dag_id = ? AND task_instance.task_id = ? AND task_instance.execution_date = ?
> 2019-04-23 21:50:41,913 INFO sqlalchemy.engine.base.Engine ('tutorial', 'print_date', <Pendulum [2019-04-20T18:51:35.033000+00:00]>)
> 2019-04-23 21:50:41,930 INFO sqlalchemy.engine.base.Engine SELECT task_reschedule.id AS task_reschedule_id, task_reschedule.task_id AS task_reschedule_task_id, task_reschedule.dag_id AS task_reschedule_dag_id, task_reschedule.execution_date AS task_reschedule_execution_date, task_reschedule.try_number AS task_reschedule_try_number, task_reschedule.start_date AS task_reschedule_start_date, task_reschedule.end_date AS task_reschedule_end_date, task_reschedule.duration AS task_reschedule_duration, task_reschedule.reschedule_date AS task_reschedule_reschedule_date 
> FROM task_reschedule 
> WHERE task_reschedule.dag_id = ? AND task_reschedule.task_id = ? AND task_reschedule.execution_date = ? AND task_reschedule.try_number = ? ORDER BY task_reschedule.id ASC
> 2019-04-23 21:50:41,930 INFO sqlalchemy.engine.base.Engine ('tutorial', 'print_date', <Pendulum [2019-04-20T18:51:35.033000+00:00]>, 1)
> 2019-04-23 21:50:41,936 INFO sqlalchemy.engine.base.Engine SELECT count(task_instance.task_id) AS count_1 
> FROM task_instance 
> WHERE task_instance.dag_id = ? AND task_instance.state = ?
> 2019-04-23 21:50:41,937 INFO sqlalchemy.engine.base.Engine ('tutorial', 'running')
> 2019-04-23 21:50:41,941 INFO sqlalchemy.engine.base.Engine COMMIT
> 2019-04-23 21:53:03,611 INFO sqlalchemy.engine.base.Engine SELECT 1
> 2019-04-23 21:53:03,611 INFO sqlalchemy.engine.base.Engine ()
> 2019-04-23 21:53:03,614 INFO sqlalchemy.engine.base.Engine BEGIN (implicit)
> 2019-04-23 21:53:03,616 INFO sqlalchemy.engine.base.Engine SELECT task_instance.try_number AS task_instance_try_number, task_instance.task_id AS task_instance_task_id, task_instance.dag_id AS task_instance_dag_id, task_instance.execution_date AS task_instance_execution_date, task_instance.start_date AS task_instance_start_date, task_instance.end_date AS task_instance_end_date, task_instance.duration AS task_instance_duration, task_instance.state AS task_instance_state, task_instance.max_tries AS task_instance_max_tries, task_instance.hostname AS task_instance_hostname, task_instance.unixname AS task_instance_unixname, task_instance.job_id AS task_instance_job_id, task_instance.pool AS task_instance_pool, task_instance.queue AS task_instance_queue, task_instance.priority_weight AS task_instance_priority_weight, task_instance.operator AS task_instance_operator, task_instance.queued_dttm AS task_instance_queued_dttm, task_instance.pid AS task_instance_pid, task_instance.executor_config AS task_instance_executor_config 
> FROM task_instance 
> WHERE task_instance.task_id = ? AND task_instance.dag_id = ? AND task_instance.execution_date = ?
> 2019-04-23 21:53:03,616 INFO sqlalchemy.engine.base.Engine ('print_date', 'tutorial', <Pendulum [2019-04-20T18:51:35.033000+00:00]>)
> 2019-04-23 21:53:50,370 INFO sqlalchemy.engine.base.Engine INSERT INTO task_instance (task_id, dag_id, execution_date, start_date, end_date, duration, state, try_number, max_tries, hostname, unixname, job_id, pool, queue, priority_weight, operator, queued_dttm, pid, executor_config) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
> 2019-04-23 21:53:50,372 INFO sqlalchemy.engine.base.Engine ('print_date', 'tutorial', <Pendulum [2019-04-20T18:51:35.033000+00:00]>, datetime.datetime(2019, 4, 24, 4, 50, 41, 924763, tzinfo=<Timezone [UTC]>), None, None, 'running', 1, 1, 'bijays-mbp.hsd1.ca.comcast.net', 'admin', 2046, None, 'default', 3, 'BashOperator', None, 78750, bytearray(b'\x80\x04}\x94.'))
> 2019-04-23 21:53:50,390 INFO sqlalchemy.engine.base.Engine ROLLBACK
> 2019-04-23 21:54:58,467 INFO sqlalchemy.engine.base.Engine SELECT 1
> 2019-04-23 21:54:58,467 INFO sqlalchemy.engine.base.Engine ()
> 2019-04-23 21:54:58,470 INFO sqlalchemy.engine.base.Engine BEGIN (implicit)
> 2019-04-23 21:54:58,472 INFO sqlalchemy.engine.base.Engine SELECT job.id AS job_id, job.dag_id AS job_dag_id, job.state AS job_state, job.job_type AS job_job_type, job.start_date AS job_start_date, job.end_date AS job_end_date, job.latest_heartbeat AS job_latest_heartbeat, job.executor_class AS job_executor_class, job.hostname AS job_hostname, job.unixname AS job_unixname 
> FROM job 
> WHERE job.id = ? AND job.job_type IN (?)
> 2019-04-23 21:54:58,473 INFO sqlalchemy.engine.base.Engine (2046, 'LocalTaskJob')
> 2019-04-23 21:54:58,480 INFO sqlalchemy.engine.base.Engine UPDATE job SET state=?, start_date=?, end_date=?, latest_heartbeat=? WHERE job.id = ?
> 2019-04-23 21:54:58,480 INFO sqlalchemy.engine.base.Engine ('failed', datetime.datetime(2019, 4, 24, 4, 49, 32, 942901, tzinfo=<Timezone [UTC]>), datetime.datetime(2019, 4, 24, 4, 53, 50, 392514, tzinfo=<Timezone [UTC]>), datetime.datetime(2019, 4, 24, 4, 49, 32, 943012, tzinfo=<Timezone [UTC]>), 2046)
> 2019-04-23 21:54:58,485 INFO sqlalchemy.engine.base.Engine COMMIT
> Traceback (most recent call last):
>  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1236, in _execute_context
>  cursor, statement, parameters, context
>  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/engine/default.py", line 536, in do_execute
>  cursor.execute(statement, parameters)
> pyodbc.IntegrityError: ('23000', "[23000] [Microsoft][ODBC Driver 17 for SQL Server][SQL Server]Violation of PRIMARY KEY constraint 'PK__task_ins__9BEABD04B1A321BB'. Cannot insert duplicate key in object 'dbo.task_instance'. The duplicate key value is (print_date, tutorial, Apr 20 2019 6:51PM). (2627) (SQLExecDirectW)")
> The above exception was the direct cause of the following exception:
> Traceback (most recent call last):
>  File "/Applications/PyCharm.app/Contents/helpers/pydev/pydevd.py", line 1741, in <module>
>  main()
>  File "/Applications/PyCharm.app/Contents/helpers/pydev/pydevd.py", line 1735, in main
>  globals = debugger.run(setup['file'], None, None, is_module)
>  File "/Applications/PyCharm.app/Contents/helpers/pydev/pydevd.py", line 1135, in run
>  pydev_imports.execfile(file, globals, locals) # execute the script
>  File "/Applications/PyCharm.app/Contents/helpers/pydev/_pydev_imps/_pydev_execfile.py", line 18, in execfile
>  exec(compile(contents+"\n", file, 'exec'), glob, loc)
>  File "/Users/admin/Documents/Development/Java/airflow/airflow/bin/airflow", line 32, in <module>
>  args.func(args)
>  File "/Users/admin/Documents/Development/Java/airflow/airflow/utils/cli.py", line 74, in wrapper
>  return f(*args, **kwargs)
>  File "/Users/admin/Documents/Development/Java/airflow/airflow/bin/cli.py", line 523, in run
>  _run(args, dag, ti)
>  File "/Users/admin/Documents/Development/Java/airflow/airflow/bin/cli.py", line 437, in _run
>  run_job.run()
>  File "/Users/admin/Documents/Development/Java/airflow/airflow/jobs.py", line 209, in run
>  self._execute()
>  File "/Users/admin/Documents/Development/Java/airflow/airflow/jobs.py", line 2548, in _execute
>  pool=self.pool):
>  File "/Users/admin/Documents/Development/Java/airflow/airflow/utils/db.py", line 73, in wrapper
>  return func(*args, **kwargs)
>  File "/Users/admin/Documents/Development/Java/airflow/airflow/models/__init__.py", line 1365, in _check_and_change_state_before_execution
>  session.commit()
>  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/session.py", line 1023, in commit
>  self.transaction.commit()
>  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/session.py", line 487, in commit
>  self._prepare_impl()
>  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/session.py", line 466, in _prepare_impl
>  self.session.flush()
>  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/session.py", line 2446, in flush
>  self._flush(objects)
>  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/session.py", line 2584, in _flush
>  transaction.rollback(_capture_exception=True)
>  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/util/langhelpers.py", line 67, in __exit__
>  compat.reraise(exc_type, exc_value, exc_tb)
>  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/util/compat.py", line 277, in reraise
>  raise value
>  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/session.py", line 2544, in _flush
>  flush_context.execute()
>  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/unitofwork.py", line 416, in execute
>  rec.execute(self)
>  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/unitofwork.py", line 583, in execute
>  uow,
>  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/persistence.py", line 245, in save_obj
>  insert,
>  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/persistence.py", line 1063, in _emit_insert_statements
>  c = cached_connections[connection].execute(statement, multiparams)
>  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 980, in execute
>  return meth(self, multiparams, params)
>  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/sql/elements.py", line 273, in _execute_on_connection
>  return connection._execute_clauseelement(self, multiparams, params)
>  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1099, in _execute_clauseelement
>  distilled_params,
>  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1240, in _execute_context
>  e, statement, parameters, cursor, context
>  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1458, in _handle_dbapi_exception
>  util.raise_from_cause(sqlalchemy_exception, exc_info)
>  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/util/compat.py", line 296, in raise_from_cause
>  reraise(type(exception), exception, tb=exc_tb, cause=cause)
>  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/util/compat.py", line 276, in reraise
>  raise value.with_traceback(tb)
>  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1236, in _execute_context
>  cursor, statement, parameters, context
>  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/engine/default.py", line 536, in do_execute
>  cursor.execute(statement, parameters)
> sqlalchemy.exc.IntegrityError: (pyodbc.IntegrityError) ('23000', "[23000] [Microsoft][ODBC Driver 17 for SQL Server][SQL Server]Violation of PRIMARY KEY constraint 'PK__task_ins__9BEABD04B1A321BB'. Cannot insert duplicate key in object 'dbo.task_instance'. The duplicate key value is (print_date, tutorial, Apr 20 2019 6:51PM). (2627) (SQLExecDirectW)") [SQL: 'INSERT INTO task_instance (task_id, dag_id, execution_date, start_date, end_date, duration, state, try_number, max_tries, hostname, unixname, job_id, pool, queue, priority_weight, operator, queued_dttm, pid, executor_config) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)'] [parameters: ('print_date', 'tutorial', <Pendulum [2019-04-20T18:51:35.033000+00:00]>, datetime.datetime(2019, 4, 24, 4, 50, 41, 924763, tzinfo=<Timezone [UTC]>), None, None, 'running', 1, 1, 'bijays-mbp.hsd1.ca.comcast.net', 'admin', 2046, None, 'default', 3, 'BashOperator', None, 78750, bytearray(b'\x80\x04}\x94.'))] (Background on this error at: http://sqlalche.me/e/gkpj)
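> Read together with the log above, the failure chain appears to be: the SELECT on task_instance with execution_date = ? returns no row (the microsecond-precision parameter never compares equal to the tick-rounded stored DATETIME), so Airflow treats the task instance as new and issues an INSERT, which then collides with the already-existing primary key (task_id, dag_id, execution_date). A toy model of that check-then-insert flow (self-contained and hypothetical, reusing the rounding sketch from above):
>
>     from datetime import datetime
>
>     def as_stored_datetime(dt):
>         # Simplified model of DATETIME's 1/300 s tick rounding.
>         ticks = round(dt.microsecond * 300 / 1_000_000)
>         return dt.replace(microsecond=min(round(ticks * 1_000_000 / 300), 999999))
>
>     table = {}  # stands in for task_instance, keyed by its primary key
>
>     def insert(task_id, dag_id, execution_date):
>         key = (task_id, dag_id, as_stored_datetime(execution_date))
>         if key in table:
>             raise RuntimeError("Violation of PRIMARY KEY constraint")  # error 2627
>         table[key] = 'running'
>
>     def select(task_id, dag_id, execution_date):
>         # The WHERE clause compares the unrounded parameter to the rounded key.
>         return table.get((task_id, dag_id, execution_date))
>
>     ed = datetime(2019, 4, 20, 18, 51, 35, 33000)
>     insert('print_date', 'tutorial', ed)              # first run stores a rounded key
>     if select('print_date', 'tutorial', ed) is None:  # misses: .033000 != rounded value
>         insert('print_date', 'tutorial', ed)          # second INSERT -> PK violation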
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)