You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by el...@apache.org on 2021/05/04 06:39:52 UTC

[airflow] branch master updated: Feature qubole hook support headers (#15615)

This is an automated email from the ASF dual-hosted git repository.

eladkal pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/master by this push:
     new 47a5539  Feature qubole hook support headers (#15615)
47a5539 is described below

commit 47a5539f7b83826b85b189b58b1641798d637369
Author: Itay Levy <le...@gmail.com>
AuthorDate: Mon May 3 23:39:27 2021 -0700

    Feature qubole hook support headers (#15615)
    
    * add support for include_headers in QuboleHook get_results
---
 airflow/providers/qubole/CHANGELOG.rst         |  9 +++++++++
 airflow/providers/qubole/hooks/qubole.py       | 14 ++++++++++++--
 airflow/providers/qubole/hooks/qubole_check.py |  2 +-
 airflow/providers/qubole/operators/qubole.py   | 12 ++++++++++--
 airflow/providers/qubole/provider.yaml         |  1 +
 5 files changed, 33 insertions(+), 5 deletions(-)

diff --git a/airflow/providers/qubole/CHANGELOG.rst b/airflow/providers/qubole/CHANGELOG.rst
index eac8d8e..3512f3d 100644
--- a/airflow/providers/qubole/CHANGELOG.rst
+++ b/airflow/providers/qubole/CHANGELOG.rst
@@ -19,6 +19,15 @@
 Changelog
 ---------
 
+1.0.3
+.....
+
+Features
+~~~~~~~~
+
+*  ``Feature add support for include_headers in get_results (#15598)``
+
+
 1.0.2
 .....
 
diff --git a/airflow/providers/qubole/hooks/qubole.py b/airflow/providers/qubole/hooks/qubole.py
index 99e6027..cd23bba 100644
--- a/airflow/providers/qubole/hooks/qubole.py
+++ b/airflow/providers/qubole/hooks/qubole.py
@@ -203,7 +203,15 @@ class QuboleHook(BaseHook):
             self.cmd.cancel()
 
     # pylint: disable=consider-using-with
-    def get_results(self, ti=None, fp=None, inline: bool = True, delim=None, fetch: bool = True) -> str:
+    def get_results(
+        self,
+        ti=None,
+        fp=None,
+        inline: bool = True,
+        delim=None,
+        fetch: bool = True,
+        include_headers: bool = False,
+    ) -> str:
         """
         Get results (or just s3 locations) of a command from Qubole and save into a file
 
@@ -225,7 +233,9 @@ class QuboleHook(BaseHook):
             cmd_id = ti.xcom_pull(key="qbol_cmd_id", task_ids=self.task_id)
             self.cmd = self.cls.find(cmd_id)
 
-        self.cmd.get_results(fp, inline, delim, fetch)  # type: ignore[attr-defined]
+        self.cmd.get_results(
+            fp, inline, delim, fetch, arguments=[include_headers]
+        )  # type: ignore[attr-defined]
         fp.flush()
         fp.close()
         return fp.name
diff --git a/airflow/providers/qubole/hooks/qubole_check.py b/airflow/providers/qubole/hooks/qubole_check.py
index 987ea59..c15f47f 100644
--- a/airflow/providers/qubole/hooks/qubole_check.py
+++ b/airflow/providers/qubole/hooks/qubole_check.py
@@ -112,7 +112,7 @@ class QuboleCheckHook(QuboleHook):
             cmd_id = self.cmd.id
             self.log.info("command id: %d", cmd_id)
             query_result_buffer = StringIO()
-            self.cmd.get_results(fp=query_result_buffer, inline=True, delim=COL_DELIM)
+            self.cmd.get_results(fp=query_result_buffer, inline=True, delim=COL_DELIM, arguments=[True])
             query_result = query_result_buffer.getvalue()
             query_result_buffer.close()
             return query_result
diff --git a/airflow/providers/qubole/operators/qubole.py b/airflow/providers/qubole/operators/qubole.py
index b090af6..ce8a6c7 100644
--- a/airflow/providers/qubole/operators/qubole.py
+++ b/airflow/providers/qubole/operators/qubole.py
@@ -250,9 +250,17 @@ class QuboleOperator(BaseOperator):
         else:
             self.get_hook().kill(ti)
 
-    def get_results(self, ti=None, fp=None, inline: bool = True, delim=None, fetch: bool = True) -> str:
+    def get_results(
+        self,
+        ti=None,
+        fp=None,
+        inline: bool = True,
+        delim=None,
+        fetch: bool = True,
+        include_headers: bool = False,
+    ) -> str:
         """get_results from Qubole"""
-        return self.get_hook().get_results(ti, fp, inline, delim, fetch)
+        return self.get_hook().get_results(ti, fp, inline, delim, fetch, include_headers)
 
     def get_log(self, ti) -> None:
         """get_log from Qubole"""
diff --git a/airflow/providers/qubole/provider.yaml b/airflow/providers/qubole/provider.yaml
index bd4cd58..dad3364 100644
--- a/airflow/providers/qubole/provider.yaml
+++ b/airflow/providers/qubole/provider.yaml
@@ -22,6 +22,7 @@ description: |
     `Qubole <https://www.qubole.com/>`__
 
 versions:
+  - 1.0.3
   - 1.0.2
   - 1.0.1
   - 1.0.0