Posted to commits@airflow.apache.org by "Roster (Jira)" <ji...@apache.org> on 2020/02/06 12:58:00 UTC

[jira] [Commented] (AIRFLOW-1190) SSH connection still running, in spite of killing tasks

    [ https://issues.apache.org/jira/browse/AIRFLOW-1190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17031561#comment-17031561 ] 

Roster commented on AIRFLOW-1190:
---------------------------------

This is actually a pain point for many operators. I wonder whether this must be fixed per operator, or whether a generic way to handle it can be found.

> SSH connection still running, in spite of killing tasks
> ------------------------------------------------------
>
>                 Key: AIRFLOW-1190
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-1190
>             Project: Apache Airflow
>          Issue Type: Task
>          Components: aws, DAG, DagRun
>    Affects Versions: 1.7.0
>         Environment: Airflow is on the AWS EC2 and scripts are launched on the EMR
>            Reporter: Chetan J
>            Priority: Trivial
>              Labels: newbie
>   Original Estimate: 720h
>  Remaining Estimate: 720h
>
> Hello,
> I have an SSH connection created in my DAG. When I use the "Clear" option to stop a running task, the task stops, but the SSH connection is still executing scripts on the remote server:
> import logging
> import sys
> from io import StringIO  # Python 3; the original used Python 2's StringIO.StringIO
>
> import paramiko
>
> logger = logging.getLogger(__name__)
>
> def execute_on_emr(cmd):
>     # 'file', 'IP' and 'username' are assumed to be defined elsewhere in the DAG
>     with open(file, 'r') as f:
>         keyfile = StringIO(f.read())
>     mykey = paramiko.RSAKey.from_private_key(keyfile)
>     sshcon = paramiko.SSHClient()
>     sshcon.set_missing_host_key_policy(paramiko.AutoAddPolicy())
>     sshcon.connect(IP, username=username, pkey=mykey)
>     try:
>         stdin, stdout, stderr = sshcon.exec_command(cmd)
>         logger.info("stdout ------> %s", stdout.readlines())
>         logger.info("Error--------> %s", stderr.readlines())
>         exit_status = stdout.channel.recv_exit_status()
>         if exit_status != 0:
>             logger.info("Error: return code not zero: %s", exit_status)
>             sys.exit(1)
>     finally:
>         sshcon.close()
> task = PythonOperator(
>     task_id='XXX',
>     python_callable=execute_on_emr,
>     op_kwargs={'cmd': 'spark-submit /home/hadoop/xxx.py'},
>     dag=dag)
> Regards,
> Chetan J



--
This message was sent by Atlassian Jira
(v8.3.4#803005)