You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by bo...@apache.org on 2017/10/19 18:51:51 UTC
incubator-airflow git commit: [AIRFLOW-1726] Add copy_expert psycopg2
method to PostgresHook
Repository: incubator-airflow
Updated Branches:
refs/heads/master 2f107d8a3 -> 6372770be
[AIRFLOW-1726] Add copy_expert psycopg2 method to PostgresHook
Executes SQL using psycopg2 copy_expert method
Necessary to execute COPY command without access to a superuser
Closes #2698 from andyxhadji/AIRFLOW-1726
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/6372770b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/6372770b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/6372770b
Branch: refs/heads/master
Commit: 6372770be6aab67654cede81d6b027a838077a8a
Parents: 2f107d8
Author: Andy Hadjigeorgiou <ah...@columbia.edu>
Authored: Thu Oct 19 20:50:33 2017 +0200
Committer: Bolke de Bruin <bo...@xs4all.nl>
Committed: Thu Oct 19 20:50:52 2017 +0200
----------------------------------------------------------------------
airflow/hooks/postgres_hook.py | 11 +++++++
tests/hooks/test_postgres_hook.py | 54 ++++++++++++++++++++++++++++++++++
2 files changed, 65 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/6372770b/airflow/hooks/postgres_hook.py
----------------------------------------------------------------------
diff --git a/airflow/hooks/postgres_hook.py b/airflow/hooks/postgres_hook.py
index e47f8e3..81e10d7 100644
--- a/airflow/hooks/postgres_hook.py
+++ b/airflow/hooks/postgres_hook.py
@@ -14,6 +14,7 @@
import psycopg2
import psycopg2.extensions
+from contextlib import closing
from airflow.hooks.dbapi_hook import DbApiHook
@@ -53,6 +54,16 @@ class PostgresHook(DbApiHook):
psycopg2_conn = psycopg2.connect(**conn_args)
return psycopg2_conn
+ def copy_expert(self, sql, filename, open=open):
+ '''
+ Executes SQL using psycopg2 copy_expert method
+ Necessary to execute COPY command without access to a superuser
+ '''
+ f = open(filename, 'w')
+ with closing(self.get_conn()) as conn:
+ with closing(conn.cursor()) as cur:
+ cur.copy_expert(sql, f)
+
@staticmethod
def _serialize_cell(cell, conn):
"""
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/6372770b/tests/hooks/test_postgres_hook.py
----------------------------------------------------------------------
diff --git a/tests/hooks/test_postgres_hook.py b/tests/hooks/test_postgres_hook.py
new file mode 100644
index 0000000..41bb13e
--- /dev/null
+++ b/tests/hooks/test_postgres_hook.py
@@ -0,0 +1,54 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import mock
+import unittest
+
+from airflow.hooks.postgres_hook import PostgresHook
+
+
+class TestPostgresHook(unittest.TestCase):
+
+ def setUp(self):
+ super(TestPostgresHook, self).setUp()
+
+ self.cur = mock.MagicMock()
+ self.conn = conn = mock.MagicMock()
+ self.conn.cursor.return_value = self.cur
+
+ class UnitTestPostgresHook(PostgresHook):
+ conn_name_attr = 'test_conn_id'
+
+ def get_conn(self):
+ return conn
+
+ self.db_hook = UnitTestPostgresHook()
+
+ def test_copy_expert(self):
+ m = mock.mock_open(read_data='{"some": "json"}')
+ with mock.patch('airflow.hooks.postgres_hook.open', m, create=True) as m:
+ statement = "SQL"
+ filename = "filename"
+
+ self.cur.fetchall.return_value = None
+ f = m(filename, 'w')
+ def test_open(filename, mode):
+ return f
+
+ self.assertEqual(None, self.db_hook.copy_expert(statement, filename, open=test_open))
+
+ self.conn.close.assert_called_once()
+ self.cur.close.assert_called_once()
+ self.cur.copy_expert.assert_called_once_with(statement, f)