Posted to commits@airflow.apache.org by "Oliver Ricken (Jira)" <ji...@apache.org> on 2020/03/27 12:15:00 UTC

[jira] [Updated] (AIRFLOW-7109) HiveServer2Hook and pyHive do not handle timeout correctly

     [ https://issues.apache.org/jira/browse/AIRFLOW-7109?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Oliver Ricken updated AIRFLOW-7109:
-----------------------------------
    Description: 
Dear experts,

Using the vanilla 1.10.6 "HiveServer2Hook" from airflow/hooks/hive_hooks in a custom operator, we observe that timeouts configured on task level via "execution_timeout" are not handled properly.
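
For reference, a minimal sketch of the setup described above (the DAG, task and operator names are made up for illustration; the essential point is that "execution_timeout" is set on the task):

{code:python}
# Minimal, illustrative sketch of the setup (Airflow 1.10.x APIs; names are hypothetical).
from datetime import datetime, timedelta

from airflow import DAG
from airflow.hooks.hive_hooks import HiveServer2Hook
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults


class HiveQueryOperator(BaseOperator):
    """Hypothetical custom operator that fetches query results via HiveServer2Hook."""

    @apply_defaults
    def __init__(self, hql, hiveserver2_conn_id='hiveserver2_default', *args, **kwargs):
        super(HiveQueryOperator, self).__init__(*args, **kwargs)
        self.hql = hql
        self.hiveserver2_conn_id = hiveserver2_conn_id

    def execute(self, context):
        hook = HiveServer2Hook(hiveserver2_conn_id=self.hiveserver2_conn_id)
        # Blocks until HiveServer2 has returned the full result set.
        return hook.get_records(self.hql)


with DAG('hive_timeout_demo', start_date=datetime(2020, 3, 1),
         schedule_interval=None) as dag:
    HiveQueryOperator(
        task_id='long_running_query',
        hql='SELECT COUNT(*) FROM some_table',
        # The task-level timeout whose handling is described below.
        execution_timeout=timedelta(minutes=5),
    )
{code}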

The picture is always the same:
 # The hook is instantiated, the connection opened and the query submitted.
 # The task timeout is reached, but the query nevertheless runs to completion (confirmed in YARN, where the job is flagged as "finished", i.e. successful).
 # Although the query succeeded (Hive-wise), no result is obtained or processed; instead an obscure codec error is raised, making the task fail. The codec error stems from the actual AirflowTaskTimeout not being propagated properly.

The persistent codec error (including its stacktrace) is given below.

We investigated the issue further by slightly modifying the vanilla version of the HiveServer2Hook and adding some debugging output to various modules (a condensed sketch of these modifications follows this list):
 * We made the "pyhive.hive.connect" object a class variable (instead of having it exist only in the scope of the "_get_results" method, as in the vanilla hook).
 * We added a "kill()" method that invokes "connection.close()" on said class variable, and we call this method from "on_kill()" in our custom operator.
 * We added debugging (i.e. logging) output to the "_run_raw_task" method in airflow/models/taskinstance.py and to the "__enter__()" and "__exit__()" methods of the timeout class in airflow/utils/timeout.py.
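
A condensed sketch of what these modifications amount to (heavily simplified; the attached dev_hive_hook.py and dev_hive_operator.py contain the actual code):

{code:python}
# Condensed, simplified sketch of the modifications described above; the attached
# dev_hive_hook.py / dev_hive_operator.py contain the real code.
from pyhive import hive

from airflow.hooks.hive_hooks import HiveServer2Hook


class DevHiveServer2Hook(HiveServer2Hook):

    connection = None  # pyhive connection kept on the hook instead of a local variable

    def _get_results(self, hql, schema='default', fetch_size=None, hive_conf=None):
        db = self.get_connection(self.hiveserver2_conn_id)
        # Keep a handle on the pyhive connection so that it can be closed from
        # outside, e.g. from the operator's on_kill().
        self.connection = hive.connect(host=db.host, port=db.port,
                                       username=db.login, database=schema)
        cur = self.connection.cursor()
        cur.execute(hql)   # <- this call keeps running past the task timeout
        return cur.fetchall()

    def kill(self):
        if self.connection is not None:
            self.connection.close()


# In the custom operator (dev_hive_operator.py):
#
#     def on_kill(self):
#         self.hook.kill()
{code}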

Our findings, based on the attached log files (and the modified core module files as well as the modified hook, the custom operator and the DAG), are the following:
 * In order for the "on_kill()" method of any operator to be invoked upon task timeout, the timeout context manager from airflow/utils/timeout.py, which wraps the execution of "task_copy" in "_run_raw_task" (taskinstance.py), needs to exit with an "AirflowTaskTimeout" (and nothing else!); a simplified sketch of this mechanism follows this list.
 * The AirflowTaskTimeout is produced, but not at the point in time at which the timeout is actually reached; it only surfaces at the end of the pyHive execution of the query.
 * Upward propagation of the AirflowTaskTimeout then fails with the obscure codec error mentioned earlier, and consequently neither "on_kill()" nor "kill()" is invoked. Nota bene: at that point killing would not even be necessary anymore, since the query has already finished anyway.
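
For illustration, a heavily simplified sketch of how we understand the timeout mechanism in airflow/utils/timeout.py (logging and error handling omitted):

{code:python}
# Heavily simplified sketch of the timeout context manager in
# airflow/utils/timeout.py (logging and error handling omitted).
import os
import signal

from airflow.exceptions import AirflowTaskTimeout


class timeout(object):
    """Wraps the execution of task_copy in taskinstance.py's _run_raw_task."""

    def __init__(self, seconds=1, error_message='Timeout'):
        self.seconds = seconds
        self.error_message = error_message + ', PID: ' + str(os.getpid())

    def handle_timeout(self, signum, frame):
        # Triggered by SIGALRM once the timeout is reached. The exception is
        # raised inside whatever code the task happens to be executing at that
        # moment - the stacktrace below shows it being raised deep inside the
        # blocking thrift socket read of the pyHive call.
        raise AirflowTaskTimeout(self.error_message)

    def __enter__(self):
        signal.signal(signal.SIGALRM, self.handle_timeout)
        signal.alarm(self.seconds)

    def __exit__(self, exc_type, exc_value, tb):
        signal.alarm(0)
{code}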

In our opinion this leaves two aspects to be studied, understood and fixed, possibly in pyHive rather than in Airflow (a sketch of one conceivable direction follows this list):
 * The execution of the pyHive query needs to be cancelled/killed upon timeout.
 * The propagation of the exception needs to be performed properly.
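
To make the first aspect more concrete, a rough sketch of one conceivable direction, under the assumption that cancelling the pyHive cursor and closing the connection once the timeout fires is acceptable (pyHive's "cursor.cancel()" is, to our understanding, its wrapper around HiveServer2's CancelOperation):

{code:python}
# Rough sketch of one conceivable direction: cancel the pyHive query and close
# the connection as soon as the timeout fires, then re-raise so that the
# AirflowTaskTimeout propagates cleanly.
from airflow.exceptions import AirflowTaskTimeout
from pyhive import hive


def run_query_with_timeout_handling(host, port, username, hql):
    connection = hive.connect(host=host, port=port, username=username)
    cursor = connection.cursor()
    try:
        cursor.execute(hql)
        return cursor.fetchall()
    except AirflowTaskTimeout:
        # Stop the query on the HiveServer2 side and tear down the transport
        # instead of letting the thrift layer keep reading from a socket that
        # is in an undefined state.
        try:
            cursor.cancel()
        finally:
            connection.close()
        raise
{code}

Whether cancelling over a transport that was interrupted in the middle of a frame behaves well is precisely the kind of question that probably has to be answered on the pyHive side.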

NB: closing the pyhive connection object upon invocation of "on_kill()" does work properly if the task is killed manually, e.g. by setting it to "failed". In that case, "on_kill()" is invoked directly instead of waiting for the AirflowTaskTimeout to pop up.

NB: things work perfectly with the "HiveCliHook", which, under the hood, runs beeline via a Python subprocess call that is terminated (killed after 60 seconds) upon invocation of "on_kill()" in e.g. the vanilla HiveOperator; a paraphrased sketch of this pattern is given below. The log attached as "vanilla_HiverOperator_task.log" shows the behaviour of the vanilla HiveOperator performing the same query. In principle, similarly flawless behaviour would be expected when using the HiveServer2Hook to submit the query ...
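
For comparison, the subprocess pattern used there, heavily paraphrased (class and variable names simplified; see airflow/hooks/hive_hooks.py for the real implementation):

{code:python}
# Heavily paraphrased sketch of the subprocess-based pattern used by
# HiveCliHook / HiveOperator (names simplified; see airflow/hooks/hive_hooks.py
# for the real implementation).
import subprocess


class CliStyleHook(object):

    sub_process = None

    def run_cli(self, hql):
        # beeline runs as a child process of the task ...
        self.sub_process = subprocess.Popen(
            ['beeline', '-u', 'jdbc:hive2://hiveserver2-host:10000/default',
             '-e', hql],
            stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
        stdout, _ = self.sub_process.communicate()
        return stdout

    def kill(self):
        # ... and can therefore simply be killed when on_kill() is invoked.
        if self.sub_process is not None and self.sub_process.poll() is None:
            self.sub_process.kill()
{code}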

 

Your input on this matter and any suggestions are highly appreciated. The "HiveServer2Hook" is, in our opinion, an irreplaceable tool when query results are needed for postprocessing.

 

Cheers

 Oliver

{{[2020-03-20 14:45:42,982] \{taskinstance.py:1060} ERROR - 'utf-8' codec can't decode byte 0xdb in position 0: invalid continuation byte
Traceback (most recent call last):
  File "/home/airflow/dags/src/vipa_import/plugins/hooks/dev_hive_hook.py", line 33, in _get_results
    cur.execute(statement)
  File "/home/airflow/airflow_venv/lib/python3.6/site-packages/pyhive/hive.py", line 364, in execute
    response = self._connection.client.ExecuteStatement(req)
  File "/home/airflow/airflow_venv/lib/python3.6/site-packages/TCLIService/TCLIService.py", line 280, in ExecuteStatement
    return self.recv_ExecuteStatement()
  File "/home/airflow/airflow_venv/lib/python3.6/site-packages/TCLIService/TCLIService.py", line 292, in recv_ExecuteStatement
    (fname, mtype, rseqid) = iprot.readMessageBegin()
  File "/home/airflow/airflow_venv/lib/python3.6/site-packages/thrift/protocol/TBinaryProtocol.py", line 134, in readMessageBegin
    sz = self.readI32()
  File "/home/airflow/airflow_venv/lib/python3.6/site-packages/thrift/protocol/TBinaryProtocol.py", line 217, in readI32
    buff = self.trans.readAll(4)
  File "/home/airflow/airflow_venv/lib/python3.6/site-packages/thrift/transport/TTransport.py", line 60, in readAll
    chunk = self.read(sz - have)
  File "/home/airflow/airflow_venv/lib/python3.6/site-packages/thrift_sasl/__init__.py", line 166, in read
    self._read_frame()
  File "/home/airflow/airflow_venv/lib/python3.6/site-packages/thrift_sasl/__init__.py", line 170, in _read_frame
    header = self._trans.readAll(4)
  File "/home/airflow/airflow_venv/lib/python3.6/site-packages/thrift/transport/TTransport.py", line 60, in readAll
    chunk = self.read(sz - have)
  File "/home/airflow/airflow_venv/lib/python3.6/site-packages/thrift/transport/TSocket.py", line 117, in read
    buff = self.handle.recv(sz)
  File "/home/airflow/airflow_venv/lib/python3.6/site-packages/airflow/utils/timeout.py", line 43, in handle_timeout
    raise AirflowTaskTimeout(self.error_message)
airflow.exceptions.AirflowTaskTimeout: Timeout, PID: 17089}}


> HiveServer2Hook and pyHive do not handle timeout correctly
> ----------------------------------------------------------
>
>                 Key: AIRFLOW-7109
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-7109
>             Project: Apache Airflow
>          Issue Type: Bug
>          Components: hooks, utils
>    Affects Versions: 1.10.6
>         Environment: CentOS 7, python 3.6.7,  Airflow 1.10.6, HDP 3.1.4
>            Reporter: Oliver Ricken
>            Priority: Major
>         Attachments: dag.py, dev_hive_hook.py, dev_hive_operator.py, modified_HiveServer2Hook_task.log, taskinstance.py, timeout.py, vanilla_HiverOperator_task.log
>



--
This message was sent by Atlassian Jira
(v8.3.4#803005)