You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by bo...@apache.org on 2017/01/10 08:15:03 UTC

[1/9] incubator-airflow git commit: [AIRFLOW-731] Fix period bug for NamedHivePartitionSensor

Repository: incubator-airflow
Updated Branches:
  refs/heads/v1-8-test 0611f5c58 -> ba490d27d


[AIRFLOW-731] Fix period bug for NamedHivePartitionSensor

Fix a bug in partition name parsing for the
operator.

Closes #1973 from artwr/artwr-
bugfix_for_named_partition_sensor_and_periods


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/43bf89da
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/43bf89da
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/43bf89da

Branch: refs/heads/v1-8-test
Commit: 43bf89da7bf6700fda9fdf3f64032a79e5fa76b4
Parents: 19ed900
Author: Arthur Wiedmer <ar...@gmail.com>
Authored: Sun Jan 8 14:47:16 2017 +0100
Committer: Bolke de Bruin <bo...@xs4all.nl>
Committed: Sun Jan 8 14:47:26 2017 +0100

----------------------------------------------------------------------
 airflow/operators/sensors.py     |  5 +-
 tests/operators/hive_operator.py | 96 ++++++++++++++++++++++-------------
 2 files changed, 64 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/43bf89da/airflow/operators/sensors.py
----------------------------------------------------------------------
diff --git a/airflow/operators/sensors.py b/airflow/operators/sensors.py
index da01483..f5dd148 100644
--- a/airflow/operators/sensors.py
+++ b/airflow/operators/sensors.py
@@ -270,7 +270,7 @@ class NamedHivePartitionSensor(BaseSensorOperator):
             self,
             partition_names,
             metastore_conn_id='metastore_default',
-            poke_interval=60*3,
+            poke_interval=60 * 3,
             *args,
             **kwargs):
         super(NamedHivePartitionSensor, self).__init__(
@@ -283,9 +283,10 @@ class NamedHivePartitionSensor(BaseSensorOperator):
         self.partition_names = partition_names
         self.next_poke_idx = 0
 
+    @classmethod
     def parse_partition_name(self, partition):
         try:
-            schema, table_partition = partition.split('.')
+            schema, table_partition = partition.split('.', 1)
             table, partition = table_partition.split('/', 1)
             return schema, table, partition
         except ValueError as e:

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/43bf89da/tests/operators/hive_operator.py
----------------------------------------------------------------------
diff --git a/tests/operators/hive_operator.py b/tests/operators/hive_operator.py
index 9f90999..fec5e69 100644
--- a/tests/operators/hive_operator.py
+++ b/tests/operators/hive_operator.py
@@ -21,7 +21,7 @@ import mock
 import nose
 import six
 
-from airflow import DAG, configuration, operators, utils
+from airflow import DAG, configuration, operators
 configuration.load_test_config()
 
 
@@ -67,10 +67,12 @@ if 'AIRFLOW_RUNALL_TESTS' in os.environ:
             hook = HiveServer2Hook()
             hook.to_csv(hql=sql, csv_filepath="/tmp/test_to_csv")
 
-        def connect_mock(host, port, auth_mechanism, kerberos_service_name, user, database):
+        def connect_mock(self, host, port,
+                         auth_mechanism, kerberos_service_name,
+                         user, database):
             self.assertEqual(database, self.nondefault_schema)
 
-        @patch('HiveServer2Hook.connect', return_value="foo")
+        @mock.patch('HiveServer2Hook.connect', return_value="foo")
         def test_select_conn_with_schema(self, connect_mock):
             from airflow.hooks.hive_hooks import HiveServer2Hook
 
@@ -94,15 +96,15 @@ if 'AIRFLOW_RUNALL_TESTS' in os.environ:
             schema = "notdefault"
             hook = HiveServer2Hook()
             cursor_mock = MagicMock(
-                __enter__ = cursor_mock,
-                __exit__ = None,
-                execute = None,
-                fetchall = [],
+                __enter__=cursor_mock,
+                __exit__=None,
+                execute=None,
+                fetchall=[],
             )
             get_conn_mock = MagicMock(
-                __enter__ = get_conn_mock,
-                __exit__ = None,
-                cursor = cursor_mock,
+                __enter__=get_conn_mock,
+                __exit__=None,
+                cursor=cursor_mock,
             )
             hook.get_conn = get_conn_mock
 
@@ -112,7 +114,7 @@ if 'AIRFLOW_RUNALL_TESTS' in os.environ:
             # Verify
             get_conn_mock.assert_called_with(self.nondefault_schema)
 
-        @patch('HiveServer2Hook.get_results', return_value={data:[]})
+        @mock.patch('HiveServer2Hook.get_results', return_value={'data': []})
         def test_get_records_with_schema(self, get_results_mock):
             from airflow.hooks.hive_hooks import HiveServer2Hook
 
@@ -124,12 +126,12 @@ if 'AIRFLOW_RUNALL_TESTS' in os.environ:
             hook.get_records(sql, self.nondefault_schema)
 
             # Verify
-            assert connect_mock.called
-            (args, kwargs) = connect_mock.call_args_list[0]
+            assert self.connect_mock.called
+            (args, kwargs) = self.connect_mock.call_args_list[0]
             assert args[0] == sql
             assert kwargs['schema'] == self.nondefault_schema
 
-        @patch('HiveServer2Hook.get_results', return_value={data:[]})
+        @mock.patch('HiveServer2Hook.get_results', return_value={'data': []})
         def test_get_pandas_df_with_schema(self, get_results_mock):
             from airflow.hooks.hive_hooks import HiveServer2Hook
 
@@ -138,11 +140,11 @@ if 'AIRFLOW_RUNALL_TESTS' in os.environ:
             hook = HiveServer2Hook()
 
             # Run
-            hook.get_pandas_df(sql, schema)
+            hook.get_pandas_df(sql, self.nondefault_schema)
 
             # Verify
-            assert connect_mock.called
-            (args, kwargs) = connect_mock.call_args_list[0]
+            assert self.connect_mock.called
+            (args, kwargs) = self.connect_mock.call_args_list[0]
             assert args[0] == sql
             assert kwargs['schema'] == self.nondefault_schema
 
@@ -172,7 +174,8 @@ if 'AIRFLOW_RUNALL_TESTS' in os.environ:
             import airflow.operators.hive_operator
             t = operators.hive_operator.HiveOperator(
                 task_id='basic_hql', hql=self.hql, dag=self.dag)
-            t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
+            t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE,
+                  ignore_ti_state=True)
 
         def test_hive_queues(self):
             import airflow.operators.hive_operator
