You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ma...@apache.org on 2016/06/13 18:54:49 UTC

incubator-airflow git commit: [AIRFLOW-230] [HiveServer2Hook] adding multi statements support

Repository: incubator-airflow
Updated Branches:
  refs/heads/master 901e8f2a9 -> a599167c4


[AIRFLOW-230] [HiveServer2Hook] adding multi statements support

Switching the library from pyhive to impyla broke support for running multiple statements, including statements that don't return results: this used to work and no longer does. impyla raises an exception if any of the statements doesn't return a result.

We have tasks that run multiple statements, including DDL, and want to run them atomically.

Closes #1583 from mistercrunch/hooks_hive_presto

[AIRFLOW-230] [HiveServer2Hook] adding multi statements support


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/a599167c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/a599167c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/a599167c

Branch: refs/heads/master
Commit: a599167c433246d96bea711d8bfd5710b2c9d3ff
Parents: 901e8f2
Author: Maxime Beauchemin <ma...@apache.org>
Authored: Mon Jun 13 11:54:35 2016 -0700
Committer: Maxime Beauchemin <ma...@apache.org>
Committed: Mon Jun 13 11:54:35 2016 -0700

----------------------------------------------------------------------
 airflow/hooks/hive_hooks.py | 10 +++++++++-
 tests/core.py               |  9 +++++++++
 2 files changed, 18 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a599167c/airflow/hooks/hive_hooks.py
----------------------------------------------------------------------
diff --git a/airflow/hooks/hive_hooks.py b/airflow/hooks/hive_hooks.py
index 0b06f49..87cce6a 100644
--- a/airflow/hooks/hive_hooks.py
+++ b/airflow/hooks/hive_hooks.py
@@ -465,6 +465,7 @@ class HiveServer2Hook(BaseHook):
             database=db.schema or 'default')
 
     def get_results(self, hql, schema='default', arraysize=1000):
+        from impala.error import ProgrammingError
         with self.get_conn() as conn:
             if isinstance(hql, basestring):
                 hql = [hql]
@@ -475,7 +476,14 @@ class HiveServer2Hook(BaseHook):
             for statement in hql:
                 with conn.cursor() as cur:
                     cur.execute(statement)
-                    records = cur.fetchall()
+                    records = []
+                    try:
+                        # impala Lib raises when no results are returned
+                        # we're silencing here as some statements in the list
+                        # may be `SET` or DDL
+                        records = cur.fetchall()
+                    except ProgrammingError:
+                        logging.debug("get_results returned no records")
                     if records:
                         results = {
                             'data': records,

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a599167c/tests/core.py
----------------------------------------------------------------------
diff --git a/tests/core.py b/tests/core.py
index d5f33a1..bbf9e60 100644
--- a/tests/core.py
+++ b/tests/core.py
@@ -1474,6 +1474,15 @@ if 'HiveOperator' in dir(operators):
             hook = HiveServer2Hook()
             hook.get_records(sql)
 
+        def test_multi_statements(self):  # list of DDL statements, none of which return rows
+            from airflow.hooks.hive_hooks import HiveServer2Hook
+            sqls = [
+                "CREATE TABLE IF NOT EXISTS test_multi_statements (i INT)",
+                "DROP TABLE test_multi_statements",
+            ]
+            hook = HiveServer2Hook()
+            hook.get_records(sqls)  # no assertion: test passes as long as this call does not raise
+
         def test_get_metastore_databases(self):
             if six.PY2:
                 from airflow.hooks.hive_hooks import HiveMetastoreHook