Posted to commits@airflow.apache.org by bo...@apache.org on 2017/01/08 13:47:55 UTC
incubator-airflow git commit: [AIRFLOW-731] Fix period bug for NamedHivePartitionSensor
Repository: incubator-airflow
Updated Branches:
refs/heads/master 19ed9001b -> 43bf89da7
[AIRFLOW-731] Fix period bug for NamedHivePartitionSensor
Fix a bug in partition name parsing for the operator.
Closes #1973 from artwr/artwr-bugfix_for_named_partition_sensor_and_periods
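For context, a minimal standalone sketch of the parsing change applied in airflow/operators/sensors.py below (illustration only, not the full sensor class): splitting on only the first '.' and the first '/' keeps any periods inside the partition spec intact, whereas the old split('.') broke on them.

    def parse_partition_name(partition):
        # Split on the first '.' only, so the schema separator is isolated and
        # periods inside the partition values are preserved.
        schema, table_partition = partition.split('.', 1)
        # Split on the first '/' only, separating the table name from the
        # (possibly multi-level) partition spec.
        table, partition = table_partition.split('/', 1)
        return schema, table, partition

    # Before the fix, partition.split('.') returned four pieces for a name like
    # the one below, so unpacking into two variables raised ValueError.
    print(parse_partition_name("schema.table/part1=this.can.be.an.issue/part2=ok"))
    # -> ('schema', 'table', 'part1=this.can.be.an.issue/part2=ok')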
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/43bf89da
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/43bf89da
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/43bf89da
Branch: refs/heads/master
Commit: 43bf89da7bf6700fda9fdf3f64032a79e5fa76b4
Parents: 19ed900
Author: Arthur Wiedmer <ar...@gmail.com>
Authored: Sun Jan 8 14:47:16 2017 +0100
Committer: Bolke de Bruin <bo...@xs4all.nl>
Committed: Sun Jan 8 14:47:26 2017 +0100
----------------------------------------------------------------------
airflow/operators/sensors.py | 5 +-
tests/operators/hive_operator.py | 96 ++++++++++++++++++++++-------------
2 files changed, 64 insertions(+), 37 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/43bf89da/airflow/operators/sensors.py
----------------------------------------------------------------------
diff --git a/airflow/operators/sensors.py b/airflow/operators/sensors.py
index da01483..f5dd148 100644
--- a/airflow/operators/sensors.py
+++ b/airflow/operators/sensors.py
@@ -270,7 +270,7 @@ class NamedHivePartitionSensor(BaseSensorOperator):
self,
partition_names,
metastore_conn_id='metastore_default',
- poke_interval=60*3,
+ poke_interval=60 * 3,
*args,
**kwargs):
super(NamedHivePartitionSensor, self).__init__(
@@ -283,9 +283,10 @@ class NamedHivePartitionSensor(BaseSensorOperator):
self.partition_names = partition_names
self.next_poke_idx = 0
+ @classmethod
def parse_partition_name(self, partition):
try:
- schema, table_partition = partition.split('.')
+ schema, table_partition = partition.split('.', 1)
table, partition = table_partition.split('/', 1)
return schema, table, partition
except ValueError as e:
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/43bf89da/tests/operators/hive_operator.py
----------------------------------------------------------------------
diff --git a/tests/operators/hive_operator.py b/tests/operators/hive_operator.py
index 9f90999..fec5e69 100644
--- a/tests/operators/hive_operator.py
+++ b/tests/operators/hive_operator.py
@@ -21,7 +21,7 @@ import mock
import nose
import six
-from airflow import DAG, configuration, operators, utils
+from airflow import DAG, configuration, operators
configuration.load_test_config()
@@ -67,10 +67,12 @@ if 'AIRFLOW_RUNALL_TESTS' in os.environ:
hook = HiveServer2Hook()
hook.to_csv(hql=sql, csv_filepath="/tmp/test_to_csv")
- def connect_mock(host, port, auth_mechanism, kerberos_service_name, user, database):
+ def connect_mock(self, host, port,
+ auth_mechanism, kerberos_service_name,
+ user, database):
self.assertEqual(database, self.nondefault_schema)
- @patch('HiveServer2Hook.connect', return_value="foo")
+ @mock.patch('HiveServer2Hook.connect', return_value="foo")
def test_select_conn_with_schema(self, connect_mock):
from airflow.hooks.hive_hooks import HiveServer2Hook
@@ -94,15 +96,15 @@ if 'AIRFLOW_RUNALL_TESTS' in os.environ:
schema = "notdefault"
hook = HiveServer2Hook()
cursor_mock = MagicMock(
- __enter__ = cursor_mock,
- __exit__ = None,
- execute = None,
- fetchall = [],
+ __enter__=cursor_mock,
+ __exit__=None,
+ execute=None,
+ fetchall=[],
)
get_conn_mock = MagicMock(
- __enter__ = get_conn_mock,
- __exit__ = None,
- cursor = cursor_mock,
+ __enter__=get_conn_mock,
+ __exit__=None,
+ cursor=cursor_mock,
)
hook.get_conn = get_conn_mock
@@ -112,7 +114,7 @@ if 'AIRFLOW_RUNALL_TESTS' in os.environ:
# Verify
get_conn_mock.assert_called_with(self.nondefault_schema)
- @patch('HiveServer2Hook.get_results', return_value={data:[]})
+ @mock.patch('HiveServer2Hook.get_results', return_value={'data': []})
def test_get_records_with_schema(self, get_results_mock):
from airflow.hooks.hive_hooks import HiveServer2Hook
@@ -124,12 +126,12 @@ if 'AIRFLOW_RUNALL_TESTS' in os.environ:
hook.get_records(sql, self.nondefault_schema)
# Verify
- assert connect_mock.called
- (args, kwargs) = connect_mock.call_args_list[0]
+ assert self.connect_mock.called
+ (args, kwargs) = self.connect_mock.call_args_list[0]
assert args[0] == sql
assert kwargs['schema'] == self.nondefault_schema
- @patch('HiveServer2Hook.get_results', return_value={data:[]})
+ @mock.patch('HiveServer2Hook.get_results', return_value={'data': []})
def test_get_pandas_df_with_schema(self, get_results_mock):
from airflow.hooks.hive_hooks import HiveServer2Hook
@@ -138,11 +140,11 @@ if 'AIRFLOW_RUNALL_TESTS' in os.environ:
hook = HiveServer2Hook()
# Run
- hook.get_pandas_df(sql, schema)
+ hook.get_pandas_df(sql, self.nondefault_schema)
# Verify
- assert connect_mock.called
- (args, kwargs) = connect_mock.call_args_list[0]
+ assert self.connect_mock.called
+ (args, kwargs) = self.connect_mock.call_args_list[0]
assert args[0] == sql
assert kwargs['schema'] == self.nondefault_schema
@@ -172,7 +174,8 @@ if 'AIRFLOW_RUNALL_TESTS' in os.environ:
import airflow.operators.hive_operator
t = operators.hive_operator.HiveOperator(
task_id='basic_hql', hql=self.hql, dag=self.dag)
- t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
+ t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE,
+ ignore_ti_state=True)
def test_hive_queues(self):
import airflow.operators.hive_operator
@@ -181,8 +184,8 @@ if 'AIRFLOW_RUNALL_TESTS' in os.environ:
mapred_queue='default', mapred_queue_priority='HIGH',
mapred_job_name='airflow.test_hive_queues',
dag=self.dag)
- t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
-
+ t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE,
+ ignore_ti_state=True)
def test_hive_dryrun(self):
import airflow.operators.hive_operator
@@ -195,7 +198,8 @@ if 'AIRFLOW_RUNALL_TESTS' in os.environ:
t = operators.hive_operator.HiveOperator(
task_id='beeline_hql', hive_cli_conn_id='beeline_default',
hql=self.hql, dag=self.dag)
- t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
+ t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE,
+ ignore_ti_state=True)
def test_presto(self):
sql = """
@@ -204,7 +208,8 @@ if 'AIRFLOW_RUNALL_TESTS' in os.environ:
import airflow.operators.presto_check_operator
t = operators.presto_check_operator.PrestoCheckOperator(
task_id='presto_check', sql=sql, dag=self.dag)
- t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
+ t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE,
+ ignore_ti_state=True)
def test_presto_to_mysql(self):
import airflow.operators.presto_to_mysql
@@ -218,14 +223,16 @@ if 'AIRFLOW_RUNALL_TESTS' in os.environ:
mysql_table='test_static_babynames',
mysql_preoperator='TRUNCATE TABLE test_static_babynames;',
dag=self.dag)
- t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
+ t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE,
+ ignore_ti_state=True)
def test_hdfs_sensor(self):
t = operators.sensors.HdfsSensor(
task_id='hdfs_sensor_check',
filepath='hdfs://user/hive/warehouse/airflow.db/static_babynames',
dag=self.dag)
- t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
+ t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE,
+ ignore_ti_state=True)
def test_webhdfs_sensor(self):
t = operators.sensors.WebHdfsSensor(
@@ -233,7 +240,8 @@ if 'AIRFLOW_RUNALL_TESTS' in os.environ:
filepath='hdfs://user/hive/warehouse/airflow.db/static_babynames',
timeout=120,
dag=self.dag)
- t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
+ t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE,
+ ignore_ti_state=True)
def test_sql_sensor(self):
t = operators.sensors.SqlSensor(
@@ -241,7 +249,8 @@ if 'AIRFLOW_RUNALL_TESTS' in os.environ:
conn_id='presto_default',
sql="SELECT 'x' FROM airflow.static_babynames LIMIT 1;",
dag=self.dag)
- t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
+ t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE,
+ ignore_ti_state=True)
def test_hive_stats(self):
import airflow.operators.hive_stats_operator
@@ -250,14 +259,18 @@ if 'AIRFLOW_RUNALL_TESTS' in os.environ:
table="airflow.static_babynames_partitioned",
partition={'ds': DEFAULT_DATE_DS},
dag=self.dag)
- t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
+ t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE,
+ ignore_ti_state=True)
def test_named_hive_partition_sensor(self):
t = operators.sensors.NamedHivePartitionSensor(
task_id='hive_partition_check',
- partition_names=["airflow.static_babynames_partitioned/ds={{ds}}"],
+ partition_names=[
+ "airflow.static_babynames_partitioned/ds={{ds}}"
+ ],
dag=self.dag)
- t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
+ t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE,
+ ignore_ti_state=True)
def test_named_hive_partition_sensor_succeeds_on_multiple_partitions(self):
t = operators.sensors.NamedHivePartitionSensor(
@@ -267,7 +280,15 @@ if 'AIRFLOW_RUNALL_TESTS' in os.environ:
"airflow.static_babynames_partitioned/ds={{ds}}"
],
dag=self.dag)
- t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
+ t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE,
+ ignore_ti_state=True)
+
+ def test_named_hive_partition_sensor_parses_partitions_with_periods(self):
+ t = operators.sensors.NamedHivePartitionSensor.parse_partition_name(
+ partition="schema.table/part1=this.can.be.an.issue/part2=ok")
+ self.assertEqual(t[0], "schema")
+ self.assertEqual(t[1], "table")
+ self.assertEqual(t[2], "part1=this.can.be.an.issue/part2=ok")
@nose.tools.raises(airflow.exceptions.AirflowSensorTimeout)
def test_named_hive_partition_sensor_times_out_on_nonexistent_partition(self):
@@ -280,14 +301,16 @@ if 'AIRFLOW_RUNALL_TESTS' in os.environ:
poke_interval=0.1,
timeout=1,
dag=self.dag)
- t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
+ t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE,
+ ignore_ti_state=True)
def test_hive_partition_sensor(self):
t = operators.sensors.HivePartitionSensor(
task_id='hive_partition_check',
table='airflow.static_babynames_partitioned',
dag=self.dag)
- t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
+ t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE,
+ ignore_ti_state=True)
def test_hive_metastore_sql_sensor(self):
t = operators.sensors.MetastorePartitionSensor(
@@ -295,7 +318,8 @@ if 'AIRFLOW_RUNALL_TESTS' in os.environ:
table='airflow.static_babynames_partitioned',
partition_name='ds={}'.format(DEFAULT_DATE_DS),
dag=self.dag)
- t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
+ t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE,
+ ignore_ti_state=True)
def test_hive2samba(self):
import airflow.operators.hive_to_samba_operator
@@ -305,7 +329,8 @@ if 'AIRFLOW_RUNALL_TESTS' in os.environ:
hql="SELECT * FROM airflow.static_babynames LIMIT 10000",
destination_filepath='test_airflow.csv',
dag=self.dag)
- t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
+ t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE,
+ ignore_ti_state=True)
def test_hive_to_mysql(self):
import airflow.operators.hive_to_mysql
@@ -325,4 +350,5 @@ if 'AIRFLOW_RUNALL_TESTS' in os.environ:
],
dag=self.dag)
t.clear(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
- t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
+ t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE,
+ ignore_ti_state=True)