@@ -181,8 +184,8 @@ if 'AIRFLOW_RUNALL_TESTS' in os.environ:
                 mapred_queue='default', mapred_queue_priority='HIGH',
                 mapred_job_name='airflow.test_hive_queues',
                 dag=self.dag)
-            t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
-
+            t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE,
+                  ignore_ti_state=True)
 
         def test_hive_dryrun(self):
             import airflow.operators.hive_operator
@@ -195,7 +198,8 @@ if 'AIRFLOW_RUNALL_TESTS' in os.environ:
             t = operators.hive_operator.HiveOperator(
                 task_id='beeline_hql', hive_cli_conn_id='beeline_default',
                 hql=self.hql, dag=self.dag)
-            t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
+            t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE,
+                  ignore_ti_state=True)
 
         def test_presto(self):
             sql = """
@@ -204,7 +208,8 @@ if 'AIRFLOW_RUNALL_TESTS' in os.environ:
             import airflow.operators.presto_check_operator
             t = operators.presto_check_operator.PrestoCheckOperator(
                 task_id='presto_check', sql=sql, dag=self.dag)
-            t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
+            t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE,
+                  ignore_ti_state=True)
 
         def test_presto_to_mysql(self):
             import airflow.operators.presto_to_mysql
@@ -218,14 +223,16 @@ if 'AIRFLOW_RUNALL_TESTS' in os.environ:
                 mysql_table='test_static_babynames',
                 mysql_preoperator='TRUNCATE TABLE test_static_babynames;',
                 dag=self.dag)
-            t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
+            t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE,
+                  ignore_ti_state=True)
 
         def test_hdfs_sensor(self):
             t = operators.sensors.HdfsSensor(
                 task_id='hdfs_sensor_check',
                 filepath='hdfs://user/hive/warehouse/airflow.db/static_babynames',
                 dag=self.dag)
-            t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
+            t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE,
+                  ignore_ti_state=True)
 
         def test_webhdfs_sensor(self):
             t = operators.sensors.WebHdfsSensor(
@@ -233,7 +240,8 @@ if 'AIRFLOW_RUNALL_TESTS' in os.environ:
                 filepath='hdfs://user/hive/warehouse/airflow.db/static_babynames',
                 timeout=120,
                 dag=self.dag)
-            t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
+            t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE,
+                  ignore_ti_state=True)
 
         def test_sql_sensor(self):
             t = operators.sensors.SqlSensor(
@@ -241,7 +249,8 @@ if 'AIRFLOW_RUNALL_TESTS' in os.environ:
                 conn_id='presto_default',
                 sql="SELECT 'x' FROM airflow.static_babynames LIMIT 1;",
                 dag=self.dag)
-            t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
+            t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE,
+                  ignore_ti_state=True)
 
         def test_hive_stats(self):
             import airflow.operators.hive_stats_operator
@@ -250,14 +259,18 @@ if 'AIRFLOW_RUNALL_TESTS' in os.environ:
                 table="airflow.static_babynames_partitioned",
                 partition={'ds': DEFAULT_DATE_DS},
                 dag=self.dag)
-            t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
+            t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE,
+                  ignore_ti_state=True)
 
         def test_named_hive_partition_sensor(self):
             t = operators.sensors.NamedHivePartitionSensor(
                 task_id='hive_partition_check',
-                partition_names=["airflow.static_babynames_partitioned/ds={{ds}}"],
+                partition_names=[
+                    "airflow.static_babynames_partitioned/ds={{ds}}"
+                ],
                 dag=self.dag)
-            t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
+            t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE,
+                  ignore_ti_state=True)
 
         def test_named_hive_partition_sensor_succeeds_on_multiple_partitions(self):
             t = operators.sensors.NamedHivePartitionSensor(
@@ -267,7 +280,15 @@ if 'AIRFLOW_RUNALL_TESTS' in os.environ:
                     "airflow.static_babynames_partitioned/ds={{ds}}"
                 ],
                 dag=self.dag)
-            t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
+            t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE,
+                  ignore_ti_state=True)
+
+        def test_named_hive_partition_sensor_parses_partitions_with_periods(self):
+            t = operators.sensors.NamedHivePartitionSensor.parse_partition_name(
+                partition="schema.table/part1=this.can.be.an.issue/part2=ok")
+            self.assertEqual(t[0], "schema")
+            self.assertEqual(t[1], "table")
+            self.assertEqual(t[2], "part1=this.can.be.an.issue/part2=this_should_be_ok")
 
         @nose.tools.raises(airflow.exceptions.AirflowSensorTimeout)
         def test_named_hive_partition_sensor_times_out_on_nonexistent_partition(self):
@@ -280,14 +301,16 @@ if 'AIRFLOW_RUNALL_TESTS' in os.environ:
                 poke_interval=0.1,
                 timeout=1,
                 dag=self.dag)
-            t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
+            t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE,
+                  ignore_ti_state=True)
 
         def test_hive_partition_sensor(self):
             t = operators.sensors.HivePartitionSensor(
                 task_id='hive_partition_check',
                 table='airflow.static_babynames_partitioned',
                 dag=self.dag)
-            t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
+            t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE,
+                  ignore_ti_state=True)
 
         def test_hive_metastore_sql_sensor(self):
             t = operators.sensors.MetastorePartitionSensor(
@@ -295,7 +318,8 @@ if 'AIRFLOW_RUNALL_TESTS' in os.environ:
                 table='airflow.static_babynames_partitioned',
                 partition_name='ds={}'.format(DEFAULT_DATE_DS),
                 dag=self.dag)
-            t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
+            t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE,
+                  ignore_ti_state=True)
 
         def test_hive2samba(self):
             import airflow.operators.hive_to_samba_operator
@@ -305,7 +329,8 @@ if 'AIRFLOW_RUNALL_TESTS' in os.environ:
                 hql="SELECT * FROM airflow.static_babynames LIMIT 10000",
                 destination_filepath='test_airflow.csv',
                 dag=self.dag)
-            t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
+            t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE,
+                  ignore_ti_state=True)
 
         def test_hive_to_mysql(self):
             import airflow.operators.hive_to_mysql
@@ -325,4 +350,5 @@ if 'AIRFLOW_RUNALL_TESTS' in os.environ:
                 ],
                 dag=self.dag)
             t.clear(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
-            t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
+            t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE,
+                  ignore_ti_state=True)


[9/9] incubator-airflow git commit: Merge branch 'master' into v1-8-test

Posted by bo...@apache.org.
Merge branch 'master' into v1-8-test


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/ba490d27
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/ba490d27
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/ba490d27

Branch: refs/heads/v1-8-test
Commit: ba490d27daa841369a02b8db33937f4397bdbee2
Parents: 0611f5c 2d54d8a
Author: Bolke de Bruin <bo...@xs4all.nl>
Authored: Tue Jan 10 09:14:09 2017 +0100
Committer: Bolke de Bruin <bo...@xs4all.nl>
Committed: Tue Jan 10 09:14:09 2017 +0100

----------------------------------------------------------------------
 UPDATING.md                                    |   8 +
 airflow/contrib/hooks/bigquery_hook.py         |  27 +++
 airflow/contrib/operators/dataproc_operator.py | 253 ++++++++++++++++++++
 airflow/contrib/sensors/bigquery_sensor.py     |  69 ++++++
 airflow/models.py                              |   2 +
 airflow/operators/sensors.py                   |   5 +-
 airflow/www/app.py                             |   6 +-
 tests/operators/hive_operator.py               |  96 +++++---
 8 files changed, 426 insertions(+), 40 deletions(-)
----------------------------------------------------------------------



[5/9] incubator-airflow git commit: [AIRFLOW-738] Commit deleted xcom items before insert

Posted by bo...@apache.org.
[AIRFLOW-738] Commit deleted xcom items before insert

A delete insert sequence within one transaction can lead
to a deadlocked transaction with Mariadb / MySQL.

The deletes, in case they affected no rows, all get a shared lock
(mode IX) on the end-of-table gap. Once the insert is executed,
the shared lock is still held by all threads,
and the insert intention waits for the release of this shared lock.

The solution is to not do the following in parallel:

1. Delete the rows you want to insert, when the rows aren't there.
2. Insert the rows

In this case the risk of not executing the delete and insert
is relatively low, as it was the users intention to run the
task. In case it fails in between the two transactions
the task can be tried.


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/e18d67de
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/e18d67de
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/e18d67de

Branch: refs/heads/v1-8-test
Commit: e18d67dec4774946a35f7c34953bdfd7138595bf
Parents: 19ed900
Author: Bolke de Bruin <bo...@xs4all.nl>
Authored: Mon Jan 9 22:04:35 2017 +0100
Committer: Bolke de Bruin <bo...@xs4all.nl>
Committed: Mon Jan 9 22:10:42 2017 +0100

----------------------------------------------------------------------
 airflow/models.py | 2 ++
 1 file changed, 2 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/e18d67de/airflow/models.py
----------------------------------------------------------------------
diff --git a/airflow/models.py b/airflow/models.py
index f6f7968..1a0919a 100755
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -3515,6 +3515,8 @@ class XCom(Base):
             cls.task_id == task_id,
             cls.dag_id == dag_id).delete()
 
+        session.commit()
+
         # insert new XCom
         session.add(XCom(
             key=key,


[7/9] incubator-airflow git commit: Update upgrade documentation for Google Cloud

Posted by bo...@apache.org.
Update upgrade documentation for Google Cloud

Closes #1979 from alexvanboxel/pr/doc_gcloud


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/7e691d3f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/7e691d3f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/7e691d3f

Branch: refs/heads/v1-8-test
Commit: 7e691d3f6040e975f68d6d7ce5d368ad84477dc1
Parents: ffbe728
Author: Alex Van Boxel <al...@vanboxel.be>
Authored: Tue Jan 10 09:03:37 2017 +0100
Committer: Bolke de Bruin <bo...@xs4all.nl>
Committed: Tue Jan 10 09:03:44 2017 +0100

----------------------------------------------------------------------
 UPDATING.md | 8 ++++++++
 1 file changed, 8 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/7e691d3f/UPDATING.md
----------------------------------------------------------------------
diff --git a/UPDATING.md b/UPDATING.md
index 917215e..e23fd57 100644
--- a/UPDATING.md
+++ b/UPDATING.md
@@ -17,6 +17,14 @@ Previously, new DAGs would be scheduled immediately. To retain the old behavior,
 dags_are_paused_at_creation = False
 ```
 
+#### Google Cloud Operator and Hook alignment
+
+All Google Cloud Operators and Hooks are aligned and use the same client library. Now you have a single connection type for all kinds of Google Cloud Operators.
+
+If you experience problems connecting with your operator make sure you set the connection type "Google Cloud Platform".
+
+Also the old P12 key file type is not supported anymore and only the new JSON key files are supported as a service account.
+
 ### Deprecated Features
 These features are marked for deprecation. They may still work (and raise a `DeprecationWarning`), but are no longer supported and will be removed entirely in Airflow 2.0
 


[8/9] incubator-airflow git commit: Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/incubator-airflow

Posted by bo...@apache.org.
Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/incubator-airflow


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/2d54d8ab
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/2d54d8ab
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/2d54d8ab

Branch: refs/heads/v1-8-test
Commit: 2d54d8abd38060d65f8ed5c0eb621c4c6c90060b
Parents: bb88378 7e691d3
Author: Bolke de Bruin <bo...@xs4all.nl>
Authored: Tue Jan 10 09:05:55 2017 +0100
Committer: Bolke de Bruin <bo...@xs4all.nl>
Committed: Tue Jan 10 09:05:55 2017 +0100

----------------------------------------------------------------------
 UPDATING.md | 8 ++++++++
 1 file changed, 8 insertions(+)
----------------------------------------------------------------------



[2/9] incubator-airflow git commit: [AIRFLOW-741] Log to debug instead of info for app.py

Posted by bo...@apache.org.
[AIRFLOW-741] Log to debug instead of info for app.py

Closes #1977 from bolkedebruin/AIRFLOW-741


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/e010cb29
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/e010cb29
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/e010cb29

Branch: refs/heads/v1-8-test
Commit: e010cb29ba4785832b58bd72be6c00e6abf24d16
Parents: 43bf89d
Author: Bolke de Bruin <bo...@xs4all.nl>
Authored: Mon Jan 9 10:33:45 2017 +0100
Committer: Alex Van Boxel <al...@vanboxel.be>
Committed: Mon Jan 9 10:33:45 2017 +0100

----------------------------------------------------------------------
 airflow/www/app.py | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/e010cb29/airflow/www/app.py
----------------------------------------------------------------------
diff --git a/airflow/www/app.py b/airflow/www/app.py
index c2c180a..800a31c 100644
--- a/airflow/www/app.py
+++ b/airflow/www/app.py
@@ -117,13 +117,13 @@ def create_app(config=None, testing=False):
             from airflow.plugins_manager import (
                 admin_views, flask_blueprints, menu_links)
             for v in admin_views:
-                logging.info('Adding view ' + v.name)
+                logging.debug('Adding view ' + v.name)
                 admin.add_view(v)
             for bp in flask_blueprints:
-                logging.info('Adding blueprint ' + bp.name)
+                logging.debug('Adding blueprint ' + bp.name)
                 app.register_blueprint(bp)
             for ml in sorted(menu_links, key=lambda x: x.name):
-                logging.info('Adding menu link ' + ml.name)
+                logging.debug('Adding menu link ' + ml.name)
                 admin.add_link(ml)
 
         integrate_plugins()


[6/9] incubator-airflow git commit: Merge branch 'AIRFLOW-738'

Posted by bo...@apache.org.
Merge branch 'AIRFLOW-738'


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/bb883785
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/bb883785
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/bb883785

Branch: refs/heads/v1-8-test
Commit: bb883785077f10784255107cbf8c6d5f7b33de15
Parents: ffbe728 e18d67d
Author: Bolke de Bruin <bo...@xs4all.nl>
Authored: Tue Jan 10 09:02:41 2017 +0100
Committer: Bolke de Bruin <bo...@xs4all.nl>
Committed: Tue Jan 10 09:02:41 2017 +0100

----------------------------------------------------------------------
 airflow/models.py | 2 ++
 1 file changed, 2 insertions(+)
----------------------------------------------------------------------



[4/9] incubator-airflow git commit: [AIRFLOW-729] Add Google Cloud Dataproc cluster creation operator

Posted by bo...@apache.org.
[AIRFLOW-729] Add Google Cloud Dataproc cluster creation operator

The operator checks if there is already a cluster
running with the provided name in the provided
project.
If so, the operator finishes successfully.
Otherwise, the operator issues a rest API call to
initiate
the cluster creation and waits until the creation
is successful before exiting.

Closes #1971 from
bodschut/feature/dataproc_operator


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/ffbe7282
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/ffbe7282
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/ffbe7282

Branch: refs/heads/v1-8-test
Commit: ffbe7282dcff5d5dd1c23ab0eff27dab2bd457f6
Parents: 617ba74
Author: Bob De Schutter <de...@gmail.com>
Authored: Mon Jan 9 21:49:06 2017 +0100
Committer: Alex Van Boxel <al...@vanboxel.be>
Committed: Mon Jan 9 21:49:06 2017 +0100

----------------------------------------------------------------------
 airflow/contrib/operators/dataproc_operator.py | 253 ++++++++++++++++++++
 1 file changed, 253 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/ffbe7282/airflow/contrib/operators/dataproc_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/dataproc_operator.py b/airflow/contrib/operators/dataproc_operator.py
index a3df381..9cf2bbe 100644
--- a/airflow/contrib/operators/dataproc_operator.py
+++ b/airflow/contrib/operators/dataproc_operator.py
@@ -12,9 +12,262 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
+
+import logging
+import time
+
 from airflow.contrib.hooks.gcp_dataproc_hook import DataProcHook
 from airflow.models import BaseOperator
 from airflow.utils.decorators import apply_defaults
+from googleapiclient.errors import HttpError
+
+
+class DataprocClusterCreateOperator(BaseOperator):
+    """
+    Create a new cluster on Google Cloud Dataproc. The operator will wait until the
+    creation is successful or an error occurs in the creation process.
+
+    The parameters allow to configure the cluster. Please refer to
+
+    https://cloud.google.com/dataproc/docs/reference/rest/v1/projects.regions.clusters
+
+    for a detailed explanation on the different parameters. Most of the configuration
+    parameters detailed in the link are available as a parameter to this operator.
+    """
+
+    template_fields = ['cluster_name',]
+
+    @apply_defaults
+    def __init__(self,
+                 cluster_name,
+                 project_id,
+                 num_workers,
+                 zone,
+                 storage_bucket=None,
+                 init_actions_uris=None,
+                 metadata=None,
+                 properties=None,
+                 master_machine_type='n1-standard-4',
+                 master_disk_size=500,
+                 worker_machine_type='n1-standard-4',
+                 worker_disk_size=500,
+                 num_preemptible_workers=0,
+                 labels=None,
+                 region='global',
+                 google_cloud_conn_id='google_cloud_default',
+                 delegate_to=None,
+                 *args,
+                 **kwargs):
+        """
+        Create a new DataprocClusterCreateOperator.
+
+        For more info on the creation of a cluster through the API, have a look at:
+
+        https://cloud.google.com/dataproc/docs/reference/rest/v1/projects.regions.clusters
+
+        :param cluster_name: The name of the cluster to create
+        :type cluster_name: string
+        :param project_id: The ID of the google cloud project in which
+            to create the cluster
+        :type project_id: string
+        :param num_workers: The # of workers to spin up
+        :type num_workers: int
+        :param storage_bucket: The storage bucket to use, setting to None lets dataproc
+            generate a custom one for you
+        :type storage_bucket: string
+        :param init_actions_uris: List of GCS uri's containing
+            dataproc initialization scripts
+        :type init_actions_uris: list[string]
+        :param metadata: dict of key-value google compute engine metadata entries
+            to add to all instances
+        :type metadata: dict
+        :param properties: dict of properties to set on
+            config files (e.g. spark-defaults.conf), see
+            https://cloud.google.com/dataproc/docs/reference/rest/v1/ \
+            projects.regions.clusters#SoftwareConfig
+        :type properties: dict
+        :param master_machine_type: Compute engine machine type to use for the master node
+        :type master_machine_type: string
+        :param master_disk_size: Disk size for the master node
+        :type int
+        :param worker_machine_type:Compute engine machine type to use for the worker nodes
+        :type worker_machine_type: string
+        :param worker_disk_size: Disk size for the worker nodes
+        :type worker_disk_size: int
+        :param num_preemptible_workers: The # of preemptible worker nodes to spin up
+        :type num_preemptible_workers: int
+        :param labels: dict of labels to add to the cluster
+        :type labels: dict
+        :param zone: The zone where the cluster will be located
+        :type zone: string
+        :param region: leave as 'global', might become relevant in the future
+        :param google_cloud_conn_id: The connection id to use when connecting to dataproc
+        :type google_cloud_conn_id: string
+        :param delegate_to: The account to impersonate, if any.
+            For this to work, the service account making the request must have domain-wide
+            delegation enabled.
+        :type delegate_to: string
+        """
+        super(DataprocClusterCreateOperator, self).__init__(*args, **kwargs)
+        self.google_cloud_conn_id = google_cloud_conn_id
+        self.delegate_to = delegate_to
+        self.cluster_name = cluster_name
+        self.project_id = project_id
+        self.num_workers = num_workers
+        self.num_preemptible_workers = num_preemptible_workers
+        self.storage_bucket = storage_bucket
+        self.init_actions_uris = init_actions_uris
+        self.metadata = metadata
+        self.properties = properties
+        self.master_machine_type = master_machine_type
+        self.master_disk_size = master_disk_size
+        self.worker_machine_type = worker_machine_type
+        self.worker_disk_size = worker_disk_size
+        self.labels = labels
+        self.zone = zone
+        self.region = region
+
+    def _get_cluster_list_for_project(self, service):
+        result = service.projects().regions().clusters().list(
+            projectId=self.project_id,
+            region=self.region
+        ).execute()
+        return result.get('clusters', [])
+
+    def _get_cluster(self, service):
+        cluster_list = self._get_cluster_list_for_project(service)
+        cluster = [c for c in cluster_list if c['clusterName'] == self.cluster_name]
+        if cluster:
+            return cluster[0]
+        return None
+
+    def _get_cluster_state(self, service):
+        cluster = self._get_cluster(service)
+        if 'status' in cluster:
+            return cluster['status']['state']
+        else:
+            return None
+
+    def _cluster_ready(self, state, service):
+        if state == 'RUNNING':
+            return True
+        if state == 'ERROR':
+            cluster = self._get_cluster(service)
+            try:
+                error_details = cluster['status']['details']
+            except KeyError:
+                error_details = 'Unknown error in cluster creation, ' \
+                                'check Google Cloud console for details.'
+            raise Exception(error_details)
+        return False
+
+    def _wait_for_done(self, service):
+        while True:
+            state = self._get_cluster_state(service)
+            if state is None:
+                logging.info("No state for cluster '%s'", self.cluster_name)
+                time.sleep(15)
+            else:
+                logging.info("State for cluster '%s' is %s", self.cluster_name, state)
+                if self._cluster_ready(state, service):
+                    logging.info("Cluster '%s' successfully created",
+                                 self.cluster_name)
+                    return
+                time.sleep(15)
+
+    def execute(self, context):
+        hook = DataProcHook(
+            gcp_conn_id=self.google_cloud_conn_id,
+            delegate_to=self.delegate_to
+        )
+        service = hook.get_conn()
+
+        if self._get_cluster(service):
+            logging.info('Cluster {} already exists... Checking status...'.format(
+                            self.cluster_name
+                        ))
+            self._wait_for_done(service)
+            return True
+
+        zone_uri = \
+            'https://www.googleapis.com/compute/v1/projects/{}/zones/{}'.format(
+                self.project_id, self.zone
+            )
+        master_type_uri = \
+            "https://www.googleapis.com/compute/v1/projects/{}/zones/{}/machineTypes/{}".format(
+                self.project_id, self.zone, self.master_machine_type
+            )
+        worker_type_uri = \
+            "https://www.googleapis.com/compute/v1/projects/{}/zones/{}/machineTypes/{}".format(
+                self.project_id, self.zone, self.worker_machine_type
+            )
+        cluster_data = {
+            'projectId': self.project_id,
+            'clusterName': self.cluster_name,
+            'config': {
+                'gceClusterConfig': {
+                    'zoneUri': zone_uri
+                },
+                'masterConfig': {
+                    'numInstances': 1,
+                    'machineTypeUri': master_type_uri,
+                    'diskConfig': {
+                        'bootDiskSizeGb': self.master_disk_size
+                    }
+                },
+                'workerConfig': {
+                    'numInstances': self.num_workers,
+                    'machineTypeUri': worker_type_uri,
+                    'diskConfig': {
+                        'bootDiskSizeGb': self.worker_disk_size
+                    }
+                },
+                'secondaryWorkerConfig': {},
+                'softwareConfig': {}
+            }
+        }
+        if self.num_preemptible_workers > 0:
+            cluster_data['config']['secondaryWorkerConfig'] = {
+                'numInstances': self.num_preemptible_workers,
+                'machineTypeUri': worker_type_uri,
+                'diskConfig': {
+                    'bootDiskSizeGb': self.worker_disk_size
+                },
+                'isPreemptible': True
+            }
+        if self.labels:
+            cluster_data['labels'] = self.labels
+        if self.storage_bucket:
+            cluster_data['config']['configBucket'] = self.storage_bucket
+        if self.metadata:
+            cluster_data['config']['gceClusterConfig']['metadata'] = self.metadata
+        if self.properties:
+            cluster_data['config']['softwareConfig']['properties'] = self.properties
+        if self.init_actions_uris:
+            init_actions_dict = [
+                {'executableFile': uri} for uri in self.init_actions_uris
+            ]
+            cluster_data['config']['initializationActions'] = init_actions_dict
+
+        try:
+            service.projects().regions().clusters().create(
+                projectId=self.project_id,
+                region=self.region,
+                body=cluster_data
+            ).execute()
+        except HttpError as e:
+            # probably two cluster start commands at the same time
+            time.sleep(10)
+            if self._get_cluster(service):
+                logging.info('Cluster {} already exists... Checking status...'.format(
+                             self.cluster_name
+                             ))
+                self._wait_for_done(service)
+                return True
+            else:
+                raise e
+
+        self._wait_for_done(service)
 
 
 class DataProcPigOperator(BaseOperator):


[3/9] incubator-airflow git commit: [AIRFLOW-728] Add Google BigQuery table sensor

Posted by bo...@apache.org.
[AIRFLOW-728] Add Google BigQuery table sensor

Design a sensor that checks whether a certain
table is present in bigquery. The sensor will
accept the google cloud project id, bigquery
dataset id and bigquery table id to check as
parameters.

Internally, it will use the bigquery hook to check
for the existence of the table.
Therefore a 'table_exists' method will be added to
the existing Bigquery hook.

Closes #1970 from bodschut/feature/bq_sensor


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/617ba741
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/617ba741
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/617ba741

Branch: refs/heads/v1-8-test
Commit: 617ba741205ddea8461fc287267fc9c371ace2de
Parents: e010cb2
Author: Bob De Schutter <de...@gmail.com>
Authored: Mon Jan 9 21:46:16 2017 +0100
Committer: Alex Van Boxel <al...@vanboxel.be>
Committed: Mon Jan 9 21:46:16 2017 +0100

----------------------------------------------------------------------
 airflow/contrib/hooks/bigquery_hook.py     | 27 ++++++++++
 airflow/contrib/sensors/bigquery_sensor.py | 69 +++++++++++++++++++++++++
 2 files changed, 96 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/617ba741/airflow/contrib/hooks/bigquery_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/bigquery_hook.py b/airflow/contrib/hooks/bigquery_hook.py
index d796565..53ca123 100644
--- a/airflow/contrib/hooks/bigquery_hook.py
+++ b/airflow/contrib/hooks/bigquery_hook.py
@@ -22,6 +22,7 @@ import logging
 import time
 
 from apiclient.discovery import build, HttpError
+from googleapiclient import errors
 from builtins import range
 from pandas.io.gbq import GbqConnector, \
     _parse_data as gbq_parse_data, \
@@ -100,6 +101,32 @@ class BigQueryHook(GoogleCloudBaseHook, DbApiHook):
         else:
             return gbq_parse_data(schema, [])
 
+    def table_exists(self, project_id, dataset_id, table_id):
+        """
+        Checks for the existence of a table in Google BigQuery.
+
+        :param project_id: The Google cloud project in which to look for the table. The connection supplied to the hook
+        must provide access to the specified project.
+        :type project_id: string
+        :param dataset_id: The name of the dataset in which to look for the table.
+            storage bucket.
+        :type dataset_id: string
+        :param table_id: The name of the table to check the existence of.
+        :type table_id: string
+        """
+        service = self.get_service()
+        try:
+            service.tables().get(
+                projectId=project_id,
+                datasetId=dataset_id,
+                tableId=table_id
+            ).execute()
+            return True
+        except errors.HttpError as e:
+            if e.resp['status'] == '404':
+                return False
+            raise
+
 
 class BigQueryPandasConnector(GbqConnector):
     """

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/617ba741/airflow/contrib/sensors/bigquery_sensor.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/sensors/bigquery_sensor.py b/airflow/contrib/sensors/bigquery_sensor.py
new file mode 100644
index 0000000..8a8ca62
--- /dev/null
+++ b/airflow/contrib/sensors/bigquery_sensor.py
@@ -0,0 +1,69 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+
+from airflow.operators.sensors import BaseSensorOperator
+from airflow.contrib.hooks.bigquery_hook import BigQueryHook
+from airflow.utils.decorators import apply_defaults
+
+
+class BigQueryTableSensor(BaseSensorOperator):
+    """
+    Checks for the existence of a table in Google Bigquery.
+    """
+    template_fields = ('project_id', 'dataset_id', 'table_id',)
+    ui_color = '#f0eee4'
+
+    @apply_defaults
+    def __init__(
+            self,
+            project_id,
+            dataset_id,
+            table_id,
+            bigquery_conn_id='bigquery_default_conn',
+            delegate_to=None,
+            *args,
+            **kwargs):
+        """
+        Create a new BigQueryTableSensor.
+
+        :param project_id: The Google cloud project in which to look for the table. The connection supplied to the hook
+        must provide access to the specified project.
+        :type project_id: string
+        :param dataset_id: The name of the dataset in which to look for the table.
+            storage bucket.
+        :type dataset_id: string
+        :param table_id: The name of the table to check the existence of.
+        :type table_id: string
+        :param bigquery_conn_id: The connection ID to use when connecting to Google BigQuery.
+        :type bigquery_conn_id: string
+        :param delegate_to: The account to impersonate, if any.
+            For this to work, the service account making the request must have domain-wide delegation enabled.
+        :type delegate_to: string
+        """
+        super(BigQueryTableSensor, self).__init__(*args, **kwargs)
+        self.project_id = project_id
+        self.dataset_id = dataset_id
+        self.table_id = table_id
+        self.bigquery_conn_id = bigquery_conn_id
+        self.delegate_to = delegate_to
+
+    def poke(self, context):
+        table_uri = '{0}:{1}.{2}'.format(self.project_id, self.dataset_id, self.table_id)
+        logging.info('Sensor checks existence of table: %s', table_uri)
+        hook = BigQueryHook(
+            bigquery_conn_id=self.bigquery_conn_id,
+            delegate_to=self.delegate_to)
+        return hook.table_exists(self.project_id, self.dataset_id, self.table_id)