Posted to commits@airflow.apache.org by bo...@apache.org on 2017/01/08 13:47:55 UTC

incubator-airflow git commit: [AIRFLOW-731] Fix period bug for NamedHivePartitionSensor

Repository: incubator-airflow
Updated Branches:
  refs/heads/master 19ed9001b -> 43bf89da7


[AIRFLOW-731] Fix period bug for NamedHivePartitionSensor

Fix a bug in partition name parsing for the
NamedHivePartitionSensor: splitting on every period broke
partition names whose values contain periods.

Closes #1973 from artwr/artwr-bugfix_for_named_partition_sensor_and_periods
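
The root cause: partition.split('.') splits on every period, so a
partition value that itself contains periods yields more than two
fields and the tuple unpacking raises ValueError. Limiting the split
to the first '.' (and, as before, the first '/') keeps such values
intact. A minimal standalone sketch of the fixed parsing, not the
operator code itself:

    def parse_partition_name(partition):
        # Split the schema off on the first '.' only, so periods inside
        # partition values (e.g. ds=2017.01.08) survive untouched.
        schema, table_partition = partition.split('.', 1)
        # Split the table from the partition spec on the first '/' only.
        table, partition = table_partition.split('/', 1)
        return schema, table, partition

    name = "airflow.static_babynames_partitioned/ds=this.has.periods"
    print(parse_partition_name(name))
    # ('airflow', 'static_babynames_partitioned', 'ds=this.has.periods')
    # With the old partition.split('.') the same name produces four
    # fields and the two-element unpacking raises ValueError.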


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/43bf89da
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/43bf89da
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/43bf89da

Branch: refs/heads/master
Commit: 43bf89da7bf6700fda9fdf3f64032a79e5fa76b4
Parents: 19ed900
Author: Arthur Wiedmer <ar...@gmail.com>
Authored: Sun Jan 8 14:47:16 2017 +0100
Committer: Bolke de Bruin <bo...@xs4all.nl>
Committed: Sun Jan 8 14:47:26 2017 +0100

----------------------------------------------------------------------
 airflow/operators/sensors.py     |  5 +-
 tests/operators/hive_operator.py | 96 ++++++++++++++++++++++-------------
 2 files changed, 64 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/43bf89da/airflow/operators/sensors.py
----------------------------------------------------------------------
diff --git a/airflow/operators/sensors.py b/airflow/operators/sensors.py
index da01483..f5dd148 100644
--- a/airflow/operators/sensors.py
+++ b/airflow/operators/sensors.py
@@ -270,7 +270,7 @@ class NamedHivePartitionSensor(BaseSensorOperator):
             self,
             partition_names,
             metastore_conn_id='metastore_default',
-            poke_interval=60*3,
+            poke_interval=60 * 3,
             *args,
             **kwargs):
         super(NamedHivePartitionSensor, self).__init__(
@@ -283,9 +283,10 @@ class NamedHivePartitionSensor(BaseSensorOperator):
         self.partition_names = partition_names
         self.next_poke_idx = 0
 
+    @classmethod
     def parse_partition_name(self, partition):
         try:
-            schema, table_partition = partition.split('.')
+            schema, table_partition = partition.split('.', 1)
             table, partition = table_partition.split('/', 1)
             return schema, table, partition
         except ValueError as e:

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/43bf89da/tests/operators/hive_operator.py
----------------------------------------------------------------------
diff --git a/tests/operators/hive_operator.py b/tests/operators/hive_operator.py
index 9f90999..fec5e69 100644
--- a/tests/operators/hive_operator.py
+++ b/tests/operators/hive_operator.py
@@ -21,7 +21,7 @@ import mock
 import nose
 import six
 
-from airflow import DAG, configuration, operators, utils
+from airflow import DAG, configuration, operators
 configuration.load_test_config()
 
 
@@ -67,10 +67,12 @@ if 'AIRFLOW_RUNALL_TESTS' in os.environ:
             hook = HiveServer2Hook()
             hook.to_csv(hql=sql, csv_filepath="/tmp/test_to_csv")
 
-        def connect_mock(host, port, auth_mechanism, kerberos_service_name, user, database):
+        def connect_mock(self, host, port,
+                         auth_mechanism, kerberos_service_name,
+                         user, database):
             self.assertEqual(database, self.nondefault_schema)
 
-        @patch('HiveServer2Hook.connect', return_value="foo")
+        @mock.patch('HiveServer2Hook.connect', return_value="foo")
         def test_select_conn_with_schema(self, connect_mock):
             from airflow.hooks.hive_hooks import HiveServer2Hook
 
@@ -94,15 +96,15 @@ if 'AIRFLOW_RUNALL_TESTS' in os.environ:
             schema = "notdefault"
             hook = HiveServer2Hook()
             cursor_mock = MagicMock(
-                __enter__ = cursor_mock,
-                __exit__ = None,
-                execute = None,
-                fetchall = [],
+                __enter__=cursor_mock,
+                __exit__=None,
+                execute=None,
+                fetchall=[],
             )
             get_conn_mock = MagicMock(
-                __enter__ = get_conn_mock,
-                __exit__ = None,
-                cursor = cursor_mock,
+                __enter__=get_conn_mock,
+                __exit__=None,
+                cursor=cursor_mock,
             )
             hook.get_conn = get_conn_mock
 
@@ -112,7 +114,7 @@ if 'AIRFLOW_RUNALL_TESTS' in os.environ:
             # Verify
             get_conn_mock.assert_called_with(self.nondefault_schema)
 
-        @patch('HiveServer2Hook.get_results', return_value={data:[]})
+        @mock.patch('HiveServer2Hook.get_results', return_value={'data': []})
         def test_get_records_with_schema(self, get_results_mock):
             from airflow.hooks.hive_hooks import HiveServer2Hook
 
@@ -124,12 +126,12 @@ if 'AIRFLOW_RUNALL_TESTS' in os.environ:
             hook.get_records(sql, self.nondefault_schema)
 
             # Verify
-            assert connect_mock.called
-            (args, kwargs) = connect_mock.call_args_list[0]
+            assert self.connect_mock.called
+            (args, kwargs) = self.connect_mock.call_args_list[0]
             assert args[0] == sql
             assert kwargs['schema'] == self.nondefault_schema
 
-        @patch('HiveServer2Hook.get_results', return_value={data:[]})
+        @mock.patch('HiveServer2Hook.get_results', return_value={'data': []})
         def test_get_pandas_df_with_schema(self, get_results_mock):
             from airflow.hooks.hive_hooks import HiveServer2Hook
 
@@ -138,11 +140,11 @@ if 'AIRFLOW_RUNALL_TESTS' in os.environ:
             hook = HiveServer2Hook()
 
             # Run
-            hook.get_pandas_df(sql, schema)
+            hook.get_pandas_df(sql, self.nondefault_schema)
 
             # Verify
-            assert connect_mock.called
-            (args, kwargs) = connect_mock.call_args_list[0]
+            assert self.connect_mock.called
+            (args, kwargs) = self.connect_mock.call_args_list[0]
             assert args[0] == sql
             assert kwargs['schema'] == self.nondefault_schema
 
@@ -172,7 +174,8 @@ if 'AIRFLOW_RUNALL_TESTS' in os.environ:
             import airflow.operators.hive_operator
             t = operators.hive_operator.HiveOperator(
                 task_id='basic_hql', hql=self.hql, dag=self.dag)
-            t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
+            t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE,
+                  ignore_ti_state=True)
 
         def test_hive_queues(self):
             import airflow.operators.hive_operator
@@ -181,8 +184,8 @@ if 'AIRFLOW_RUNALL_TESTS' in os.environ:
                 mapred_queue='default', mapred_queue_priority='HIGH',
                 mapred_job_name='airflow.test_hive_queues',
                 dag=self.dag)
-            t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
-
+            t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE,
+                  ignore_ti_state=True)
 
         def test_hive_dryrun(self):
             import airflow.operators.hive_operator
@@ -195,7 +198,8 @@ if 'AIRFLOW_RUNALL_TESTS' in os.environ:
             t = operators.hive_operator.HiveOperator(
                 task_id='beeline_hql', hive_cli_conn_id='beeline_default',
                 hql=self.hql, dag=self.dag)
-            t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
+            t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE,
+                  ignore_ti_state=True)
 
         def test_presto(self):
             sql = """
@@ -204,7 +208,8 @@ if 'AIRFLOW_RUNALL_TESTS' in os.environ:
             import airflow.operators.presto_check_operator
             t = operators.presto_check_operator.PrestoCheckOperator(
                 task_id='presto_check', sql=sql, dag=self.dag)
-            t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
+            t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE,
+                  ignore_ti_state=True)
 
         def test_presto_to_mysql(self):
             import airflow.operators.presto_to_mysql
@@ -218,14 +223,16 @@ if 'AIRFLOW_RUNALL_TESTS' in os.environ:
                 mysql_table='test_static_babynames',
                 mysql_preoperator='TRUNCATE TABLE test_static_babynames;',
                 dag=self.dag)
-            t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
+            t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE,
+                  ignore_ti_state=True)
 
         def test_hdfs_sensor(self):
             t = operators.sensors.HdfsSensor(
                 task_id='hdfs_sensor_check',
                 filepath='hdfs://user/hive/warehouse/airflow.db/static_babynames',
                 dag=self.dag)
-            t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
+            t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE,
+                  ignore_ti_state=True)
 
         def test_webhdfs_sensor(self):
             t = operators.sensors.WebHdfsSensor(
@@ -233,7 +240,8 @@ if 'AIRFLOW_RUNALL_TESTS' in os.environ:
                 filepath='hdfs://user/hive/warehouse/airflow.db/static_babynames',
                 timeout=120,
                 dag=self.dag)
-            t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
+            t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE,
+                  ignore_ti_state=True)
 
         def test_sql_sensor(self):
             t = operators.sensors.SqlSensor(
@@ -241,7 +249,8 @@ if 'AIRFLOW_RUNALL_TESTS' in os.environ:
                 conn_id='presto_default',
                 sql="SELECT 'x' FROM airflow.static_babynames LIMIT 1;",
                 dag=self.dag)
-            t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
+            t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE,
+                  ignore_ti_state=True)
 
         def test_hive_stats(self):
             import airflow.operators.hive_stats_operator
@@ -250,14 +259,18 @@ if 'AIRFLOW_RUNALL_TESTS' in os.environ:
                 table="airflow.static_babynames_partitioned",
                 partition={'ds': DEFAULT_DATE_DS},
                 dag=self.dag)
-            t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
+            t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE,
+                  ignore_ti_state=True)
 
         def test_named_hive_partition_sensor(self):
             t = operators.sensors.NamedHivePartitionSensor(
                 task_id='hive_partition_check',
-                partition_names=["airflow.static_babynames_partitioned/ds={{ds}}"],
+                partition_names=[
+                    "airflow.static_babynames_partitioned/ds={{ds}}"
+                ],
                 dag=self.dag)
-            t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
+            t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE,
+                  ignore_ti_state=True)
 
         def test_named_hive_partition_sensor_succeeds_on_multiple_partitions(self):
             t = operators.sensors.NamedHivePartitionSensor(
@@ -267,7 +280,15 @@ if 'AIRFLOW_RUNALL_TESTS' in os.environ:
                     "airflow.static_babynames_partitioned/ds={{ds}}"
                 ],
                 dag=self.dag)
-            t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
+            t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE,
+                  ignore_ti_state=True)
+
+        def test_named_hive_partition_sensor_parses_partitions_with_periods(self):
+            t = operators.sensors.NamedHivePartitionSensor.parse_partition_name(
+                partition="schema.table/part1=this.can.be.an.issue/part2=ok")
+            self.assertEqual(t[0], "schema")
+            self.assertEqual(t[1], "table")
+            self.assertEqual(t[2], "part1=this.can.be.an.issue/part2=ok")
 
         @nose.tools.raises(airflow.exceptions.AirflowSensorTimeout)
         def test_named_hive_partition_sensor_times_out_on_nonexistent_partition(self):
@@ -280,14 +301,16 @@ if 'AIRFLOW_RUNALL_TESTS' in os.environ:
                 poke_interval=0.1,
                 timeout=1,
                 dag=self.dag)
-            t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
+            t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE,
+                  ignore_ti_state=True)
 
         def test_hive_partition_sensor(self):
             t = operators.sensors.HivePartitionSensor(
                 task_id='hive_partition_check',
                 table='airflow.static_babynames_partitioned',
                 dag=self.dag)
-            t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
+            t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE,
+                  ignore_ti_state=True)
 
         def test_hive_metastore_sql_sensor(self):
             t = operators.sensors.MetastorePartitionSensor(
@@ -295,7 +318,8 @@ if 'AIRFLOW_RUNALL_TESTS' in os.environ:
                 table='airflow.static_babynames_partitioned',
                 partition_name='ds={}'.format(DEFAULT_DATE_DS),
                 dag=self.dag)
-            t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
+            t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE,
+                  ignore_ti_state=True)
 
         def test_hive2samba(self):
             import airflow.operators.hive_to_samba_operator
@@ -305,7 +329,8 @@ if 'AIRFLOW_RUNALL_TESTS' in os.environ:
                 hql="SELECT * FROM airflow.static_babynames LIMIT 10000",
                 destination_filepath='test_airflow.csv',
                 dag=self.dag)
-            t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
+            t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE,
+                  ignore_ti_state=True)
 
         def test_hive_to_mysql(self):
             import airflow.operators.hive_to_mysql
@@ -325,4 +350,5 @@ if 'AIRFLOW_RUNALL_TESTS' in os.environ:
                 ],
                 dag=self.dag)
             t.clear(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
-            t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
+            t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE,
+                  ignore_ti_state=True)