Posted to commits@airflow.apache.org by "Ash Berlin-Taylor (Jira)" <ji...@apache.org> on 2019/12/20 13:42:00 UTC
[jira] [Updated] (AIRFLOW-5730) Enable get_pandas_df on PinotDbApiHook
[ https://issues.apache.org/jira/browse/AIRFLOW-5730?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Ash Berlin-Taylor updated AIRFLOW-5730:
---------------------------------------
Fix Version/s: (was: 2.0.0)
> Enable get_pandas_df on PinotDbApiHook
> --------------------------------------
>
> Key: AIRFLOW-5730
> URL: https://issues.apache.org/jira/browse/AIRFLOW-5730
> Project: Apache Airflow
> Issue Type: Improvement
> Components: hooks
> Affects Versions: 1.10.5
> Reporter: Kengo Seki
> Assignee: Kengo Seki
> Priority: Major
> Fix For: 1.10.7
>
>
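> Currently, DruidDbApiHook and PinotDbApiHook disable their {{get_pandas_df}} methods by overriding them to raise {{NotImplementedError}}.
> However, the implementation inherited from DbApiHook works for both hooks once the overrides are removed, as the following diff and examples show: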
> {code}
> $ git diff
> diff --git a/airflow/contrib/hooks/pinot_hook.py b/airflow/contrib/hooks/pinot_hook.py
> index e617f8e9b..0864b3584 100644
> --- a/airflow/contrib/hooks/pinot_hook.py
> +++ b/airflow/contrib/hooks/pinot_hook.py
> @@ -90,8 +90,5 @@ class PinotDbApiHook(DbApiHook):
>      def set_autocommit(self, conn, autocommit):
>          raise NotImplementedError()
>
> -    def get_pandas_df(self, sql, parameters=None):
> -        raise NotImplementedError()
> -
>      def insert_rows(self, table, rows, target_fields=None, commit_every=1000):
>          raise NotImplementedError()
> diff --git a/airflow/hooks/druid_hook.py b/airflow/hooks/druid_hook.py
> index c3cd3cd71..e2e20f1ec 100644
> --- a/airflow/hooks/druid_hook.py
> +++ b/airflow/hooks/druid_hook.py
> @@ -158,8 +158,5 @@ class DruidDbApiHook(DbApiHook):
>      def set_autocommit(self, conn, autocommit):
>          raise NotImplementedError()
>
> -    def get_pandas_df(self, sql, parameters=None):
> -        raise NotImplementedError()
> -
>      def insert_rows(self, table, rows, target_fields=None, commit_every=1000):
>          raise NotImplementedError()
> {code}
> {code:title=Druid example}
> $ airflow connections list
> (snip)
> ├────────────────────────────────┼─────────────────────────────┼───────────────────────────┼────────┼────────────────┼──────────────────────┼────────────────────────────────┤
> │ 'druid_broker_default' │ 'druid-broker' │ 'localhost' │ 8082 │ False │ True │ 'gAAAAABdrxvt...M1ideRO8233QG' │
> ╘════════════════════════════════╧═════════════════════════════╧═══════════════════════════╧════════╧════════════════╧══════════════════════╧════════════════════════════════╛
> $ ipython
> (snip)
> In [2]: from airflow.hooks.druid_hook import DruidDbApiHook
> In [3]: DruidDbApiHook().get_pandas_df("SELECT * FROM wikipedia WHERE sum_delta > %(num)d", {"num": 2000})
> [2019-10-23 23:28:18,606] {base_hook.py:89} INFO - Using connection to: id: druid_broker_default. Host: localhost, Port: 8082, Schema: None, Login: None, Password: None, extra: {'schema': 'http', 'endpoint': '/druid/v2/sql'}
> [2019-10-23 23:28:18,607] {druid_hook.py:140} INFO - Get the connection to druid broker on localhost using user None
> Out[3]:
> __time channel cityName comment ... sum_deleted sum_delta sum_metroCode user
> 0 2015-09-12T00:00:00.000Z #en.wikipedia Archiving case from [[Wikipedia:Sockpuppet inv... ... 0 3360 0 Bbb23
> 1 2015-09-12T00:00:00.000Z #ja.wikipedia [[Special:Contributions/119.224.209.170|119.22... ... 0 6853 0 Kkairri
> 2 2015-09-12T01:00:00.000Z #en.wikipedia /* Hong Kong */ ... 0 4500 0 Bertaut
> 3 2015-09-12T01:00:00.000Z #en.wikipedia Archiving 1 discussion(s) from [[User talk:New... ... 0 3599 0 Lowercase sigmabot III
> 4 2015-09-12T01:00:00.000Z #en.wikipedia [[WP:AES|←]]Created page with '{{Infobox wildf... ... 0 13335 0 Orygun
> .. ... ... ... ... ... ... ... ... ...
> 851 2015-09-12T23:00:00.000Z #pt.wikipedia Bem-vindo (usando [[WP:H|Huggle]]) (3.1.16) ... 0 2588 0 Mobyduck
> 852 2015-09-12T23:00:00.000Z #pt.wikipedia adição de informação, renovação de conteúdos e... ... 0 3666 0 Templarius 01
> 853 2015-09-12T23:00:00.000Z #ru.wikipedia [[ВП:←|←]] Новая страница: «{{редактирую|~~~~|... ... 0 6766 0 Dulamas
> 854 2015-09-12T23:00:00.000Z #ru.wikipedia Tver [[ВП:×|отмена]] правки 73302711 участника [[Sp... ... 0 9302 0 94.241.56.71
> 855 2015-09-12T23:00:00.000Z #sr.wikipedia Нова страница: [[Датотека:US Open.svg|десно|20... ... 0 38443 0 Самарџија
> [856 rows x 21 columns]
> {code}
> {code:title=Pinot example}
> $ airflow connections list
> (snip)
> ├────────────────────────────────┼─────────────────────────────┼───────────────────────────┼────────┼────────────────┼──────────────────────┼────────────────────────────────┤
> │ 'pinot_broker_default' │ 'pinot_broker_conn_id' │ 'localhost' │ 8000 │ False │ True │ 'gAAAAABdrxRj...Afd51PZY94nfa' │
> ├────────────────────────────────┼─────────────────────────────┼───────────────────────────┼────────┼────────────────┼──────────────────────┼────────────────────────────────┤
> $ ipython
> (snip)
> In [2]: from airflow.contrib.hooks.pinot_hook import PinotDbApiHook
> In [3]: PinotDbApiHook().get_pandas_df("select sum('runs') from baseballStats where yearID>=%(num)d group by playerName", {"num": 2000})
> [2019-10-23 23:31:06,058] {base_hook.py:89} INFO - Using connection to: id: pinot_broker_default. Host: localhost, Port: 8000, Schema: None, Login: None, Password: None, extra: {'endpoint': '/query', 'schema': 'http'}
> [2019-10-23 23:31:06,059] {pinot_hook.py:48} INFO - Get the connection to pinot broker on localhost
> select sum('runs') from baseballStats where yearID>=2000 group by playerName
> Out[3]:
> playerName sum_runs
> 0 Adrian 1820.00000
> 1 Jose Antonio 1692.00000
> 2 Rafael 1565.00000
> 3 Brian Michael 1500.00000
> 4 Jose Alberto 1426.00000
> 5 Alexander Emmanuel 1426.00000
> 6 Derek Sanderson 1390.00000
> 7 Carlos 1314.00000
> 8 Johnny David 1300.00000
> 9 Ichiro 1261.00000
> {code}
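
The effect of the proposed change can be sketched in isolation. The classes below are hypothetical stand-ins, not the real Airflow hooks: BaseHook plays the role of DbApiHook, run_query is an invented helper that returns canned rows, and get_pandas_df returns a list of dicts rather than a DataFrame so that the sketch runs without pandas or a broker. It only illustrates that deleting an override that raises NotImplementedError exposes the inherited method again.

```python
class BaseHook:
    """Hypothetical stand-in for DbApiHook (not the real Airflow class)."""

    def run_query(self, sql, parameters=None):
        # Invented helper: returns canned rows so the sketch runs offline.
        return [("Adrian", 1820.0), ("Rafael", 1565.0)]

    def get_pandas_df(self, sql, parameters=None):
        # Generic implementation shared by all subclasses. A list of dicts
        # stands in for a DataFrame so no pandas install is needed.
        columns = ("playerName", "sum_runs")
        return [dict(zip(columns, row)) for row in self.run_query(sql, parameters)]


class DisabledHook(BaseHook):
    """Mirrors the hooks before the patch: the override blocks the base method."""

    def get_pandas_df(self, sql, parameters=None):
        raise NotImplementedError()


class EnabledHook(BaseHook):
    """Mirrors the hooks after the patch: no override, so the base method is used."""


def try_get_df(hook):
    """Return the hook's result, or the string 'disabled' if it refuses."""
    try:
        return hook.get_pandas_df("select 1")
    except NotImplementedError:
        return "disabled"
```

Calling try_get_df(DisabledHook()) takes the NotImplementedError branch, while try_get_df(EnabledHook()) falls through to the base implementation and returns the canned rows.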
--
This message was sent by Atlassian Jira
(v8.3.4#803005)