Posted to commits@airflow.apache.org by "t oo (JIRA)" <ji...@apache.org> on 2019/05/10 22:24:00 UTC

[jira] [Updated] (AIRFLOW-4355) DAG is marked as 'success' even if a task has been 'removed'!

     [ https://issues.apache.org/jira/browse/AIRFLOW-4355?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

t oo updated AIRFLOW-4355:
--------------------------
    Description: 
note: all my dags are purely externally triggered

*Issue:* The DAG has 5 parallel tasks that ran successfully and 1 final task that somehow ended up in the 'removed' state (in prior dag runs it was 'failed'). That final task never ran successfully, yet the DAG run is still shown as 'success'!

 

*Command run* (note that earlier commands such as airflow trigger_dag -e 20190412 qsr_coremytbl had been run before and failed for a valid reason, i.e. the python task failing):

airflow trigger_dag -e 20190412T08:00 qsr_coremytbl --conf '{"hourstr":"08"}'
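For context, the 'hourstr' value in that conf is only consumed through the Jinja-templated Spark options in the DAG code further below. Here is a minimal, standalone sketch (plain jinja2 rather than Airflow's own rendering; the FakeDagRun stand-in and hard-coded values are illustrative assumptions) of how that payload renders:

{noformat}
# Sketch: how --conf '{"hourstr":"08"}' reaches the templated
# -Dinput_wildcard / -Dhourstr options defined in the DAG code below.
# Plain jinja2 is used as a stand-in for Airflow's template rendering.
from jinja2 import Template


class FakeDagRun(object):
    # stand-in for the real dag_run object exposed to templates
    conf = {"hourstr": "08"}


tmpl = Template(
    "-Dinput_wildcard={{ ds_nodash }}"
    "{{ '*' + dag_run.conf['hourstr'] if dag_run.conf['hourstr'] else '' }}"
    " -Dhourstr={{ dag_run.conf['hourstr'] if dag_run.conf['hourstr'] else '' }}")

print(tmpl.render(ds_nodash="20190412", dag_run=FakeDagRun()))
# -> -Dinput_wildcard=20190412*08 -Dhourstr=08
{noformat}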

 

*Some logs from the prior airflow instance (the ec2 was auto-healed):*

[2019-04-18 08:29:40,678] {logging_mixin.py:95} INFO - [2019-04-18 08:29:40,678] {__init__.py:4897} WARNING - Failed to get task '<TaskInstance: qsr_coremytbl.REPAIR_HIVE_schemeh.mytbl 2019-04-12 08:00:00+00:00 [None]>' for dag '<DAG: qsr_coremytbl>'. Marking it as removed.
[2019-04-18 08:29:43,582] {logging_mixin.py:95} INFO - [2019-04-18 08:29:43,582] {__init__.py:4906} INFO - Restoring task '<TaskInstance: qsr_coremytbl.REPAIR_HIVE_schemeh.mytbl 2019-04-12 08:00:00+00:00 [removed]>' which was previously removed from DAG '<DAG: qsr_coremytbl>'
[2019-04-18 08:29:43,618] {jobs.py:1787} INFO - Creating / updating <TaskInstance: qsr_coremytbl.REPAIR_HIVE_schemeh.mytbl 2019-04-12 08:00:00+00:00 [scheduled]> in ORM
[2019-04-18 08:29:43,676] {logging_mixin.py:95} INFO - [2019-04-18 08:29:43,676] {__init__.py:4897} WARNING - Failed to get task '<TaskInstance: qsr_coremytbl.REPAIR_HIVE_schemeh.mytbl 2019-04-12 08:00:00+00:00 [scheduled]>' for dag '<DAG: qsr_coremytbl>'. Marking it as removed.
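Those two messages come from the scheduler reconciling the task instances stored in the database with the freshly parsed DAG. A hedged sketch of that reconciliation (simplified pseudologic inferred only from the log lines above, not the actual Airflow source):

{noformat}
# Hedged sketch of the reconciliation hinted at by the WARNING/INFO lines above:
# a task instance whose task_id is missing from the freshly parsed DAG is marked
# 'removed'; if the task shows up again on a later parse it is restored.
def reconcile_task_instances(dag, task_instances, session):
    for ti in task_instances:
        if not dag.has_task(ti.task_id):
            # "Failed to get task ... Marking it as removed."
            ti.state = 'removed'
        elif ti.state == 'removed':
            # "Restoring task ... which was previously removed from DAG ..."
            ti.state = None
    session.commit()
{noformat}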

 

*Some logs on the newer ec2:*

[myuser@host logs]$ grep -i hive -R * | sed 's#[0-9]#x#g' | sort | uniq -c | grep -v 'airflow-webserver-access.log'
 2 audit/airflow-audit.log:xxxx-xx-xx xx:xx:xx.xxxxxx xxxx qsr_coremytbl REPAIR_HIVE_schemeh.mytbl log xxxx-xx-xx xx:xx:xx.xxxxxx rsawyerx [('execution_date', u'xxxx-xx-xxTxx:xx:xx+xx:xx'), ('task_id', u'REPAIR_HIVE_schemeh.mytbl'), ('dag_id', u'qsr_coremytbl')]
 1 audit/airflow-audit.log:xxxx-xx-xx xx:xx:xx.xxxxxx xxxx qsr_coremytbl REPAIR_HIVE_schemeh.mytbl log xxxx-xx-xx xx:xx:xx.xxxxxx rsawyerx [('execution_date', u'xxxx-xx-xxTxx:xx:xx+xx:xx'), ('task_id', u'REPAIR_HIVE_schemeh.mytbl'), ('dag_id', u'qsr_coremytbl'), ('format', u'json')]
 1 audit/airflow-audit.log:xxxx-xx-xx xx:xx:xx.xxxxxx xxxx qsr_coremytbl REPAIR_HIVE_schemeh.mytbl rendered xxxx-xx-xx xx:xx:xx.xxxxxx rsawyerx [('execution_date', u'xxxx-xx-xxTxx:xx:xx+xx:xx'), ('task_id', u'REPAIR_HIVE_schemeh.mytbl'), ('dag_id', u'qsr_coremytbl')]
 1 audit/airflow-audit.log:xxxx-xx-xx xx:xx:xx.xxxxxx xxxx qsr_coremytbl REPAIR_HIVE_schemeh.mytbl task xxxx-xx-xx xx:xx:xx.xxxxxx rsawyerx [('execution_date', u'xxxx-xx-xxTxx:xx:xx+xx:xx'), ('task_id', u'REPAIR_HIVE_schemeh.mytbl'), ('dag_id', u'qsr_coremytbl')]
 1 scheduler/latest/qsr_dag_generation.py.log:[xxxx-xx-xx xx:xx:xx,xxx] {jobs.py:xxxx} INFO - Creating / updating <TaskInstance: qsr_coremytbl.REPAIR_HIVE_schemeh.mytbl xxxx-xx-xx xx:xx:xx+xx:xx [scheduled]> in ORM
 71 scheduler/latest/qsr_dag_generation.py.log:[xxxx-xx-xx xx:xx:xx,xxx] {logging_mixin.py:xx} INFO - [xxxx-xx-xx xx:xx:xx,xxx] {__init__.py:xxxx} INFO - Restoring task '<TaskInstance: qsr_coremytbl.REPAIR_HIVE_schemeh.mytbl xxxx-xx-xx xx:xx:xx+xx:xx [removed]>' which was previously removed from DAG '<DAG: qsr_coremytbl>'
 1 scheduler/xxxx-xx-xx/qsr_dag_generation.py.log:[xxxx-xx-xx xx:xx:xx,xxx] {jobs.py:xxxx} INFO - Creating / updating <TaskInstance: qsr_coremytbl.REPAIR_HIVE_schemeh.mytbl xxxx-xx-xx xx:xx:xx+xx:xx [scheduled]> in ORM
 71 scheduler/xxxx-xx-xx/qsr_dag_generation.py.log:[xxxx-xx-xx xx:xx:xx,xxx] {logging_mixin.py:xx} INFO - [xxxx-xx-xx xx:xx:xx,xxx] {__init__.py:xxxx} INFO - Restoring task '<TaskInstance: qsr_coremytbl.REPAIR_HIVE_schemeh.mytbl xxxx-xx-xx xx:xx:xx+xx:xx [removed]>' which was previously removed from DAG '<DAG: qsr_coremytbl>'

 

mysql> select * from task_instance where task_id like '%REP%';

|task_id|dag_id|execution_date|start_date|end_date|duration|state|try_number|hostname|unixname|job_id|pool|queue|priority_weight|operator|queued_dttm|pid|max_tries|executor_config|
|REPAIR_HIVE_mytbl|qsr_coremytbl|2019-04-13 00:00:00.000000|2019-04-17 20:29:19.639059|2019-04-17 20:29:33.700252|14.0612|failed|1|ip-ec2|myuser|305|NULL|default|1|PythonOperator|2019-04-17 20:29:13.604354|899|0|▒}q .|
|REPAIR_HIVE_schemeh.mytbl|qsr_coremytbl|2019-01-01 00:00:00.000000|2019-04-17 21:30:11.439127|2019-04-17 21:30:11.439142|NULL|upstream_failed|0| |myuser|NULL|NULL|default|1|NULL|NULL|NULL|0|▒}q .|
|REPAIR_HIVE_schemeh.mytbl|qsr_coremytbl|2019-01-02 00:00:00.000000|2019-04-17 21:46:34.163607|2019-04-17 21:46:34.163627|NULL|upstream_failed|0| |myuser|NULL|NULL|default|1|NULL|NULL|NULL|0|▒}q .|
|REPAIR_HIVE_schemeh.mytbl|qsr_coremytbl|2019-01-03 00:00:00.000000|2019-04-17 21:50:48.541224|2019-04-17 21:50:48.541239|NULL|upstream_failed|0| |myuser|NULL|NULL|default|1|NULL|NULL|NULL|0|▒}q .|
|REPAIR_HIVE_schemeh.mytbl|qsr_coremytbl|2019-01-03 05:00:00.000000|2019-04-17 22:00:24.286685|2019-04-17 22:00:24.286709|NULL|upstream_failed|0| |myuser|NULL|NULL|default|1|NULL|NULL|NULL|0|▒}q .|
|REPAIR_HIVE_schemeh.mytbl|qsr_coremytbl|2019-04-11 00:00:00.000000|2019-04-17 21:26:08.621737|2019-04-17 21:26:22.686882|14.0651|failed|1|ip-ec2|myuser|316|NULL|default|1|PythonOperator|2019-04-17 21:26:03.083885|29638|0|▒}q .|
|REPAIR_HIVE_schemeh.mytbl|qsr_coremytbl|2019-04-11 17:00:00.000000|2019-04-17 22:44:52.900860|2019-04-17 22:45:04.403179|11.5023|failed|1|ip-ec2|myuser|348|NULL|default|1|PythonOperator|2019-04-17 22:44:47.895363|10815|0|▒}q .|
|REPAIR_HIVE_schemeh.mytbl|qsr_coremytbl|2019-04-11 22:00:00.000000|2019-04-17 22:37:28.409799|2019-04-17 22:37:41.448494|13.0387|failed|1|ip-ec2|myuser|342|NULL|default|1|PythonOperator|2019-04-17 22:37:23.449554|28697|0|▒}q .|
|REPAIR_HIVE_schemeh.mytbl|qsr_coremytbl|2019-04-12 00:00:00.000000|2019-04-17 21:02:36.365150|2019-04-17 21:02:36.365165|NULL|upstream_failed|0| |myuser|NULL|NULL|default|1|NULL|NULL|NULL|0|▒}q .|
|REPAIR_HIVE_schemeh.mytbl|qsr_coremytbl|2019-04-12 07:00:00.000000|2019-04-18 08:24:59.552695|2019-04-18 08:24:59.552710|NULL|removed|0| |myuser|NULL|NULL|default|1|NULL|NULL|NULL|0|▒}q .|
|REPAIR_HIVE_schemeh.mytbl|qsr_coremytbl|2019-04-12 08:00:00.000000|NULL|NULL|NULL|removed|0| |myuser|NULL|NULL|default|1|NULL|NULL|NULL|0|▒}q .|

 11 rows in set (0.00 sec)
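To make the contradiction explicit (the dag_run for 2019-04-12 08:00 is 'success' while its REPAIR_HIVE task instance is 'removed' and never ran), a cross-check like the following can be run against the metadata DB. This is a hedged sketch written against the Airflow 1.10 ORM models; adjust names if your install differs:

{noformat}
# Hedged sketch: list dag_runs marked 'success' that still contain a 'removed'
# task instance, using the Airflow 1.10 ORM models.
from airflow import settings
from airflow.models import DagRun, TaskInstance
from airflow.utils.state import State

session = settings.Session()
rows = (
    session.query(DagRun.execution_date, DagRun.state,
                  TaskInstance.task_id, TaskInstance.state)
    .join(TaskInstance,
          (TaskInstance.dag_id == DagRun.dag_id) &
          (TaskInstance.execution_date == DagRun.execution_date))
    .filter(DagRun.dag_id == 'qsr_coremytbl',
            DagRun.state == State.SUCCESS,
            TaskInstance.state == State.REMOVED)
    .all())

for row in rows:
    # expected to surface the 2019-04-12 08:00 run with the removed REPAIR_HIVE task
    print(row)
{noformat}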

 

 

*Task Instance Details*
 Dependencies Blocking Task From Getting Scheduled (Dependency: Reason):
 - Task Instance State: Task is in the 'removed' state which is not a valid state for execution. The task must be cleared in order to be run.
 - Dagrun Running: Task instance's dagrun was not in the 'running' state but in the state 'success'.
 Attribute: python_callable

def repair_hive_table(hive_cli_conn_id, schema, table, drop_partitions_first=False):

    conn = BaseHook.get_connection(hive_cli_conn_id)
    ssl_options = conn.extra_dejson.get('ssl-options')
    jdbc_url = "jdbc:hive2://{conn.host}:{conn.port}/{conn.schema};{ssl_options}".format(**locals())

    sqlstmt = 'MSCK REPAIR TABLE %s.%s;' % (schema, table)
    hive_command = ' '.join([
        '/home/myuser/spark_home/bin/beeline',
        '-u', '"%s"' % jdbc_url,
        '-n', conn.login,
        '-w', '/home/myuser/spark_home/hivepw',
        '-e', '"%s"' % sqlstmt])
    # note - do not log the command which contains truststore and hive user passwords
    logging.info("Executing following SQL statement: %s" % sqlstmt)

    process = subprocess.Popen(hive_command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, shell=True)
    (output, error) = process.communicate()
    logging.info(output)
    if process.returncode != 0:
        logging.error('Hive process returned code %d' % process.returncode)
        raise Exception('Hive process returned code %d' % process.returncode)
 Task Instance Attributes
 Attribute Value
 dag_id qsr_coremytbl
 duration None
 end_date None
 execution_date 2019-04-12T08:00:00+00:00
 executor_config {}
 generate_command <function generate_command at 0x7f6526f97ed8>
 hostname 
 is_premature False
 job_id None
 key ('qsr_coremytbl', u'REPAIR_HIVE_schemeh.mytbl', <Pendulum [2019-04-12T08:00:00+00:00]>, 1)
 log <logging.Logger object at 0x7f65267a2d50>
 log_filepath /home/myuser/airflow/logs/qsr_coremytbl/REPAIR_HIVE_schemeh.mytbl/2019-04-12T08:00:00+00:00.log
 log_url [https://domain/admin/airflow/log?dag_id=qsr_coremytbl&task_id=REPAIR_HIVE_schemeh.mytbl&execution_date=2019-04-12T08%3A00%3A00%2B00%3A00]
 logger <logging.Logger object at 0x7f65267a2d50>
 mark_success_url [https://domain/admin/airflow/success?task_id=REPAIR_HIVE_schemeh.mytbl&dag_id=qsr_coremytbl&execution_date=2019-04-12T08%3A00%3A00%2B00%3A00&upstream=false&downstream=false]
 max_tries 0
 metadata MetaData(bind=None)
 next_try_number 1
 operator None
 pid None
 pool None
 previous_ti None
 priority_weight 1
 queue default
 queued_dttm None
 raw False
 run_as_user None
 start_date None
 state removed
 task <Task(PythonOperator): REPAIR_HIVE_schemeh.mytbl>
 task_id REPAIR_HIVE_schemeh.mytbl
 test_mode False
 try_number 1
 unixname myuser
 Task Attributes
 Attribute Value
 dag <DAG: qsr_coremytbl>
 dag_id qsr_coremytbl
 depends_on_past False
 deps set([<TIDep(Not In Retry Period)>, <TIDep(Trigger Rule)>, <TIDep(Previous Dagrun State)>])
 downstream_list []
 downstream_task_ids set([])
 email None
 email_on_failure True
 email_on_retry True
 end_date None
 execution_timeout None
 executor_config {}
 inlets []
 lineage_data None
 log <logging.Logger object at 0x7f65167edad0>
 logger <logging.Logger object at 0x7f65167edad0>
 max_retry_delay None
 on_failure_callback None
 on_retry_callback None
 on_success_callback None
 op_args []
 op_kwargs {'table': u'mytbl', 'hive_cli_conn_id': 'query_hive', 'schema': u'schemeh'}
 outlets []
 owner airflow
 params {}
 pool None
 priority_weight 1
 priority_weight_total 1
 provide_context False
 queue default
 resources {'disk': {'_qty': 512, '_units_str': 'MB', '_name': 'Disk'}, 'gpus': {'_qty': 0, '_units_str': 'gpu(s)', '_name': 'GPU'}, 'ram': {'_qty': 512, '_units_str': 'MB', '_name': 'RAM'}, 'cpus': {'_qty': 1, '_units_str': 'core(s)', '_name': 'CPU'}}
 retries 0
 retry_delay 0:05:00
 retry_exponential_backoff False
 run_as_user None
 schedule_interval None
 shallow_copy_attrs ('python_callable', 'op_kwargs')
 sla None
 start_date 2017-06-01T00:00:00+00:00
 task_concurrency None
 task_id REPAIR_HIVE_schemeh.mytbl
 task_type PythonOperator
 template_ext []
 template_fields ('templates_dict', 'op_args', 'op_kwargs')
 templates_dict None
 trigger_rule all_success
 ui_color #ffefeb
 ui_fgcolor #000
 upstream_list [<Task(SparkSubmitOperator): coremytbl_FR__H>, <Task(SparkSubmitOperator): coremytbl_BE__H>, <Task(SparkSubmitOperator): coremytbl_NL__H>, <Task(SparkSubmitOperator): coremytbl_DE__H>, <Task(SparkSubmitOperator): coremytbl_DAGNAME__H>]
 upstream_task_ids set([u'coremytbl_FR__H', u'coremytbl_BE__H', u'coremytbl_NL__H', u'coremytbl_DE__H', u'coremytbl_DAGNAME__H'])
 wait_for_downstream False
 weight_rule downstream

 

DAG code:
{noformat}
 
import datetime as dt
import glob
import json
import logging
import os
import subprocess
#import urllib

from airflow import DAG
from airflow.contrib.operators.spark_submit_operator import SparkSubmitOperator
from airflow.operators.python_operator import PythonOperator
from airflow.hooks.base_hook import BaseHook

# Globals
DATALAKE_S3ROOT = subprocess.check_output(". /etc/pipeline/profile; dig -t TXT +short qsrs3.$pipelineBranch.sss.$networkDomainName | tail -1 | tr -d '\"' | sed -e s#^s3://##", shell=True).strip()

default_args = {
    'owner': 'airflow',
    'start_date': dt.datetime(2017, 6, 1),
    'retries': 0,
    'retry_delay': dt.timedelta(minutes=5),
}

def repair_hive_table(hive_cli_conn_id, schema, table, drop_partitions_first=False):
    
    conn = BaseHook.get_connection(hive_cli_conn_id)
    ssl_options = conn.extra_dejson.get('ssl-options')
    jdbc_url = "jdbc:hive2://{conn.host}:{conn.port}/{conn.schema};{ssl_options}".format(**locals())
    
    sqlstmt = 'MSCK REPAIR TABLE %s.%s;' % (schema, table)
    hive_command = ' '.join([
    '/home/myuser/spark_home/bin/beeline',
    '-u', '"%s"' % jdbc_url,
    '-n', conn.login,
    '-w', '/home/myuser/spark_home/hivepw',
    '-e', '"%s"' % sqlstmt ])
    # note - do not log the command which contains truststore and hive user passwords
    logging.info("Executing following SQL statement: %s" % sqlstmt)
    
    process = subprocess.Popen(hive_command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, shell=True)
    (output, error) = process.communicate()
    logging.info(output)
    if process.returncode != 0:
        logging.error('Hive process returned code %d' % process.returncode)
        raise Exception('Hive process returned code %d' % process.returncode)
        
        
    
def create_spark_submit_operator(dag, pool, runstream, iz_strip_name, table, filename):
    task_id = "%s_%s" % (runstream, iz_strip_name)
    #need to handle validation and import
    filename = iz_strip_name + '_input.yaml'
    file_path = os.path.join('/home/myuser/jsonconfigs/si/job', runstream, 'jar/input', filename)
    return SparkSubmitOperator(
            dag = dag, # need to tell airflow that this task belongs to the dag we defined above
            task_id = task_id,
            pool = pool,
            #params={"lob": lob}, # the sql file above have a template in it for a 'lob' paramater - this is how we pass it in
            #bash_command='echo "Data Load task 1 {{ params.lob }} here"'
            conn_id='process',
            java_class='com.cimp.jar.jar',
            application='s3a://%s/jar.jar' % DATALAKE_S3ROOT,
            #total_executor_cores='16',
            executor_cores='2',
            executor_memory='8g',
            driver_memory='2g',
            conf={"spark.sql.parquet.writeLegacyFormat" : "true"
            ,"spark.executor.extraJavaOptions" : "-Dinput_wildcard={{ ds_nodash }}{{ '*'+dag_run.conf['hourstr'] if dag_run.conf['hourstr'] else '' }} -Ddatestr={{ ds_nodash }} -Dhourstr={{ dag_run.conf['hourstr'] if dag_run.conf['hourstr'] else '' }}"
            ,"spark.driver.extraJavaOptions" : "-Dinput_wildcard={{ ds_nodash }}{{ '*'+dag_run.conf['hourstr'] if dag_run.conf['hourstr'] else '' }} -Ddatestr={{ ds_nodash }} -Dhourstr={{ dag_run.conf['hourstr'] if dag_run.conf['hourstr'] else '' }}"
            },
            #num_executors='2',
            name="%s_{{ ds_nodash }}" % task_id,
            verbose=True,
            #dont know how will work (where do the prior exports get made) or if commandline way can be used
            #env_vars={"input_wildcard": os.environ['input_wildcard'],"datestr": os.environ['datestr'],"hourstr": os.environ['hourstr']},
            application_args=[
            "--config", "%s" % file_path
            ],
        )

def create_load_dag(dag_id, runstream):
    dag = DAG(dag_id, # give the dag a name 
           schedule_interval=None, # define how often you want it to run - you can pass cron expressions here
           default_args=default_args # pass the default args defined above or you can override them here if you want this dag to behave a little different
         )
    

    file_list = os.path.join('/home/myuser/jsonconfigs/si/job', runstream, 'file_list.json')
    process = subprocess.Popen(["cat", file_list], stdout=subprocess.PIPE)
    decoded_data = json.load(process.stdout)
    
    table_repair_dependencies = {} # (schema, table) -> [load tasks]
    for file in decoded_data['files']:
        srcfile = file['name']
        iz_strip_name = file['iz_strip_name']
        
        schema = file['Schema']
        hive_table = file['hive_table']
        # One staging task
        stagingTask = create_spark_submit_operator(dag, None, runstream, iz_strip_name, srcfile, "QSRLOAD")

        stagingTask.doc_md = """\
        QSR jar LOAD for %s to %s.%s
        """ % (srcfile,schema,hive_table)

        try :
            table_repair_dependencies[(schema, hive_table)].append(stagingTask)
        except KeyError:
            table_repair_dependencies[(schema, hive_table)] = [stagingTask]
            
            
    # table repair tasks
    for (schema, object_name) in table_repair_dependencies.keys() :
        repairHiveTask = PythonOperator(dag = dag, task_id="REPAIR_HIVE_%s.%s" %  (schema,object_name),
                     python_callable=repair_hive_table,
                     op_kwargs={'hive_cli_conn_id' : 'query_hive',
                     'schema' : schema,
                     'table' : object_name})
        repairHiveTask << table_repair_dependencies[(schema, object_name)]
        
    return dag

    
        
# dynamically generate all dags
for entry in os.listdir('/home/myuser/jsonconfigs/si/job'):
    file_list = glob.glob(os.path.join('/home/myuser/jsonconfigs/si/job', entry, 'file_list.json'))
    if file_list :    
        runstream = entry
        dag_id = 'qsr_%s' % runstream
        globals()[dag_id] = create_load_dag(dag_id, runstream)
{noformat}
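Since the 'removed' state only appears when a task id is missing from the DAG as the scheduler parses it, one hedged way to check whether this generated DAG parses consistently (e.g. after the EC2 auto-heal) is to load it through a DagBag the same way the scheduler would. Sketch against the Airflow 1.10 API:

{noformat}
# Hedged sketch (Airflow 1.10 API): parse the dags folder and check whether the
# REPAIR_HIVE task is currently present in the generated qsr_coremytbl DAG.
from airflow.models import DagBag

dagbag = DagBag()  # parses the configured dags folder
dag = dagbag.get_dag('qsr_coremytbl')
print(dag is not None and 'REPAIR_HIVE_schemeh.mytbl' in dag.task_ids)
# If this flips between parses (the dag/task set above depends on os.listdir(),
# file_list.json contents and the dig lookup), the scheduler will keep marking
# the task instance 'removed' and restoring it, which matches the 71
# "Restoring task" lines in the scheduler logs.
{noformat}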
 

*After encountering this issue I ran*: airflow clear mycooldag

Below was the output. As you can see, the REPAIR_HIVE task was never successful, so I don't see how the dag run can be in an overall 'success' state (see the sketch after the listing below)!

 

 

<TaskInstance: mycooldag.REPAIR_HIVE_schemeh.mytbl 2019-01-01 00:00:00+00:00 [upstream_failed]>
<TaskInstance: mycooldag.REPAIR_HIVE_schemeh.mytbl 2019-01-02 00:00:00+00:00 [upstream_failed]>
<TaskInstance: mycooldag.REPAIR_HIVE_schemeh.mytbl 2019-01-03 00:00:00+00:00 [upstream_failed]>
<TaskInstance: mycooldag.REPAIR_HIVE_schemeh.mytbl 2019-01-03 05:00:00+00:00 [upstream_failed]>
<TaskInstance: mycooldag.REPAIR_HIVE_schemeh.mytbl 2019-04-11 00:00:00+00:00 [failed]>
<TaskInstance: mycooldag.REPAIR_HIVE_schemeh.mytbl 2019-04-11 17:00:00+00:00 [failed]>
<TaskInstance: mycooldag.REPAIR_HIVE_schemeh.mytbl 2019-04-11 22:00:00+00:00 [failed]>
<TaskInstance: mycooldag.REPAIR_HIVE_schemeh.mytbl 2019-04-12 00:00:00+00:00 [upstream_failed]>
<TaskInstance: mycooldag.REPAIR_HIVE_schemeh.mytbl 2019-04-12 07:00:00+00:00 [removed]>
<TaskInstance: mycooldag.REPAIR_HIVE_schemeh.mytbl 2019-04-12 08:00:00+00:00 [removed]>
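One hedged explanation for the 'success' verdict, consistent with the evidence above: when the scheduler decides the dag_run state, it appears to consider only task instances whose task still exists in the currently parsed DAG, so a 'removed' instance is simply not counted. A simplified sketch of that idea (my pseudologic, not the actual Airflow source):

{noformat}
# Hedged sketch of why the dag_run can end up 'success': only task instances
# whose task_id is still present in the parsed DAG are considered, so the
# 'removed' REPAIR_HIVE instance never blocks the success verdict.
def decide_dagrun_state(dag, task_instances):
    counted = [ti for ti in task_instances if dag.has_task(ti.task_id)]
    if any(ti.state in ('failed', 'upstream_failed') for ti in counted):
        return 'failed'
    if all(ti.state == 'success' for ti in counted):
        return 'success'  # the 5 Spark tasks succeeded; the removed task is ignored
    return 'running'
{noformat}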


> DAG is marked as 'success' even if a task has been 'removed'!
> -------------------------------------------------------------
>
>                 Key: AIRFLOW-4355
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-4355
>             Project: Apache Airflow
>          Issue Type: Bug
>          Components: DAG, DagRun, scheduler
>    Affects Versions: 1.10.3
>            Reporter: t oo
>            Priority: Blocker
>         Attachments: dag_success_even_if_task_removed.png, treeview.png
>



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)