You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by bo...@apache.org on 2018/05/10 08:35:03 UTC
incubator-airflow git commit: [AIRFLOW-2441] Fix bugs in HiveCliHook.load_df
Repository: incubator-airflow
Updated Branches:
refs/heads/master f7c33afea -> 74027c9a6
[AIRFLOW-2441] Fix bugs in HiveCliHook.load_df
This PR fixes HiveCliHook.load_df to:
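1. encode the delimiter with the specified encoding
before passing it to pandas.DataFrame.to_csv,
so that the call does not fail on Python 2, whose
csv writer rejects a unicode delimiter
(note: on Python 3 the delimiter must be a str,
so the encoded bytes value is rejected there)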
2. flush the file written by pandas.DataFrame.to_csv
before executing the LOAD DATA statement, so that
all buffered rows are on disk when Hive loads the file
3. omit the header and the row index from the file
written by pandas.DataFrame.to_csv, so that Hive
reads it as expected
Closes #3334 from sekikn/AIRFLOW-2441
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/74027c9a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/74027c9a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/74027c9a
Branch: refs/heads/master
Commit: 74027c9a6ba5f54a7b6392f6dd79d5b8a8782d7b
Parents: f7c33af
Author: Kengo Seki <se...@apache.org>
Authored: Thu May 10 10:34:58 2018 +0200
Committer: Bolke de Bruin <bo...@xs4all.nl>
Committed: Thu May 10 10:34:58 2018 +0200
----------------------------------------------------------------------
airflow/hooks/hive_hooks.py | 7 ++++++-
tests/hooks/test_hive_hook.py | 27 +++++++++++++++++++++++++++
2 files changed, 33 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/74027c9a/airflow/hooks/hive_hooks.py
----------------------------------------------------------------------
diff --git a/airflow/hooks/hive_hooks.py b/airflow/hooks/hive_hooks.py
index 0b7b056..6c9a907 100644
--- a/airflow/hooks/hive_hooks.py
+++ b/airflow/hooks/hive_hooks.py
@@ -336,7 +336,12 @@ class HiveCliHook(BaseHook):
if field_dict is None and (create or recreate):
field_dict = _infer_field_types_from_df(df)
- df.to_csv(f, sep=delimiter, **pandas_kwargs)
+ df.to_csv(path_or_buf=f,
+ sep=delimiter.encode(encoding),
+ header=False,
+ index=False,
+ **pandas_kwargs)
+ f.flush()
return self.load_file(filepath=f.name,
table=table,
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/74027c9a/tests/hooks/test_hive_hook.py
----------------------------------------------------------------------
diff --git a/tests/hooks/test_hive_hook.py b/tests/hooks/test_hive_hook.py
index e16bb38..9eda9e3 100644
--- a/tests/hooks/test_hive_hook.py
+++ b/tests/hooks/test_hive_hook.py
@@ -19,6 +19,7 @@
#
import datetime
+import pandas as pd
import random
import mock
@@ -105,6 +106,32 @@ class TestHiveCliHook(unittest.TestCase):
)
mock_run_cli.assert_called_with(query)
+ @mock.patch('airflow.hooks.hive_hooks.HiveCliHook.load_file')
+ @mock.patch('pandas.DataFrame.to_csv')
+ def test_load_df(self, mock_to_csv, mock_load_file):
+ df = pd.DataFrame({"c": ["foo", "bar", "baz"]})
+ table = "t"
+ delimiter = ","
+ encoding = "utf-8"
+
+ hook = HiveCliHook()
+ hook.load_df(df=df,
+ table=table,
+ delimiter=delimiter,
+ encoding=encoding)
+
+ mock_to_csv.assert_called_once()
+ kwargs = mock_to_csv.call_args[1]
+ self.assertEqual(kwargs["header"], False)
+ self.assertEqual(kwargs["index"], False)
+ self.assertEqual(kwargs["sep"], delimiter.encode(encoding))
+
+ mock_load_file.assert_called_once()
+ kwargs = mock_load_file.call_args[1]
+ self.assertEqual(kwargs["delimiter"], delimiter)
+ self.assertEqual(kwargs["field_dict"], {"c": u"STRING"})
+ self.assertEqual(kwargs["table"], table)
+
class TestHiveMetastoreHook(HiveEnvironmentTest):
VALID_FILTER_MAP = {'key2': 'value2'}