You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by bo...@apache.org on 2018/05/10 08:35:03 UTC

incubator-airflow git commit: [AIRFLOW-2441] Fix bugs in HiveCliHook.load_df

Repository: incubator-airflow
Updated Branches:
  refs/heads/master f7c33afea -> 74027c9a6


[AIRFLOW-2441] Fix bugs in HiveCliHook.load_df

This PR fixes HiveCliHook.load_df to:

1. encode the delimiter with the specified encoding
   before passing it to pandas.DataFrame.to_csv,
   so that the call does not fail

2. flush the file written by pandas.DataFrame.to_csv
   before executing the LOAD DATA statement

3. omit the header and row index from the file
   written by pandas.DataFrame.to_csv, so that
   Hive reads it as expected

Closes #3334 from sekikn/AIRFLOW-2441


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/74027c9a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/74027c9a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/74027c9a

Branch: refs/heads/master
Commit: 74027c9a6ba5f54a7b6392f6dd79d5b8a8782d7b
Parents: f7c33af
Author: Kengo Seki <se...@apache.org>
Authored: Thu May 10 10:34:58 2018 +0200
Committer: Bolke de Bruin <bo...@xs4all.nl>
Committed: Thu May 10 10:34:58 2018 +0200

----------------------------------------------------------------------
 airflow/hooks/hive_hooks.py   |  7 ++++++-
 tests/hooks/test_hive_hook.py | 27 +++++++++++++++++++++++++++
 2 files changed, 33 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/74027c9a/airflow/hooks/hive_hooks.py
----------------------------------------------------------------------
diff --git a/airflow/hooks/hive_hooks.py b/airflow/hooks/hive_hooks.py
index 0b7b056..6c9a907 100644
--- a/airflow/hooks/hive_hooks.py
+++ b/airflow/hooks/hive_hooks.py
@@ -336,7 +336,12 @@ class HiveCliHook(BaseHook):
                 if field_dict is None and (create or recreate):
                     field_dict = _infer_field_types_from_df(df)
 
-                df.to_csv(f, sep=delimiter, **pandas_kwargs)
+                df.to_csv(path_or_buf=f,
+                          sep=delimiter.encode(encoding),
+                          header=False,
+                          index=False,
+                          **pandas_kwargs)
+                f.flush()
 
                 return self.load_file(filepath=f.name,
                                       table=table,

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/74027c9a/tests/hooks/test_hive_hook.py
----------------------------------------------------------------------
diff --git a/tests/hooks/test_hive_hook.py b/tests/hooks/test_hive_hook.py
index e16bb38..9eda9e3 100644
--- a/tests/hooks/test_hive_hook.py
+++ b/tests/hooks/test_hive_hook.py
@@ -19,6 +19,7 @@
 #
 
 import datetime
+import pandas as pd
 import random
 
 import mock
@@ -105,6 +106,32 @@ class TestHiveCliHook(unittest.TestCase):
         )
         mock_run_cli.assert_called_with(query)
 
+    @mock.patch('airflow.hooks.hive_hooks.HiveCliHook.load_file')
+    @mock.patch('pandas.DataFrame.to_csv')
+    def test_load_df(self, mock_to_csv, mock_load_file):
+        df = pd.DataFrame({"c": ["foo", "bar", "baz"]})
+        table = "t"
+        delimiter = ","
+        encoding = "utf-8"
+
+        hook = HiveCliHook()
+        hook.load_df(df=df,
+                     table=table,
+                     delimiter=delimiter,
+                     encoding=encoding)
+
+        mock_to_csv.assert_called_once()
+        kwargs = mock_to_csv.call_args[1]
+        self.assertEqual(kwargs["header"], False)
+        self.assertEqual(kwargs["index"], False)
+        self.assertEqual(kwargs["sep"], delimiter.encode(encoding))
+
+        mock_load_file.assert_called_once()
+        kwargs = mock_load_file.call_args[1]
+        self.assertEqual(kwargs["delimiter"], delimiter)
+        self.assertEqual(kwargs["field_dict"], {"c": u"STRING"})
+        self.assertEqual(kwargs["table"], table)
+
 
 class TestHiveMetastoreHook(HiveEnvironmentTest):
     VALID_FILTER_MAP = {'key2': 'value2'}