You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by bo...@apache.org on 2017/10/19 18:51:51 UTC

incubator-airflow git commit: [AIRFLOW-1726] Add copy_expert psycopg2 method to PostgresHook

Repository: incubator-airflow
Updated Branches:
  refs/heads/master 2f107d8a3 -> 6372770be


[AIRFLOW-1726] Add copy_expert psycopg2 method to PostgresHook

Executes SQL using psycopg2 copy_expert method
Necessary to execute COPY command without access to a superuser

Closes #2698 from andyxhadji/AIRFLOW-1726


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/6372770b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/6372770b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/6372770b

Branch: refs/heads/master
Commit: 6372770be6aab67654cede81d6b027a838077a8a
Parents: 2f107d8
Author: Andy Hadjigeorgiou <ah...@columbia.edu>
Authored: Thu Oct 19 20:50:33 2017 +0200
Committer: Bolke de Bruin <bo...@xs4all.nl>
Committed: Thu Oct 19 20:50:52 2017 +0200

----------------------------------------------------------------------
 airflow/hooks/postgres_hook.py    | 11 +++++++
 tests/hooks/test_postgres_hook.py | 54 ++++++++++++++++++++++++++++++++++
 2 files changed, 65 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/6372770b/airflow/hooks/postgres_hook.py
----------------------------------------------------------------------
diff --git a/airflow/hooks/postgres_hook.py b/airflow/hooks/postgres_hook.py
index e47f8e3..81e10d7 100644
--- a/airflow/hooks/postgres_hook.py
+++ b/airflow/hooks/postgres_hook.py
@@ -14,6 +14,7 @@
 
 import psycopg2
 import psycopg2.extensions
+from contextlib import closing
 
 from airflow.hooks.dbapi_hook import DbApiHook
 
@@ -53,6 +54,16 @@ class PostgresHook(DbApiHook):
         psycopg2_conn = psycopg2.connect(**conn_args)
         return psycopg2_conn
 
+    def copy_expert(self, sql, filename, open=open):
+        '''Executes SQL using psycopg2 copy_expert method.
+        Necessary to execute COPY command without access to a superuser.
+        `filename` is opened in 'w' mode via the `open` callable and is always
+        closed on exit, including when the COPY raises. Returns None.'''
+        with open(filename, 'w') as f:
+            with closing(self.get_conn()) as conn:
+                with closing(conn.cursor()) as cur:
+                    cur.copy_expert(sql, f)
+
     @staticmethod
     def _serialize_cell(cell, conn):
         """

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/6372770b/tests/hooks/test_postgres_hook.py
----------------------------------------------------------------------
diff --git a/tests/hooks/test_postgres_hook.py b/tests/hooks/test_postgres_hook.py
new file mode 100644
index 0000000..41bb13e
--- /dev/null
+++ b/tests/hooks/test_postgres_hook.py
@@ -0,0 +1,54 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import mock
+import unittest
+
+from airflow.hooks.postgres_hook import PostgresHook
+
+
+class TestPostgresHook(unittest.TestCase):
+    """Checks PostgresHook.copy_expert against a mocked connection and cursor."""
+
+    def setUp(self):
+        super(TestPostgresHook, self).setUp()
+        cursor = mock.MagicMock()
+        connection = mock.MagicMock()
+        connection.cursor.return_value = cursor
+        self.cur, self.conn = cursor, connection
+
+        class UnitTestPostgresHook(PostgresHook):
+            conn_name_attr = 'test_conn_id'
+
+            # Overridden to hand back the mock connection built above.
+            def get_conn(self):
+                return connection
+
+        self.db_hook = UnitTestPostgresHook()
+
+    def test_copy_expert(self):
+        statement, filename = "SQL", "filename"
+        opener = mock.mock_open(read_data='{"some": "json"}')
+        with mock.patch('airflow.hooks.postgres_hook.open', opener, create=True) as m:
+            self.cur.fetchall.return_value = None
+            f = m(filename, 'w')
+            # Injected in place of open(); always yields the handle created above.
+            result = self.db_hook.copy_expert(
+                statement, filename, open=lambda filename, mode: f)
+
+            self.assertIsNone(result)
+            self.conn.close.assert_called_once()
+            self.cur.close.assert_called_once()
+            self.cur.copy_expert.assert_called_once_with(statement, f)