Posted to commits@airflow.apache.org by "Ash Berlin-Taylor (JIRA)" <ji...@apache.org> on 2019/06/19 13:47:00 UTC

[jira] [Issue Comment Deleted] (AIRFLOW-4355) Externally triggered DAG is marked as 'success' even if a task has been 'removed'!

     [ https://issues.apache.org/jira/browse/AIRFLOW-4355?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ash Berlin-Taylor updated AIRFLOW-4355:
---------------------------------------
    Comment: was deleted

(was: raphaelauv commented on pull request #5275: [AIRFLOW-4355] New tag for only PY3 packages
URL: https://github.com/apache/airflow/pull/5275
 
 
   Make sure you have checked _all_ steps below.
   
   ### Jira
   
   - [x] My PR addresses the following [Airflow Jira](https://issues.apache.org/jira/browse/AIRFLOW/) issues and references them in the PR title. For example, "\[AIRFLOW-XXX\] My Airflow PR"
     - https://issues.apache.org/jira/browse/AIRFLOW-XXX
     - In case you are fixing a typo in the documentation you can prepend your commit with \[AIRFLOW-XXX\]; code changes always need a Jira issue.
     - In case you are proposing a fundamental code change, you need to create an Airflow Improvement Proposal ([AIP](https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+Improvements+Proposals)).
     - In case you are adding a dependency, check if the license complies with the [ASF 3rd Party License Policy](https://www.apache.org/legal/resolved.html#category-x).
   
   ### Description
   
   - [x] Here are some details about my PR, including screenshots of any UI changes:
   
   A new tag, so we can get all PY3 packages (without PY2).
   
   ### Tests
   
   - [ ] My PR adds the following unit tests __OR__ does not need testing for this extremely good reason:
   
   ### Commits
   
   - [x] My commits all reference Jira issues in their subject lines, and I have squashed multiple commits if they address the same issue. In addition, my commits follow the guidelines from "[How to write a good git commit message](http://chris.beams.io/posts/git-commit/)":
     1. Subject is separated from body by a blank line
     1. Subject is limited to 50 characters (not including Jira issue reference)
     1. Subject does not end with a period
     1. Subject uses the imperative mood ("add", not "adding")
     1. Body wraps at 72 characters
     1. Body explains "what" and "why", not "how"
   
   ### Documentation
   
   - [x] In case of new functionality, my PR adds documentation that describes how to use it.
     - All the public functions and the classes in the PR contain docstrings that explain what they do
     - If you implement backwards incompatible changes, please leave a note in the [Updating.md](https://github.com/apache/airflow/blob/master/UPDATING.md) so we can assign it to an appropriate release
   
   ### Code Quality
   
   - [x] Passes `flake8`
   
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
)

> Externally triggered DAG is marked as 'success' even if a task has been 'removed'!
> ----------------------------------------------------------------------------------
>
>                 Key: AIRFLOW-4355
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-4355
>             Project: Apache Airflow
>          Issue Type: Bug
>          Components: DAG, DagRun, scheduler
>    Affects Versions: 1.10.3
>            Reporter: t oo
>            Priority: Blocker
>             Fix For: 1.10.4
>
>         Attachments: dag_success_even_if_task_removed.png, treeview.png
>
>
> Note: all my DAGs are purely externally triggered.
> *Issue:* The DAG has 5 parallel tasks that ran successfully and 1 final task that somehow ended up in the 'removed' state (prior DAG runs had it in the 'failed' state) and never ran successfully, yet the DAG run is still showing 'success'!
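> For reference, a minimal sketch (an assumption about the mechanism, not the actual Airflow source) of how a run-state check that never counts 'removed' as unfinished or failed ends up reporting 'success' even though the final task never ran:
> {noformat}
> # Hypothetical, simplified run-state evaluation: 'removed' is neither an
> # unfinished nor a failed state, so it can never hold the run open or fail it.
> UNFINISHED = {'none', 'scheduled', 'queued', 'running', 'up_for_retry'}
> FAILED = {'failed', 'upstream_failed'}
>
> def evaluate_run_state(leaf_task_states):
>     if any(s in UNFINISHED for s in leaf_task_states):
>         return 'running'
>     if any(s in FAILED for s in leaf_task_states):
>         return 'failed'
>     return 'success'
>
> # Five successful Spark loads plus the 'removed' REPAIR_HIVE instance:
> print(evaluate_run_state(['success'] * 5 + ['removed']))  # -> 'success'
> {noformat}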
>  
> *Command run* (note that earlier commands such as airflow trigger_dag -e 20190412 qsr_coremytbl had already been run and failed for a valid reason, i.e. the python task failing):
> airflow trigger_dag -e 20190412T08:00 qsr_coremytbl --conf '{"hourstr":"08"}'
>  
> *some logs from the prior Airflow instance (the EC2 instance was auto-healed):*
> [2019-04-18 08:29:40,678] \{logging_mixin.py:95} INFO - [2019-04-18 08:29:40,678] {__init__.py:4897} WARNING - Failed to get task '<TaskInstance: qsr_coremytbl.REPAIR_HIVE_schemeh.mytbl 2019-04-12 08:00:00+00:00 [None]>' for dag '<DAG: qsr_coremytbl>'. Marking it as removed.
>  [2019-04-18 08:29:43,582] \{logging_mixin.py:95} INFO - [2019-04-18 08:29:43,582] {__init__.py:4906} INFO - Restoring task '<TaskInstance: qsr_coremytbl.REPAIR_HIVE_schemeh.mytbl 2019-04-12 08:00:00+00:00 [removed]>' which was previously removed from DAG '<DAG: qsr_coremytbl>'
>  [2019-04-18 08:29:43,618] \{jobs.py:1787} INFO - Creating / updating <TaskInstance: qsr_coremytbl.REPAIR_HIVE_schemeh.mytbl 2019-04-12 08:00:00+00:00 [scheduled]> in ORM
>  [2019-04-18 08:29:43,676] \{logging_mixin.py:95} INFO - [2019-04-18 08:29:43,676] {__init__.py:4897} WARNING - Failed to get task '<TaskInstance: qsr_coremytbl.REPAIR_HIVE_schemeh.mytbl 2019-04-12 08:00:00+00:00 [scheduled]>' for dag '<DAG: qsr_coremytbl>'. Marking it as removed.
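> For context, a simplified sketch (an assumption, not the actual Airflow code) of the reconciliation step that would produce the alternating "Marking it as removed" / "Restoring task" lines above: each stored task instance is checked against the task ids present in the freshly parsed DAG.
> {noformat}
> from types import SimpleNamespace
>
> def reconcile(dag_task_ids, task_instances):
>     for ti in task_instances:
>         if ti.task_id not in dag_task_ids:
>             # "Failed to get task ... Marking it as removed."
>             ti.state = 'removed'
>         elif ti.state == 'removed':
>             # "Restoring task ... which was previously removed"
>             ti.state = None
>
> # If a parse of the dynamically generated DAG momentarily lacks the task,
> # its instance flips to 'removed'; the next parse that has it flips it back.
> ti = SimpleNamespace(task_id='REPAIR_HIVE_schemeh.mytbl', state='scheduled')
> reconcile(set(), [ti])                          # ti.state -> 'removed'
> reconcile({'REPAIR_HIVE_schemeh.mytbl'}, [ti])  # ti.state -> None (restored)
> {noformat}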
>  
> *some logs from the newer EC2 instance:*
> [myuser@host logs]$ grep -i hive -R * | sed 's#[0-9]#x#g' | sort | uniq -c | grep -v 'airflow-webserver-access.log'
>  2 audit/airflow-audit.log:xxxx-xx-xx xx:xx:xx.xxxxxx xxxx qsr_coremytbl REPAIR_HIVE_schemeh.mytbl log xxxx-xx-xx xx:xx:xx.xxxxxx rsawyerx [('execution_date', u'xxxx-xx-xxTxx:xx:xx+xx:xx'), ('task_id', u'REPAIR_HIVE_schemeh.mytbl'), ('dag_id', u'qsr_coremytbl')]
>  1 audit/airflow-audit.log:xxxx-xx-xx xx:xx:xx.xxxxxx xxxx qsr_coremytbl REPAIR_HIVE_schemeh.mytbl log xxxx-xx-xx xx:xx:xx.xxxxxx rsawyerx [('execution_date', u'xxxx-xx-xxTxx:xx:xx+xx:xx'), ('task_id', u'REPAIR_HIVE_schemeh.mytbl'), ('dag_id', u'qsr_coremytbl'), ('format', u'json')]
>  1 audit/airflow-audit.log:xxxx-xx-xx xx:xx:xx.xxxxxx xxxx qsr_coremytbl REPAIR_HIVE_schemeh.mytbl rendered xxxx-xx-xx xx:xx:xx.xxxxxx rsawyerx [('execution_date', u'xxxx-xx-xxTxx:xx:xx+xx:xx'), ('task_id', u'REPAIR_HIVE_schemeh.mytbl'), ('dag_id', u'qsr_coremytbl')]
>  1 audit/airflow-audit.log:xxxx-xx-xx xx:xx:xx.xxxxxx xxxx qsr_coremytbl REPAIR_HIVE_schemeh.mytbl task xxxx-xx-xx xx:xx:xx.xxxxxx rsawyerx [('execution_date', u'xxxx-xx-xxTxx:xx:xx+xx:xx'), ('task_id', u'REPAIR_HIVE_schemeh.mytbl'), ('dag_id', u'qsr_coremytbl')]
>  1 scheduler/latest/qsr_dag_generation.py.log:[xxxx-xx-xx xx:xx:xx,xxx] \{jobs.py:xxxx} INFO - Creating / updating <TaskInstance: qsr_coremytbl.REPAIR_HIVE_schemeh.mytbl xxxx-xx-xx xx:xx:xx+xx:xx [scheduled]> in ORM
>  71 scheduler/latest/qsr_dag_generation.py.log:[xxxx-xx-xx xx:xx:xx,xxx] \{logging_mixin.py:xx} INFO - [xxxx-xx-xx xx:xx:xx,xxx] {__init__.py:xxxx} INFO - Restoring task '<TaskInstance: qsr_coremytbl.REPAIR_HIVE_schemeh.mytbl xxxx-xx-xx xx:xx:xx+xx:xx [removed]>' which was previously removed from DAG '<DAG: qsr_coremytbl>'
>  1 scheduler/xxxx-xx-xx/qsr_dag_generation.py.log:[xxxx-xx-xx xx:xx:xx,xxx] \{jobs.py:xxxx} INFO - Creating / updating <TaskInstance: qsr_coremytbl.REPAIR_HIVE_schemeh.mytbl xxxx-xx-xx xx:xx:xx+xx:xx [scheduled]> in ORM
>  71 scheduler/xxxx-xx-xx/qsr_dag_generation.py.log:[xxxx-xx-xx xx:xx:xx,xxx] \{logging_mixin.py:xx} INFO - [xxxx-xx-xx xx:xx:xx,xxx] {__init__.py:xxxx} INFO - Restoring task '<TaskInstance: qsr_coremytbl.REPAIR_HIVE_schemeh.mytbl xxxx-xx-xx xx:xx:xx+xx:xx [removed]>' which was previously removed from DAG '<DAG: qsr_coremytbl>'
>  
> mysql> select * from task_instance where task_id like '%REP%';
> | task_id | dag_id | execution_date | start_date | end_date | duration | state | try_number | hostname | unixname | job_id | pool | queue | priority_weight | operator | queued_dttm | pid | max_tries | executor_config |
> |REPAIR_HIVE_mytbl|qsr_coremytbl|2019-04-13 00:00:00.000000|2019-04-17 20:29:19.639059|2019-04-17 20:29:33.700252|14.0612|failed|1|ip-ec2|myuser|305|NULL|default|1|PythonOperator|2019-04-17 20:29:13.604354|899|0|▒}q .|
> |REPAIR_HIVE_schemeh.mytbl|qsr_coremytbl|2019-01-01 00:00:00.000000|2019-04-17 21:30:11.439127|2019-04-17 21:30:11.439142|NULL|upstream_failed|0| |myuser|NULL|NULL|default|1|NULL|NULL|NULL|0|▒}q .|
> |REPAIR_HIVE_schemeh.mytbl|qsr_coremytbl|2019-01-02 00:00:00.000000|2019-04-17 21:46:34.163607|2019-04-17 21:46:34.163627|NULL|upstream_failed|0| |myuser|NULL|NULL|default|1|NULL|NULL|NULL|0|▒}q .|
> |REPAIR_HIVE_schemeh.mytbl|qsr_coremytbl|2019-01-03 00:00:00.000000|2019-04-17 21:50:48.541224|2019-04-17 21:50:48.541239|NULL|upstream_failed|0| |myuser|NULL|NULL|default|1|NULL|NULL|NULL|0|▒}q .|
> |REPAIR_HIVE_schemeh.mytbl|qsr_coremytbl|2019-01-03 05:00:00.000000|2019-04-17 22:00:24.286685|2019-04-17 22:00:24.286709|NULL|upstream_failed|0| |myuser|NULL|NULL|default|1|NULL|NULL|NULL|0|▒}q .|
> |REPAIR_HIVE_schemeh.mytbl|qsr_coremytbl|2019-04-11 00:00:00.000000|2019-04-17 21:26:08.621737|2019-04-17 21:26:22.686882|14.0651|failed|1|ip-ec2|myuser|316|NULL|default|1|PythonOperator|2019-04-17 21:26:03.083885|29638|0|▒}q .|
> |REPAIR_HIVE_schemeh.mytbl|qsr_coremytbl|2019-04-11 17:00:00.000000|2019-04-17 22:44:52.900860|2019-04-17 22:45:04.403179|11.5023|failed|1|ip-ec2|myuser|348|NULL|default|1|PythonOperator|2019-04-17 22:44:47.895363|10815|0|▒}q .|
> |REPAIR_HIVE_schemeh.mytbl|qsr_coremytbl|2019-04-11 22:00:00.000000|2019-04-17 22:37:28.409799|2019-04-17 22:37:41.448494|13.0387|failed|1|ip-ec2|myuser|342|NULL|default|1|PythonOperator|2019-04-17 22:37:23.449554|28697|0|▒}q .|
> |REPAIR_HIVE_schemeh.mytbl|qsr_coremytbl|2019-04-12 00:00:00.000000|2019-04-17 21:02:36.365150|2019-04-17 21:02:36.365165|NULL|upstream_failed|0| |myuser|NULL|NULL|default|1|NULL|NULL|NULL|0|▒}q .|
> |REPAIR_HIVE_schemeh.mytbl|qsr_coremytbl|2019-04-12 07:00:00.000000|2019-04-18 08:24:59.552695|2019-04-18 08:24:59.552710|NULL|removed|0| |myuser|NULL|NULL|default|1|NULL|NULL|NULL|0|▒}q .|
> |REPAIR_HIVE_schemeh.mytbl|qsr_coremytbl|2019-04-12 08:00:00.000000|NULL|NULL|NULL|removed|0| |myuser|NULL|NULL|default|1|NULL|NULL|NULL|0|▒}q .|
>  11 rows in set (0.00 sec)
>  
>  
> *Task Instance Details*
>  Dependencies Blocking Task From Getting Scheduled:
>  - Task Instance State: Task is in the 'removed' state which is not a valid state for execution. The task must be cleared in order to be run.
>  - Dagrun Running: Task instance's dagrun was not in the 'running' state but in the state 'success'.
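> (Clearing just that task instance, e.g. something along the lines of "airflow clear -t 'REPAIR_HIVE_schemeh.mytbl' -s 2019-04-12T08:00 -e 2019-04-12T08:00 qsr_coremytbl" with the 1.10 CLI, is what that first message asks for; the broader "airflow clear" I eventually ran is shown at the bottom of this report.)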
>  Attribute: python_callable
> def repair_hive_table(hive_cli_conn_id, schema, table, drop_partitions_first=False):
>     conn = BaseHook.get_connection(hive_cli_conn_id)
>     ssl_options = conn.extra_dejson.get('ssl-options')
>     jdbc_url = "jdbc:hive2://{conn.host}:{conn.port}/{conn.schema};{ssl_options}".format(**locals())
>     sqlstmt = 'MSCK REPAIR TABLE %s.%s;' % (schema, table)
>     hive_command = ' '.join([
>         '/home/myuser/spark_home/bin/beeline',
>         '-u', '"%s"' % jdbc_url,
>         '-n', conn.login,
>         '-w', '/home/myuser/spark_home/hivepw',
>         '-e', '"%s"' % sqlstmt])
>     # note - do not log the command which contains truststore and hive user passwords
>     logging.info("Executing following SQL statement: %s" % sqlstmt)
>     process = subprocess.Popen(hive_command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, shell=True)
>     (output, error) = process.communicate()
>     logging.info(output)
>     if process.returncode != 0:
>         logging.error('Hive process returned code %d' % process.returncode)
>         raise Exception('Hive process returned code %d' % process.returncode)
>  Task Instance Attributes
>  Attribute Value
>  dag_id qsr_coremytbl
>  duration None
>  end_date None
>  execution_date 2019-04-12T08:00:00+00:00
>  executor_config {}
>  generate_command <function generate_command at 0x7f6526f97ed8>
>  hostname 
>  is_premature False
>  job_id None
>  key ('qsr_coremytbl', u'REPAIR_HIVE_schemeh.mytbl', <Pendulum [2019-04-12T08:00:00+00:00]>, 1)
>  log <logging.Logger object at 0x7f65267a2d50>
>  log_filepath /home/myuser/airflow/logs/qsr_coremytbl/REPAIR_HIVE_schemeh.mytbl/2019-04-12T08:00:00+00:00.log
>  log_url [https://domain/admin/airflow/log?dag_id=qsr_coremytbl&task_id=REPAIR_HIVE_schemeh.mytbl&execution_date=2019-04-12T08%3A00%3A00%2B00%3A00]
>  logger <logging.Logger object at 0x7f65267a2d50>
>  mark_success_url [https://domain/admin/airflow/success?task_id=REPAIR_HIVE_schemeh.mytbl&dag_id=qsr_coremytbl&execution_date=2019-04-12T08%3A00%3A00%2B00%3A00&upstream=false&downstream=false]
>  max_tries 0
>  metadata MetaData(bind=None)
>  next_try_number 1
>  operator None
>  pid None
>  pool None
>  previous_ti None
>  priority_weight 1
>  queue default
>  queued_dttm None
>  raw False
>  run_as_user None
>  start_date None
>  state removed
>  task <Task(PythonOperator): REPAIR_HIVE_schemeh.mytbl>
>  task_id REPAIR_HIVE_schemeh.mytbl
>  test_mode False
>  try_number 1
>  unixname myuser
>  Task Attributes
>  Attribute Value
>  dag <DAG: qsr_coremytbl>
>  dag_id qsr_coremytbl
>  depends_on_past False
>  deps set([<TIDep(Not In Retry Period)>, <TIDep(Trigger Rule)>, <TIDep(Previous Dagrun State)>])
>  downstream_list []
>  downstream_task_ids set([])
>  email None
>  email_on_failure True
>  email_on_retry True
>  end_date None
>  execution_timeout None
>  executor_config {}
>  inlets []
>  lineage_data None
>  log <logging.Logger object at 0x7f65167edad0>
>  logger <logging.Logger object at 0x7f65167edad0>
>  max_retry_delay None
>  on_failure_callback None
>  on_retry_callback None
>  on_success_callback None
>  op_args []
>  op_kwargs {'table': u'mytbl', 'hive_cli_conn_id': 'query_hive', 'schema': u'schemeh'}
>  outlets []
>  owner airflow
>  params {}
>  pool None
>  priority_weight 1
>  priority_weight_total 1
>  provide_context False
>  queue default
>  resources {'disk': {'_qty': 512, '_units_str': 'MB', '_name': 'Disk'}, 'gpus': {'_qty': 0, '_units_str': 'gpu(s)', '_name': 'GPU'}, 'ram': {'_qty': 512, '_units_str': 'MB', '_name': 'RAM'}, 'cpus': {'_qty': 1, '_units_str': 'core(s)', '_name': 'CPU'}}
>  retries 0
>  retry_delay 0:05:00
>  retry_exponential_backoff False
>  run_as_user None
>  schedule_interval None
>  shallow_copy_attrs ('python_callable', 'op_kwargs')
>  sla None
>  start_date 2017-06-01T00:00:00+00:00
>  task_concurrency None
>  task_id REPAIR_HIVE_schemeh.mytbl
>  task_type PythonOperator
>  template_ext []
>  template_fields ('templates_dict', 'op_args', 'op_kwargs')
>  templates_dict None
>  trigger_rule all_success
>  ui_color #ffefeb
>  ui_fgcolor #000
>  upstream_list [<Task(SparkSubmitOperator): coremytbl_FR__H>, <Task(SparkSubmitOperator): coremytbl_BE__H>, <Task(SparkSubmitOperator): coremytbl_NL__H>, <Task(SparkSubmitOperator): coremytbl_DE__H>, <Task(SparkSubmitOperator): coremytbl_DAGNAME__H>]
>  upstream_task_ids set([u'coremytbl_FR__H', u'coremytbl_BE__H', u'coremytbl_NL__H', u'coremytbl_DE__H', u'coremytbl_DAGNAME__H'])
>  wait_for_downstream False
>  weight_rule downstream
>  
> DAG code:
> {noformat}
>  
> import datetime as dt
> import glob
> import json
> import logging
> import os
> import subprocess
> #import urllib
> from airflow import DAG
> from airflow.contrib.operators.spark_submit_operator import SparkSubmitOperator
> from airflow.operators.python_operator import PythonOperator
> from airflow.hooks.base_hook import BaseHook
> # Globals
> DATALAKE_S3ROOT = subprocess.check_output(". /etc/pipeline/profile; dig -t TXT +short qsrs3.$pipelineBranch.sss.$networkDomainName | tail -1 | tr -d '\"' | sed -e s#^s3://##", shell=True).strip()
> default_args = {
>     'owner': 'airflow',
>     'start_date': dt.datetime(2017, 6, 1),
>     'retries': 0,
>     'retry_delay': dt.timedelta(minutes=5),
> }
> def repair_hive_table(hive_cli_conn_id, schema, table, drop_partitions_first=False):
>     
>     conn = BaseHook.get_connection(hive_cli_conn_id)
>     ssl_options = conn.extra_dejson.get('ssl-options')
>     jdbc_url = "jdbc:hive2://{conn.host}:{conn.port}/{conn.schema};{ssl_options}".format(**locals())
>     
>     sqlstmt = 'MSCK REPAIR TABLE %s.%s;' % (schema, table)
>     hive_command = ' '.join([
>     '/home/myuser/spark_home/bin/beeline',
>     '-u', '"%s"' % jdbc_url,
>     '-n', conn.login,
>     '-w', '/home/myuser/spark_home/hivepw',
>     '-e', '"%s"' % sqlstmt ])
>     # note - do not log the command which contains truststore and hive user passwords
>     logging.info("Executing following SQL statement: %s" % sqlstmt)
>     
>     process = subprocess.Popen(hive_command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, shell=True)
>     (output, error) = process.communicate()
>     logging.info(output)
>     if process.returncode != 0:
>         logging.error('Hive process returned code %d' % process.returncode)
>         raise Exception('Hive process returned code %d' % process.returncode)
>         
>         
>     
> def create_spark_submit_operator(dag, pool, runstream, iz_strip_name, table, filename):
>     task_id = "%s_%s" % (runstream, iz_strip_name)
>     #need to handle validation and import
>     filename = iz_strip_name + '_input.yaml'
>     file_path = os.path.join('/home/myuser/jsonconfigs/si/job', runstream, 'jar/input', filename)
>     return SparkSubmitOperator(
>             dag = dag, # need to tell airflow that this task belongs to the dag we defined above
>             task_id = task_id,
>             pool = pool,
>             #params={"lob": lob}, # the sql file above have a template in it for a 'lob' paramater - this is how we pass it in
>             #bash_command='echo "Data Load task 1 {{ params.lob }} here"'
>             conn_id='process',
>             java_class='com.cimp.jar.jar',
>             application='s3a://%s/jar.jar' % DATALAKE_S3ROOT,
>             #total_executor_cores='16',
>             executor_cores='2',
>             executor_memory='8g',
>             driver_memory='2g',
>             conf={"spark.sql.parquet.writeLegacyFormat" : "true"
>             ,"spark.executor.extraJavaOptions" : "-Dinput_wildcard={{ ds_nodash }}{{ '*'+dag_run.conf['hourstr'] if dag_run.conf['hourstr'] else '' }} -Ddatestr={{ ds_nodash }} -Dhourstr={{ dag_run.conf['hourstr'] if dag_run.conf['hourstr'] else '' }}"
>             ,"spark.driver.extraJavaOptions" : "-Dinput_wildcard={{ ds_nodash }}{{ '*'+dag_run.conf['hourstr'] if dag_run.conf['hourstr'] else '' }} -Ddatestr={{ ds_nodash }} -Dhourstr={{ dag_run.conf['hourstr'] if dag_run.conf['hourstr'] else '' }}"
>             },
>             #num_executors='2',
>             name="%s_{{ ds_nodash }}" % task_id,
>             verbose=True,
>             #dont know how will work (where do the prior exports get made) or if commandline way can be used
>             #env_vars={"input_wildcard": os.environ['input_wildcard'],"datestr": os.environ['datestr'],"hourstr": os.environ['hourstr']},
>             application_args=[
>             "--config", "%s" % file_path
>             ],
>         )
> def create_load_dag(dag_id, runstream):
>     dag = DAG(dag_id, # give the dag a name 
>            schedule_interval=None, # define how often you want it to run - you can pass cron expressions here
>            default_args=default_args # pass the default args defined above or you can override them here if you want this dag to behave a little different
>          )
>     
>     file_list = os.path.join('/home/myuser/jsonconfigs/si/job', runstream, 'file_list.json')
>     process = subprocess.Popen(["cat", file_list], stdout=subprocess.PIPE)
>     decoded_data = json.load(process.stdout)
>     
>     table_repair_dependencies = {} # (schema, table) -> [load tasks]
>     for file in decoded_data['files']:
>         srcfile = file['name']
>         iz_strip_name = file['iz_strip_name']
>         
>         schema = file['Schema']
>         hive_table = file['hive_table']
>         # One staging task
>         stagingTask = create_spark_submit_operator(dag, None, runstream, iz_strip_name, srcfile, "QSRLOAD")
>         stagingTask.doc_md = """\
>         QSR jar LOAD for %s to %s.%s
>         """ % (srcfile,schema,hive_table)
>         try :
>             table_repair_dependencies[(schema, hive_table)].append(stagingTask)
>         except KeyError:
>             table_repair_dependencies[(schema, hive_table)] = [stagingTask]
>             
>             
>     # table repair tasks
>     for (schema, object_name) in table_repair_dependencies.keys() :
>         repairHiveTask = PythonOperator(dag = dag, task_id="REPAIR_HIVE_%s.%s" %  (schema,object_name),
>                      python_callable=repair_hive_table,
>                      op_kwargs={'hive_cli_conn_id' : 'query_hive',
>                      'schema' : schema,
>                      'table' : object_name})
>         repairHiveTask << table_repair_dependencies[(schema, object_name)]
>         
>     return dag
>     
>         
> # dynamically generate all dags
> for entry in os.listdir('/home/myuser/jsonconfigs/si/job'):
>     file_list = glob.glob(os.path.join('/home/myuser/jsonconfigs/si/job', entry, 'file_list.json'))
>     if file_list :    
>         runstream = entry
>         dag_id = 'qsr_%s' % runstream
>         globals()[dag_id] = create_load_dag(dag_id, runstream)
> {noformat}
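> One plausible contributing factor (an assumption based on the code above, not a confirmed diagnosis): the REPAIR_HIVE_* task ids are derived entirely from whatever file_list.json contains at parse time, so any change to that file between scheduler parses changes the set of generated tasks, and task instances for ids that disappear get marked 'removed'. A minimal illustration of that derivation:
> {noformat}
> import json
>
> def repair_task_ids(file_list_path):
>     # Same derivation as create_load_dag() above: one REPAIR_HIVE task per
>     # distinct (Schema, hive_table) pair found in file_list.json.
>     with open(file_list_path) as f:
>         files = json.load(f)['files']
>     return {"REPAIR_HIVE_%s.%s" % (f['Schema'], f['hive_table']) for f in files}
>
> # If two consecutive parses see different JSON contents, the ids that
> # disappear leave behind task instances that the scheduler marks 'removed'.
> {noformat}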
>  
> *After encountering this issue I ran*: airflow clear mycooldag
> Below is the output. As you can see, the REPAIR_HIVE task was never successful, so I don't know how the DAG run can be in an overall 'success' state!
>  
>  
> <TaskInstance: mycooldag.REPAIR_HIVE_schemeh.mytbl 2019-01-01 00:00:00+00:00 [upstream_failed]>
> <TaskInstance: mycooldag.REPAIR_HIVE_schemeh.mytbl 2019-01-02 00:00:00+00:00 [upstream_failed]>
> <TaskInstance: mycooldag.REPAIR_HIVE_schemeh.mytbl 2019-01-03 00:00:00+00:00 [upstream_failed]>
> <TaskInstance: mycooldag.REPAIR_HIVE_schemeh.mytbl 2019-01-03 05:00:00+00:00 [upstream_failed]>
> <TaskInstance: mycooldag.REPAIR_HIVE_schemeh.mytbl 2019-04-11 00:00:00+00:00 [failed]>
> <TaskInstance: mycooldag.REPAIR_HIVE_schemeh.mytbl 2019-04-11 17:00:00+00:00 [failed]>
> <TaskInstance: mycooldag.REPAIR_HIVE_schemeh.mytbl 2019-04-11 22:00:00+00:00 [failed]>
> <TaskInstance: mycooldag.REPAIR_HIVE_schemeh.mytbl 2019-04-12 00:00:00+00:00 [upstream_failed]>
> <TaskInstance: mycooldag.REPAIR_HIVE_schemeh.mytbl 2019-04-12 07:00:00+00:00 [removed]>
> <TaskInstance: mycooldag.REPAIR_HIVE_schemeh.mytbl 2019-04-12 08:00:00+00:00 [removed]>



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)