Posted to commits@airflow.apache.org by fo...@apache.org on 2018/05/04 07:01:57 UTC

[01/50] incubator-airflow git commit: [AIRFLOW-2382] Fix wrong description for delimiter [Forced Update!]

Repository: incubator-airflow
Updated Branches:
  refs/heads/v1-10-test 5495d2590 -> 16bae5634 (forced update)


[AIRFLOW-2382] Fix wrong description for delimiter

Fix misleading descriptions for the 'delimiter'
parameter in the S3ListOperator and
S3ToGoogleCloudStorageOperator docstrings.

Closes #3270 from sekikn/AIRFLOW-2382


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/ae63246a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/ae63246a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/ae63246a

Branch: refs/heads/v1-10-test
Commit: ae63246a1d580d77c366276300139165e4c984de
Parents: 36193fc
Author: Kengo Seki <se...@apache.org>
Authored: Thu Apr 26 18:45:12 2018 -0700
Committer: r39132 <si...@yahoo.com>
Committed: Thu Apr 26 18:45:12 2018 -0700

----------------------------------------------------------------------
 airflow/contrib/operators/s3_list_operator.py   | 12 +++++-------
 airflow/contrib/operators/s3_to_gcs_operator.py |  6 ++----
 2 files changed, 7 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/ae63246a/airflow/contrib/operators/s3_list_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/s3_list_operator.py b/airflow/contrib/operators/s3_list_operator.py
index 1dcbc60..dbb45fe 100644
--- a/airflow/contrib/operators/s3_list_operator.py
+++ b/airflow/contrib/operators/s3_list_operator.py
@@ -24,8 +24,7 @@ from airflow.utils.decorators import apply_defaults
 
 class S3ListOperator(BaseOperator):
     """
-    List all objects from the bucket with the given string prefix and delimiter
-    in name.
+    List all objects from the bucket with the given string prefix in name.
 
     This operator returns a python list with the name of objects which can be
     used by `xcom` in the downstream task.
@@ -35,22 +34,21 @@ class S3ListOperator(BaseOperator):
     :param prefix: Prefix string to filters the objects whose name begin with
         such prefix
     :type prefix: string
-    :param delimiter: The delimiter by which you want to filter the objects.
-        For e.g to lists the CSV files from in a directory in S3 you would use
-        delimiter='.csv'.
+    :param delimiter: the delimiter marks key hierarchy.
     :type delimiter: string
     :param aws_conn_id: The connection ID to use when connecting to S3 storage.
     :type aws_conn_id: string
 
     **Example**:
-        The following operator would list all the CSV files from the S3
+        The following operator would list all the files
+        (excluding subfolders) from the S3
         ``customers/2018/04/`` key in the ``data`` bucket. ::
 
             s3_file = S3ListOperator(
                 task_id='list_3s_files',
                 bucket='data',
                 prefix='customers/2018/04/',
-                delimiter='.csv',
+                delimiter='/',
                 aws_conn_id='aws_customers_conn'
             )
     """

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/ae63246a/airflow/contrib/operators/s3_to_gcs_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/s3_to_gcs_operator.py b/airflow/contrib/operators/s3_to_gcs_operator.py
index d105596..5a2004d 100644
--- a/airflow/contrib/operators/s3_to_gcs_operator.py
+++ b/airflow/contrib/operators/s3_to_gcs_operator.py
@@ -37,12 +37,10 @@ class S3ToGoogleCloudStorageOperator(S3ListOperator):
     :param prefix: Prefix string which filters objects whose name begin with
         such prefix.
     :type prefix: string
-    :param delimiter: The delimiter by which you want to filter the objects on.
-        E.g. to list CSV files from a S3 key you would do the following,
-        `delimiter='.csv'`.
+    :param delimiter: the delimiter marks key hierarchy.
     :type delimiter: string
     :param aws_conn_id: The source S3 connection
-    :type aws_conn_id: str
+    :type aws_conn_id: string
     :param dest_gcs_conn_id: The destination connection ID to use
         when connecting to Google Cloud Storage.
     :type dest_gcs_conn_id: string
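
Context for the change above: in the S3 list API a delimiter of '/' groups keys by
"folder", so prefix='customers/2018/04/' plus delimiter='/' returns only the objects
directly under that prefix and skips anything in deeper subfolders. A minimal sketch of
the companion S3ToGoogleCloudStorageOperator with the corrected semantics (bucket names,
connection IDs and the destination path are made-up placeholders):

    from airflow.contrib.operators.s3_to_gcs_operator import \
        S3ToGoogleCloudStorageOperator

    # Copy only the objects directly under customers/2018/04/ in the S3
    # bucket 'data' to a GCS location; subfolder contents are not matched
    # because delimiter='/' stops the listing at the next key separator.
    s3_to_gcs = S3ToGoogleCloudStorageOperator(
        task_id='s3_to_gcs_customers',
        bucket='data',
        prefix='customers/2018/04/',
        delimiter='/',
        aws_conn_id='aws_customers_conn',
        dest_gcs_conn_id='google_cloud_default',
        dest_gcs='gs://example-bucket/customers/2018/04/',
    )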


[32/50] incubator-airflow git commit: [AIRFLOW-2401] Document the use of variables in Jinja template

Posted by fo...@apache.org.
[AIRFLOW-2401] Document the use of variables in Jinja template

Closes #2847 from moe-nadal-ck/patch-1


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/a67c13e4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/a67c13e4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/a67c13e4

Branch: refs/heads/v1-10-test
Commit: a67c13e44cb96acacc79896b327bbaaae19707a1
Parents: 8921e7d
Author: Moe Nadal <mo...@creditkarma.com>
Authored: Mon Apr 30 15:05:47 2018 -0700
Committer: r39132 <si...@yahoo.com>
Committed: Mon Apr 30 15:06:10 2018 -0700

----------------------------------------------------------------------
 docs/concepts.rst | 12 ++++++++++++
 1 file changed, 12 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a67c13e4/docs/concepts.rst
----------------------------------------------------------------------
diff --git a/docs/concepts.rst b/docs/concepts.rst
index 89c25fe..c28b10f 100644
--- a/docs/concepts.rst
+++ b/docs/concepts.rst
@@ -427,6 +427,18 @@ The second call assumes ``json`` content and will be deserialized into
 ``bar``. Note that ``Variable`` is a sqlalchemy model and can be used
 as such.
 
+You can use a variable from a jinja template with the syntax : 
+
+.. code:: bash
+
+    echo {{ var.value.<variable_name> }}
+    
+or if you need to deserialize a json object from the variable : 
+
+.. code:: bash
+
+    echo {{ var.json.<variable_name> }}
+    
 
 Branching
 =========
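
A short usage sketch to go with the documentation above: once a Variable exists, it can
be referenced from any templated field. The variable name below is a made-up placeholder.

    from airflow.operators.bash_operator import BashOperator

    # Assumes a Variable named 'data_dir' was created in the UI or via
    # `airflow variables --set data_dir /tmp/data`.
    list_data = BashOperator(
        task_id='list_data_dir',
        bash_command='ls {{ var.value.data_dir }}',
    )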


[47/50] incubator-airflow git commit: closes apache/incubator-airflow#2675 *Closed for inactivity.*

Posted by fo...@apache.org.
closes apache/incubator-airflow#2675 *Closed for inactivity.*


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/3a28ceb5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/3a28ceb5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/3a28ceb5

Branch: refs/heads/v1-10-test
Commit: 3a28ceb5c5c5d48e2cded54a010b6f3ca45ac84e
Parents: 02a88f6
Author: r39132 <si...@yahoo.com>
Authored: Thu May 3 22:52:39 2018 -0700
Committer: r39132 <si...@yahoo.com>
Committed: Thu May 3 22:52:39 2018 -0700

----------------------------------------------------------------------

----------------------------------------------------------------------



[34/50] incubator-airflow git commit: [AIRFLOW-XXX] Add Twine Labs as an Airflow user

Posted by fo...@apache.org.
[AIRFLOW-XXX] Add Twine Labs as an Airflow user

Closes #3287 from ivorpeles/twine_airflow


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/4428d1bb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/4428d1bb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/4428d1bb

Branch: refs/heads/v1-10-test
Commit: 4428d1bbbf943cc7868c74d13f53cf0e336dfec7
Parents: 2d1b2ae
Author: ivor <iv...@mail.utoronto.ca>
Authored: Tue May 1 22:07:47 2018 +0200
Committer: Fokko Driesprong <fo...@godatadriven.com>
Committed: Tue May 1 22:07:51 2018 +0200

----------------------------------------------------------------------
 README.md | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/4428d1bb/README.md
----------------------------------------------------------------------
diff --git a/README.md b/README.md
index bd09e9c..86b4a8c 100644
--- a/README.md
+++ b/README.md
@@ -220,6 +220,7 @@ Currently **officially** using Airflow:
 1. [Tictail](https://tictail.com/)
 1. [Tile](https://tile.com/) [[@ranjanmanish](https://github.com/ranjanmanish)]
 1. [Tokopedia](https://www.tokopedia.com/) [@topedmaria](https://github.com/topedmaria)
+1. [Twine Labs](https://www.twinelabs.com/) [[@ivorpeles](https://github.com/ivorpeles)]
 1. [T2 Systems](http://t2systems.com) [[@unclaimedpants](https://github.com/unclaimedpants)]
 1. [Ubisoft](https://www.ubisoft.com/) [[@Walkoss](https://github.com/Walkoss)]
 1. [United Airlines](https://www.united.com/) [[@ilopezfr](https://github.com/ilopezfr)]


[07/50] incubator-airflow git commit: [AIRFLOW-2391] Fix to Flask 0.12.2

Posted by fo...@apache.org.
[AIRFLOW-2391] Fix to Flask 0.12.2

Flask 0.12.3 has compatibility issues with Airflow,
so lock the Flask version to 0.12.2.

Closes #3277 from Fokko/fd-fix-master-ci


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/3368f425
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/3368f425
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/3368f425

Branch: refs/heads/v1-10-test
Commit: 3368f4258c2dcfbcdbaf631fa887a742f12720b8
Parents: 67c0099
Author: Fokko Driesprong <fo...@godatadriven.com>
Authored: Sat Apr 28 20:25:13 2018 +0200
Committer: Fokko Driesprong <fo...@godatadriven.com>
Committed: Sat Apr 28 20:25:13 2018 +0200

----------------------------------------------------------------------
 setup.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3368f425/setup.py
----------------------------------------------------------------------
diff --git a/setup.py b/setup.py
index 742e01b..b5461eb 100644
--- a/setup.py
+++ b/setup.py
@@ -229,7 +229,7 @@ def do_setup():
             'configparser>=3.5.0, <3.6.0',
             'croniter>=0.3.17, <0.4',
             'dill>=0.2.2, <0.3',
-            'flask>=0.12, <0.13',
+            'flask==0.12.2',
             'flask-appbuilder>=1.9.6, <2.0.0',
             'flask-admin==1.4.1',
             'flask-caching>=1.3.3, <1.4.0',


[44/50] incubator-airflow git commit: [AIRFLOW-2411] add dataproc_jars to templated_fields

Posted by fo...@apache.org.
[AIRFLOW-2411] add dataproc_jars to templated_fields

This commit makes it possible to use jinja
templates when passing JAR file URIs to the
DataProc operators that require JAR files,
specifically the DataProc Hive, Pig, SparkSql,
Spark, Hadoop and PySpark operators.

Closes #3305 from mchalek/template-dataproc-jars


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/c5fa8cd4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/c5fa8cd4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/c5fa8cd4

Branch: refs/heads/v1-10-test
Commit: c5fa8cd411ca67889eafd109988d2472ccfbba10
Parents: 6b3f6ce
Author: Kevin McHale <mc...@gmail.com>
Authored: Thu May 3 22:55:10 2018 +0200
Committer: Fokko Driesprong <fo...@godatadriven.com>
Committed: Thu May 3 22:55:10 2018 +0200

----------------------------------------------------------------------
 airflow/contrib/operators/dataproc_operator.py | 12 ++++++------
 1 file changed, 6 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c5fa8cd4/airflow/contrib/operators/dataproc_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/dataproc_operator.py b/airflow/contrib/operators/dataproc_operator.py
index 56ebb91..ad0aa09 100644
--- a/airflow/contrib/operators/dataproc_operator.py
+++ b/airflow/contrib/operators/dataproc_operator.py
@@ -481,7 +481,7 @@ class DataProcPigOperator(BaseOperator):
     :param region: The specified region where the dataproc cluster is created.
     :type region: string
     """
-    template_fields = ['query', 'variables', 'job_name', 'cluster_name']
+    template_fields = ['query', 'variables', 'job_name', 'cluster_name', 'dataproc_jars']
     template_ext = ('.pg', '.pig',)
     ui_color = '#0273d4'
 
@@ -561,7 +561,7 @@ class DataProcHiveOperator(BaseOperator):
     :param region: The specified region where the dataproc cluster is created.
     :type region: string
     """
-    template_fields = ['query', 'variables', 'job_name', 'cluster_name']
+    template_fields = ['query', 'variables', 'job_name', 'cluster_name', 'dataproc_jars']
     template_ext = ('.q',)
     ui_color = '#0273d4'
 
@@ -642,7 +642,7 @@ class DataProcSparkSqlOperator(BaseOperator):
     :param region: The specified region where the dataproc cluster is created.
     :type region: string
     """
-    template_fields = ['query', 'variables', 'job_name', 'cluster_name']
+    template_fields = ['query', 'variables', 'job_name', 'cluster_name', 'dataproc_jars']
     template_ext = ('.q',)
     ui_color = '#0273d4'
 
@@ -731,7 +731,7 @@ class DataProcSparkOperator(BaseOperator):
     :type region: string
     """
 
-    template_fields = ['arguments', 'job_name', 'cluster_name']
+    template_fields = ['arguments', 'job_name', 'cluster_name', 'dataproc_jars']
     ui_color = '#0273d4'
 
     @apply_defaults
@@ -821,7 +821,7 @@ class DataProcHadoopOperator(BaseOperator):
     :type region: string
     """
 
-    template_fields = ['arguments', 'job_name', 'cluster_name']
+    template_fields = ['arguments', 'job_name', 'cluster_name', 'dataproc_jars']
     ui_color = '#0273d4'
 
     @apply_defaults
@@ -911,7 +911,7 @@ class DataProcPySparkOperator(BaseOperator):
     :type region: string
     """
 
-    template_fields = ['arguments', 'job_name', 'cluster_name']
+    template_fields = ['arguments', 'job_name', 'cluster_name', 'dataproc_jars']
     ui_color = '#0273d4'
 
     @staticmethod
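
With 'dataproc_jars' templated, the JAR URIs themselves can carry Jinja expressions.
A hedged sketch for the Hive operator (bucket and cluster names are placeholders; in
the 1.10 contrib module the Hive operator takes the JAR list via the dataproc_hive_jars
keyword, which is stored on the operator as dataproc_jars):

    from airflow.contrib.operators.dataproc_operator import DataProcHiveOperator

    # Pick up a date-stamped UDF jar for each run via the templated field.
    hive_udf_query = DataProcHiveOperator(
        task_id='hive_with_templated_jar',
        query='SELECT 1',
        cluster_name='analytics-cluster',
        dataproc_hive_jars=[
            'gs://example-artifacts/udfs/hive-udfs-{{ ds_nodash }}.jar',
        ],
    )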


[12/50] incubator-airflow git commit: [AIRFLOW-1575] Add AWS Kinesis Firehose Hook for inserting batch records

Posted by fo...@apache.org.
[AIRFLOW-1575] Add AWS Kinesis Firehose Hook for inserting batch records

Closes #3275 from sid88in/feature/kinesis_hookv2


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/2d588e94
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/2d588e94
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/2d588e94

Branch: refs/heads/v1-10-test
Commit: 2d588e9433cd9a1a1381cf939f579f7d7e53330f
Parents: e691acc
Author: sid.gupta <si...@glassdoor.com>
Authored: Sat Apr 28 23:11:36 2018 -0700
Committer: r39132 <si...@yahoo.com>
Committed: Sat Apr 28 23:11:36 2018 -0700

----------------------------------------------------------------------
 airflow/contrib/hooks/aws_firehose_hook.py    | 52 ++++++++++++++++
 tests/contrib/hooks/test_aws_firehose_hook.py | 70 ++++++++++++++++++++++
 2 files changed, 122 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/2d588e94/airflow/contrib/hooks/aws_firehose_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/aws_firehose_hook.py b/airflow/contrib/hooks/aws_firehose_hook.py
new file mode 100644
index 0000000..cf7b2fc
--- /dev/null
+++ b/airflow/contrib/hooks/aws_firehose_hook.py
@@ -0,0 +1,52 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from airflow.contrib.hooks.aws_hook import AwsHook
+
+
+class AwsFirehoseHook(AwsHook):
+    """
+    Interact with AWS Kinesis Firehose.
+    :param delivery_stream: Name of the delivery stream
+    :type delivery_stream: str
+    :param region_name: AWS region name (example: us-east-1)
+    :type region_name: str
+    """
+
+    def __init__(self, delivery_stream, region_name=None, *args, **kwargs):
+        self.delivery_stream = delivery_stream
+        self.region_name = region_name
+        super(AwsFirehoseHook, self).__init__(*args, **kwargs)
+
+    def get_conn(self):
+        """
+        Returns AwsHook connection object.
+        """
+
+        self.conn = self.get_client_type('firehose', self.region_name)
+        return self.conn
+
+    def put_records(self, records):
+        """
+        Write batch records to Kinesis Firehose
+        """
+
+        firehose_conn = self.get_conn()
+
+        response = firehose_conn.put_record_batch(
+            DeliveryStreamName=self.delivery_stream,
+            Records=records
+        )
+
+        return response

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/2d588e94/tests/contrib/hooks/test_aws_firehose_hook.py
----------------------------------------------------------------------
diff --git a/tests/contrib/hooks/test_aws_firehose_hook.py b/tests/contrib/hooks/test_aws_firehose_hook.py
new file mode 100644
index 0000000..0a2c809
--- /dev/null
+++ b/tests/contrib/hooks/test_aws_firehose_hook.py
@@ -0,0 +1,70 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import unittest
+import uuid
+
+from airflow.contrib.hooks.aws_firehose_hook import AwsFirehoseHook
+
+try:
+    from moto import mock_kinesis
+except ImportError:
+    mock_kinesis = None
+
+
+class TestAwsFirehoseHook(unittest.TestCase):
+
+    @unittest.skipIf(mock_kinesis is None, 'mock_kinesis package not present')
+    @mock_kinesis
+    def test_get_conn_returns_a_boto3_connection(self):
+        hook = AwsFirehoseHook(aws_conn_id='aws_default',
+                               delivery_stream="test_airflow", region_name="us-east-1")
+        self.assertIsNotNone(hook.get_conn())
+
+    @unittest.skipIf(mock_kinesis is None, 'mock_kinesis package not present')
+    @mock_kinesis
+    def test_insert_batch_records_kinesis_firehose(self):
+        hook = AwsFirehoseHook(aws_conn_id='aws_default',
+                               delivery_stream="test_airflow", region_name="us-east-1")
+
+        response = hook.get_conn().create_delivery_stream(
+            DeliveryStreamName="test_airflow",
+            S3DestinationConfiguration={
+                'RoleARN': 'arn:aws:iam::123456789012:role/firehose_delivery_role',
+                'BucketARN': 'arn:aws:s3:::kinesis-test',
+                'Prefix': 'airflow/',
+                'BufferingHints': {
+                    'SizeInMBs': 123,
+                    'IntervalInSeconds': 124
+                },
+                'CompressionFormat': 'UNCOMPRESSED',
+            }
+        )
+
+        stream_arn = response['DeliveryStreamARN']
+        self.assertEquals(
+            stream_arn, "arn:aws:firehose:us-east-1:123456789012:deliverystream/test_airflow")
+
+        records = [{"Data": str(uuid.uuid4())}
+                   for _ in range(100)]
+
+        response = hook.put_records(records)
+
+        self.assertEquals(response['FailedPutCount'], 0)
+        self.assertEquals(response['ResponseMetadata']['HTTPStatusCode'], 200)
+
+
+if __name__ == '__main__':
+    unittest.main()
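
A small usage sketch for the new hook, mirroring the test above (connection ID, stream
name and record payloads are placeholders; the delivery stream must already exist):

    from airflow.contrib.hooks.aws_firehose_hook import AwsFirehoseHook

    hook = AwsFirehoseHook(aws_conn_id='aws_default',
                           delivery_stream='example_stream',
                           region_name='us-east-1')

    # Firehose's PutRecordBatch accepts up to 500 records per call; each
    # record is a dict with a 'Data' blob.
    records = [{'Data': '{"event_id": %d}\n' % i} for i in range(100)]
    response = hook.put_records(records)
    assert response['FailedPutCount'] == 0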


[46/50] incubator-airflow git commit: closes apache/incubator-airflow#2962 *Closed for inactivity.*

Posted by fo...@apache.org.
closes apache/incubator-airflow#2962 *Closed for inactivity.*


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/02a88f6f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/02a88f6f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/02a88f6f

Branch: refs/heads/v1-10-test
Commit: 02a88f6f6ff34b07630efe6157e1fec3bc3bb505
Parents: b02820a
Author: r39132 <si...@yahoo.com>
Authored: Thu May 3 22:51:30 2018 -0700
Committer: r39132 <si...@yahoo.com>
Committed: Thu May 3 22:51:30 2018 -0700

----------------------------------------------------------------------

----------------------------------------------------------------------



[30/50] incubator-airflow git commit: [AIRFLOW-2403] Fix License Headers

Posted by fo...@apache.org.
[AIRFLOW-2403] Fix License Headers

Closes #3285 from r39132/fix_license_headers


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/f63a2b1a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/f63a2b1a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/f63a2b1a

Branch: refs/heads/v1-10-test
Commit: f63a2b1a1f1c216e22d6c46ecbd56199a2ff398e
Parents: a7e8f48
Author: r39132 <si...@yahoo.com>
Authored: Mon Apr 30 13:12:51 2018 -0700
Committer: r39132 <si...@yahoo.com>
Committed: Mon Apr 30 13:12:51 2018 -0700

----------------------------------------------------------------------
 airflow/contrib/hooks/aws_firehose_hook.py    | 24 ++++++++++++---------
 tests/contrib/hooks/test_aws_firehose_hook.py | 25 ++++++++++++----------
 2 files changed, 28 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/f63a2b1a/airflow/contrib/hooks/aws_firehose_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/aws_firehose_hook.py b/airflow/contrib/hooks/aws_firehose_hook.py
index cf7b2fc..b273e22 100644
--- a/airflow/contrib/hooks/aws_firehose_hook.py
+++ b/airflow/contrib/hooks/aws_firehose_hook.py
@@ -1,16 +1,20 @@
-# -*- coding: utf-8 -*-
 #
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
 #
-# http://www.apache.org/licenses/LICENSE-2.0
+#   http://www.apache.org/licenses/LICENSE-2.0
 #
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
 
 from airflow.contrib.hooks.aws_hook import AwsHook
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/f63a2b1a/tests/contrib/hooks/test_aws_firehose_hook.py
----------------------------------------------------------------------
diff --git a/tests/contrib/hooks/test_aws_firehose_hook.py b/tests/contrib/hooks/test_aws_firehose_hook.py
index 0a2c809..f22bcde 100644
--- a/tests/contrib/hooks/test_aws_firehose_hook.py
+++ b/tests/contrib/hooks/test_aws_firehose_hook.py
@@ -1,17 +1,20 @@
-# -*- coding: utf-8 -*-
 #
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
 #
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
+#   http://www.apache.org/licenses/LICENSE-2.0
 #
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
 
 import unittest
 import uuid


[25/50] incubator-airflow git commit: [AIRFLOW-2389] Create a pinot db api hook

Posted by fo...@apache.org.
[AIRFLOW-2389] Create a pinot db api hook

Closes #3274 from feng-tao/pinot_db_hook


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/700c0f48
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/700c0f48
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/700c0f48

Branch: refs/heads/v1-10-test
Commit: 700c0f488f5f08772983584b2b635973618548b5
Parents: ae48fce
Author: Tao feng <tf...@lyft.com>
Authored: Mon Apr 30 08:41:43 2018 +0200
Committer: Fokko Driesprong <fo...@godatadriven.com>
Committed: Mon Apr 30 08:41:43 2018 +0200

----------------------------------------------------------------------
 airflow/contrib/hooks/pinot_hook.py    | 105 ++++++++++++++++++++++++++++
 docs/code.rst                          |   1 +
 setup.py                               |   6 +-
 tests/contrib/hooks/test_pinot_hook.py |  76 ++++++++++++++++++++
 4 files changed, 186 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/700c0f48/airflow/contrib/hooks/pinot_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/pinot_hook.py b/airflow/contrib/hooks/pinot_hook.py
new file mode 100644
index 0000000..d731211
--- /dev/null
+++ b/airflow/contrib/hooks/pinot_hook.py
@@ -0,0 +1,105 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import six
+
+from pinotdb import connect
+
+from airflow.hooks.dbapi_hook import DbApiHook
+
+
+class PinotDbApiHook(DbApiHook):
+    """
+    Connect to pinot db(https://github.com/linkedin/pinot) to issue pql
+    """
+    conn_name_attr = 'pinot_broker_conn_id'
+    default_conn_name = 'pinot_broker_default'
+    supports_autocommit = False
+
+    def __init__(self, *args, **kwargs):
+        super(PinotDbApiHook, self).__init__(*args, **kwargs)
+
+    def get_conn(self):
+        """
+        Establish a connection to pinot broker through pinot dbqpi.
+        """
+        conn = self.get_connection(self.pinot_broker_conn_id)
+        pinot_broker_conn = connect(
+            host=conn.host,
+            port=conn.port,
+            path=conn.extra_dejson.get('endpoint', '/pql'),
+            scheme=conn.extra_dejson.get('schema', 'http')
+        )
+        self.log.info('Get the connection to pinot '
+                      'broker on {host}'.format(host=conn.host))
+        return pinot_broker_conn
+
+    def get_uri(self):
+        """
+        Get the connection uri for pinot broker.
+
+        e.g: http://localhost:9000/pql
+        """
+        conn = self.get_connection(getattr(self, self.conn_name_attr))
+        host = conn.host
+        if conn.port is not None:
+            host += ':{port}'.format(port=conn.port)
+        conn_type = 'http' if not conn.conn_type else conn.conn_type
+        endpoint = conn.extra_dejson.get('endpoint', 'pql')
+        return '{conn_type}://{host}/{endpoint}'.format(
+            conn_type=conn_type, host=host, endpoint=endpoint)
+
+    def get_records(self, sql):
+        """
+        Executes the sql and returns a set of records.
+
+        :param sql: the sql statement to be executed (str) or a list of
+            sql statements to execute
+        :type sql: str
+        """
+        if six.PY2:
+            sql = sql.encode('utf-8')
+
+        with self.get_conn() as cur:
+            cur.execute(sql)
+            return cur.fetchall()
+
+    def get_first(self, sql):
+        """
+        Executes the sql and returns the first resulting row.
+
+        :param sql: the sql statement to be executed (str) or a list of
+            sql statements to execute
+        :type sql: str or list
+        """
+        if six.PY2:
+            sql = sql.encode('utf-8')
+
+        with self.get_conn() as cur:
+            cur.execute(sql)
+            return cur.fetchone()
+
+    def set_autocommit(self, conn, autocommit):
+        raise NotImplementedError()
+
+    def get_pandas_df(self, sql, parameters=None):
+        raise NotImplementedError()
+
+    def insert_rows(self, table, rows, target_fields=None, commit_every=1000):
+        raise NotImplementedError()

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/700c0f48/docs/code.rst
----------------------------------------------------------------------
diff --git a/docs/code.rst b/docs/code.rst
index 16867f6..c979f26 100644
--- a/docs/code.rst
+++ b/docs/code.rst
@@ -365,6 +365,7 @@ Community contributed hooks
 .. autoclass:: airflow.contrib.hooks.gcs_hook.GoogleCloudStorageHook
 .. autoclass:: airflow.contrib.hooks.jenkins_hook.JenkinsHook
 .. autoclass:: airflow.contrib.hooks.jira_hook.JiraHook
+.. autoclass:: airflow.contrib.hooks.pinot_hook.PinotDbApiHook
 .. autoclass:: airflow.contrib.hooks.qubole_hook.QuboleHook
 .. autoclass:: airflow.contrib.hooks.redis_hook.RedisHook
 .. autoclass:: airflow.contrib.hooks.redshift_hook.RedshiftHook

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/700c0f48/setup.py
----------------------------------------------------------------------
diff --git a/setup.py b/setup.py
index fef8d8a..1842a14 100644
--- a/setup.py
+++ b/setup.py
@@ -151,6 +151,7 @@ mysql = ['mysqlclient>=1.3.6']
 rabbitmq = ['librabbitmq>=1.6.1']
 oracle = ['cx_Oracle>=5.1.2']
 postgres = ['psycopg2-binary>=2.7.4']
+pinot = ['pinotdb>=0.1.1']
 ssh = ['paramiko>=2.1.1', 'pysftp>=0.2.9']
 salesforce = ['simple-salesforce>=0.72']
 s3 = ['boto3>=1.7.0']
@@ -177,7 +178,7 @@ snowflake = ['snowflake-connector-python>=1.5.2',
              'snowflake-sqlalchemy>=1.1.0']
 zendesk = ['zdesk']
 
-all_dbs = postgres + mysql + hive + mssql + hdfs + vertica + cloudant + druid
+all_dbs = postgres + mysql + hive + mssql + hdfs + vertica + cloudant + druid + pinot
 devel = [
     'click',
     'freezegun',
@@ -200,7 +201,7 @@ devel_hadoop = devel_minreq + hive + hdfs + webhdfs + kerberos
 devel_all = (sendgrid + devel + all_dbs + doc + samba + s3 + slack + crypto + oracle +
              docker + ssh + kubernetes + celery + azure + redis + gcp_api + datadog +
              zendesk + jdbc + ldap + kerberos + password + webhdfs + jenkins +
-             druid + snowflake + elasticsearch)
+             druid + pinot + snowflake + elasticsearch)
 
 # Snakebite & Google Cloud Dataflow are not Python 3 compatible :'(
 if PY3:
@@ -293,6 +294,7 @@ def do_setup():
             'mysql': mysql,
             'oracle': oracle,
             'password': password,
+            'pinot': pinot,
             'postgres': postgres,
             'qds': qds,
             'rabbitmq': rabbitmq,

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/700c0f48/tests/contrib/hooks/test_pinot_hook.py
----------------------------------------------------------------------
diff --git a/tests/contrib/hooks/test_pinot_hook.py b/tests/contrib/hooks/test_pinot_hook.py
new file mode 100644
index 0000000..d63ee1c
--- /dev/null
+++ b/tests/contrib/hooks/test_pinot_hook.py
@@ -0,0 +1,76 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import mock
+import unittest
+
+from airflow.contrib.hooks.pinot_hook import PinotDbApiHook
+
+
+class TestPinotDbApiHook(unittest.TestCase):
+
+    def setUp(self):
+        super(TestPinotDbApiHook, self).setUp()
+        self.conn = conn = mock.MagicMock()
+        self.conn.host = 'host'
+        self.conn.port = '1000'
+        self.conn.conn_type = 'http'
+        self.conn.extra_dejson = {'endpoint': 'pql'}
+        self.cur = mock.MagicMock()
+        self.conn.__enter__.return_value = self.cur
+        self.conn.__exit__.return_value = None
+
+        class TestPinotDBApiHook(PinotDbApiHook):
+            def get_conn(self):
+                return conn
+
+            def get_connection(self, conn_id):
+                return conn
+
+        self.db_hook = TestPinotDBApiHook
+
+    def test_get_uri(self):
+        """
+        Test on getting a pinot connection uri
+        """
+        db_hook = self.db_hook()
+        self.assertEquals(db_hook.get_uri(), 'http://host:1000/pql')
+
+    def test_get_conn(self):
+        """
+        Test on getting a pinot connection
+        """
+        conn = self.db_hook().get_conn()
+        self.assertEqual(conn.host, 'host')
+        self.assertEqual(conn.port, '1000')
+        self.assertEqual(conn.conn_type, 'http')
+        self.assertEqual(conn.extra_dejson.get('endpoint'), 'pql')
+
+    def test_get_records(self):
+        statement = 'SQL'
+        result_sets = [('row1',), ('row2',)]
+        self.cur.fetchall.return_value = result_sets
+        self.assertEqual(result_sets, self.db_hook().get_records(statement))
+
+    def test_get_first(self):
+        statement = 'SQL'
+        result_sets = [('row1',), ('row2',)]
+        self.cur.fetchone.return_value = result_sets[0]
+        self.assertEqual(result_sets[0], self.db_hook().get_first(statement))
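
A usage sketch for the new hook (table and column names are placeholders; it assumes a
'pinot_broker_default' connection pointing at a Pinot broker, with optional extras such
as {"endpoint": "/pql"}):

    from airflow.contrib.hooks.pinot_hook import PinotDbApiHook

    hook = PinotDbApiHook()  # uses pinot_broker_conn_id='pinot_broker_default'
    rows = hook.get_records('SELECT playerName FROM baseballStats LIMIT 5')
    first = hook.get_first('SELECT COUNT(*) FROM baseballStats')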


[26/50] incubator-airflow git commit: Revert "[AIRFLOW-2391] Fix to Flask 0.12.2"

Posted by fo...@apache.org.
Revert "[AIRFLOW-2391] Fix to Flask 0.12.2"

This reverts commit 3368f4258c2dcfbcdbaf631fa887a742f12720b8.


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/0ff434a9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/0ff434a9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/0ff434a9

Branch: refs/heads/v1-10-test
Commit: 0ff434a9b7510fb9cb1e74a60c5ac02b61f087d5
Parents: 700c0f4
Author: Fokko Driesprong <fo...@godatadriven.com>
Authored: Mon Apr 30 10:35:05 2018 +0200
Committer: Fokko Driesprong <fo...@godatadriven.com>
Committed: Mon Apr 30 10:35:05 2018 +0200

----------------------------------------------------------------------
 setup.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/0ff434a9/setup.py
----------------------------------------------------------------------
diff --git a/setup.py b/setup.py
index 1842a14..cc2d4f3 100644
--- a/setup.py
+++ b/setup.py
@@ -228,7 +228,7 @@ def do_setup():
             'configparser>=3.5.0, <3.6.0',
             'croniter>=0.3.17, <0.4',
             'dill>=0.2.2, <0.3',
-            'flask==0.12.2',
+            'flask>=0.12, <0.13',
             'flask-appbuilder>=1.9.6, <2.0.0',
             'flask-admin==1.4.1',
             'flask-caching>=1.3.3, <1.4.0',


[27/50] incubator-airflow git commit: [AIRFLOW-2363] Fix return type bug in TaskHandler

Posted by fo...@apache.org.
[AIRFLOW-2363] Fix return type bug in TaskHandler

Closes #3259 from
yrqls21/kevin_yang_fix_s3_logging


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/19b39012
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/19b39012
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/19b39012

Branch: refs/heads/v1-10-test
Commit: 19b3901284b9f7a9f9a898dd1a1e823e5109cfa1
Parents: 0ff434a
Author: Kevin Yang <ke...@airbnb.com>
Authored: Mon Apr 30 12:49:06 2018 +0200
Committer: Fokko Driesprong <fo...@godatadriven.com>
Committed: Mon Apr 30 12:49:20 2018 +0200

----------------------------------------------------------------------
 airflow/bin/cli.py                      |  1 +
 airflow/utils/log/gcs_task_handler.py   | 11 ++++++-----
 airflow/utils/log/s3_task_handler.py    |  9 ++++-----
 airflow/utils/log/wasb_task_handler.py  |  9 ++++-----
 tests/utils/log/test_s3_task_handler.py | 12 ++++++++++--
 5 files changed, 25 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/19b39012/airflow/bin/cli.py
----------------------------------------------------------------------
diff --git a/airflow/bin/cli.py b/airflow/bin/cli.py
index 8a92cfa..f26cbe4 100644
--- a/airflow/bin/cli.py
+++ b/airflow/bin/cli.py
@@ -468,6 +468,7 @@ def run(args, dag=None):
     else:
         with redirect_stdout(ti.log, logging.INFO), redirect_stderr(ti.log, logging.WARN):
             _run(args, dag, ti)
+    logging.shutdown()
 
 
 @cli_utils.action_logging

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/19b39012/airflow/utils/log/gcs_task_handler.py
----------------------------------------------------------------------
diff --git a/airflow/utils/log/gcs_task_handler.py b/airflow/utils/log/gcs_task_handler.py
index d4a9871..8c34792 100644
--- a/airflow/utils/log/gcs_task_handler.py
+++ b/airflow/utils/log/gcs_task_handler.py
@@ -7,9 +7,9 @@
 # to you under the Apache License, Version 2.0 (the
 # "License"); you may not use this file except in compliance
 # with the License.  You may obtain a copy of the License at
-# 
+#
 #   http://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 # Unless required by applicable law or agreed to in writing,
 # software distributed under the License is distributed on an
 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -113,13 +113,14 @@ class GCSTaskHandler(FileTaskHandler, LoggingMixin):
             remote_log = self.gcs_read(remote_loc)
             log = '*** Reading remote log from {}.\n{}\n'.format(
                 remote_loc, remote_log)
+            return log, {'end_of_log': True}
         except Exception as e:
             log = '*** Unable to read remote log from {}\n*** {}\n\n'.format(
                 remote_loc, str(e))
             self.log.error(log)
-            log += super(GCSTaskHandler, self)._read(ti, try_number)
-
-        return log, {'end_of_log': True}
+            local_log, metadata = super(GCSTaskHandler, self)._read(ti, try_number)
+            log += local_log
+            return log, metadata
 
     def gcs_read(self, remote_log_location):
         """

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/19b39012/airflow/utils/log/s3_task_handler.py
----------------------------------------------------------------------
diff --git a/airflow/utils/log/s3_task_handler.py b/airflow/utils/log/s3_task_handler.py
index f29a92f..07b9b3e 100644
--- a/airflow/utils/log/s3_task_handler.py
+++ b/airflow/utils/log/s3_task_handler.py
@@ -7,9 +7,9 @@
 # to you under the Apache License, Version 2.0 (the
 # "License"); you may not use this file except in compliance
 # with the License.  You may obtain a copy of the License at
-# 
+#
 #   http://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 # Unless required by applicable law or agreed to in writing,
 # software distributed under the License is distributed on an
 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -111,10 +111,9 @@ class S3TaskHandler(FileTaskHandler, LoggingMixin):
             remote_log = self.s3_read(remote_loc, return_error=True)
             log = '*** Reading remote log from {}.\n{}\n'.format(
                 remote_loc, remote_log)
+            return log, {'end_of_log': True}
         else:
-            log = super(S3TaskHandler, self)._read(ti, try_number)
-
-        return log, {'end_of_log': True}
+            return super(S3TaskHandler, self)._read(ti, try_number)
 
     def s3_log_exists(self, remote_log_location):
         """

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/19b39012/airflow/utils/log/wasb_task_handler.py
----------------------------------------------------------------------
diff --git a/airflow/utils/log/wasb_task_handler.py b/airflow/utils/log/wasb_task_handler.py
index 1a9590d..a2a0c0d 100644
--- a/airflow/utils/log/wasb_task_handler.py
+++ b/airflow/utils/log/wasb_task_handler.py
@@ -7,9 +7,9 @@
 # to you under the Apache License, Version 2.0 (the
 # "License"); you may not use this file except in compliance
 # with the License.  You may obtain a copy of the License at
-# 
+#
 #   http://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 # Unless required by applicable law or agreed to in writing,
 # software distributed under the License is distributed on an
 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -119,10 +119,9 @@ class WasbTaskHandler(FileTaskHandler, LoggingMixin):
             remote_log = self.wasb_read(remote_loc, return_error=True)
             log = '*** Reading remote log from {}.\n{}\n'.format(
                 remote_loc, remote_log)
+            return log, {'end_of_log': True}
         else:
-            log = super(WasbTaskHandler, self)._read(ti, try_number)
-
-        return log, {'end_of_log': True}
+            return super(WasbTaskHandler, self)._read(ti, try_number)
 
     def wasb_log_exists(self, remote_log_location):
         """

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/19b39012/tests/utils/log/test_s3_task_handler.py
----------------------------------------------------------------------
diff --git a/tests/utils/log/test_s3_task_handler.py b/tests/utils/log/test_s3_task_handler.py
index c287fbc..a5d5f15 100644
--- a/tests/utils/log/test_s3_task_handler.py
+++ b/tests/utils/log/test_s3_task_handler.py
@@ -7,9 +7,9 @@
 # to you under the Apache License, Version 2.0 (the
 # "License"); you may not use this file except in compliance
 # with the License.  You may obtain a copy of the License at
-# 
+#
 #   http://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 # Unless required by applicable law or agreed to in writing,
 # software distributed under the License is distributed on an
 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -117,6 +117,14 @@ class TestS3TaskHandler(unittest.TestCase):
              'Log line\n\n'], [{'end_of_log': True}])
         )
 
+    def test_read_when_s3_log_missing(self):
+        log, metadata = self.s3_task_handler.read(self.ti)
+
+        self.assertEqual(1, len(log))
+        self.assertEqual(len(log), len(metadata))
+        self.assertIn('*** Log file does not exist:', log[0])
+        self.assertEqual({'end_of_log': True}, metadata[0])
+
     def test_read_raises_return_error(self):
         handler = self.s3_task_handler
         url = 's3://nonexistentbucket/foo'
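
The bug in question: FileTaskHandler._read returns a (log, metadata) tuple, but the
remote handlers assigned that tuple to `log` and wrapped it in a second metadata dict,
so callers of .read() could receive a tuple where a log string was expected. A sketch
of the contract the fix restores; `remote_log_exists` and `remote_read` are hypothetical
stand-ins for the S3/GCS/WASB-specific lookups:

    from airflow.utils.log.file_task_handler import FileTaskHandler

    class ExampleRemoteTaskHandler(FileTaskHandler):

        def _read(self, ti, try_number):
            loc = self._render_filename(ti, try_number)
            if self.remote_log_exists(loc):
                log = '*** Reading remote log from {}.\n{}\n'.format(
                    loc, self.remote_read(loc))
                return log, {'end_of_log': True}
            # Post-fix: propagate the parent's (log, metadata) tuple unchanged
            # instead of wrapping it in another {'end_of_log': True} pair.
            return super(ExampleRemoteTaskHandler, self)._read(ti, try_number)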


[40/50] incubator-airflow git commit: [AIRFLOW-2410][AIRFLOW-75] Set the timezone in the RBAC Web UI

Posted by fo...@apache.org.
[AIRFLOW-2410][AIRFLOW-75] Set the timezone in the RBAC Web UI

SQLAlchemy does not know how to handle the
timestamp since it isn't timezone aware.

Closes #3303 from Fokko/AIRFLOW-2410


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/71954a52
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/71954a52
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/71954a52

Branch: refs/heads/v1-10-test
Commit: 71954a52fc13accf1130d3d2a00263d7ec369b02
Parents: 12ab796
Author: Fokko Driesprong <fo...@godatadriven.com>
Authored: Wed May 2 22:49:39 2018 +0200
Committer: Fokko Driesprong <fo...@godatadriven.com>
Committed: Wed May 2 22:49:39 2018 +0200

----------------------------------------------------------------------
 airflow/utils/timezone.py | 19 +++++++++++++++++--
 airflow/www_rbac/views.py | 12 ++++++------
 2 files changed, 23 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/71954a52/airflow/utils/timezone.py
----------------------------------------------------------------------
diff --git a/airflow/utils/timezone.py b/airflow/utils/timezone.py
index af848df..6d49fbc 100644
--- a/airflow/utils/timezone.py
+++ b/airflow/utils/timezone.py
@@ -7,9 +7,9 @@
 # to you under the Apache License, Version 2.0 (the
 # "License"); you may not use this file except in compliance
 # with the License.  You may obtain a copy of the License at
-# 
+#
 #   http://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 # Unless required by applicable law or agreed to in writing,
 # software distributed under the License is distributed on an
 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -64,6 +64,21 @@ def utcnow():
     return d
 
 
+def utc_epoch():
+    """
+    Gets the epoch in the users timezone
+    :return:
+    """
+
+    # pendulum utcnow() is not used as that sets a TimezoneInfo object
+    # instead of a Timezone. This is not pickable and also creates issues
+    # when using replace()
+    d = dt.datetime(1970, 1, 1)
+    d = d.replace(tzinfo=utc)
+
+    return d
+
+
 def convert_to_utc(value):
     """
     Returns the datetime with the default timezone added if timezone

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/71954a52/airflow/www_rbac/views.py
----------------------------------------------------------------------
diff --git a/airflow/www_rbac/views.py b/airflow/www_rbac/views.py
index 9636a58..7c20a65 100644
--- a/airflow/www_rbac/views.py
+++ b/airflow/www_rbac/views.py
@@ -7,9 +7,9 @@
 # to you under the Apache License, Version 2.0 (the
 # "License"); you may not use this file except in compliance
 # with the License.  You may obtain a copy of the License at
-# 
+#
 #   http://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 # Unless required by applicable law or agreed to in writing,
 # software distributed under the License is distributed on an
 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -880,7 +880,7 @@ class Airflow(AirflowBaseView):
             base_date = dag.latest_execution_date or timezone.utcnow()
 
         dates = dag.date_range(base_date, num=-abs(num_runs))
-        min_date = dates[0] if dates else datetime(2000, 1, 1)
+        min_date = dates[0] if dates else timezone.utc_epoch()
 
         DR = models.DagRun
         dag_runs = (
@@ -1113,7 +1113,7 @@ class Airflow(AirflowBaseView):
             base_date = dag.latest_execution_date or timezone.utcnow()
 
         dates = dag.date_range(base_date, num=-abs(num_runs))
-        min_date = dates[0] if dates else datetime(2000, 1, 1)
+        min_date = dates[0] if dates else timezone.utc_epoch()
 
         root = request.args.get('root')
         if root:
@@ -1216,7 +1216,7 @@ class Airflow(AirflowBaseView):
             base_date = dag.latest_execution_date or timezone.utcnow()
 
         dates = dag.date_range(base_date, num=-abs(num_runs))
-        min_date = dates[0] if dates else datetime(2000, 1, 1)
+        min_date = dates[0] if dates else timezone.utc_epoch()
 
         root = request.args.get('root')
         if root:
@@ -1279,7 +1279,7 @@ class Airflow(AirflowBaseView):
             base_date = dag.latest_execution_date or timezone.utcnow()
 
         dates = dag.date_range(base_date, num=-abs(num_runs))
-        min_date = dates[0] if dates else datetime(2000, 1, 1)
+        min_date = dates[0] if dates else timezone.utc_epoch()
 
         root = request.args.get('root')
         if root:
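
The point of utc_epoch(): the RBAC views filter run timestamps through SQLAlchemy, and
mixing a naive datetime(2000, 1, 1) with timezone-aware column values breaks the
comparison. A sketch of the difference (pendulum is already an Airflow dependency):

    import datetime as dt
    import pendulum

    utc = pendulum.timezone('UTC')

    naive = dt.datetime(2000, 1, 1)                       # old fallback, no tzinfo
    aware = dt.datetime(1970, 1, 1).replace(tzinfo=utc)   # mirrors timezone.utc_epoch()

    # Comparing naive and aware datetimes raises
    # "TypeError: can't compare offset-naive and offset-aware datetimes",
    # which is why the views now fall back to the aware epoch.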


[08/50] incubator-airflow git commit: [AIRFLOW-2348] Strip path prefix from the destination_object when source_object contains a wildcard[]

Posted by fo...@apache.org.
[AIRFLOW-2348] Strip path prefix from the destination_object when source_object contains a wildcard[]

Closes #3247 from csoulios/master


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/840930b4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/840930b4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/840930b4

Branch: refs/heads/v1-10-test
Commit: 840930b4d0c545adc24d8eb040974f88370e081b
Parents: 3368f42
Author: Christos Soulios <cs...@gmail.com>
Authored: Sat Apr 28 20:44:53 2018 +0200
Committer: Fokko Driesprong <fo...@godatadriven.com>
Committed: Sat Apr 28 20:44:53 2018 +0200

----------------------------------------------------------------------
 airflow/contrib/operators/gcs_to_gcs.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/840930b4/airflow/contrib/operators/gcs_to_gcs.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/gcs_to_gcs.py b/airflow/contrib/operators/gcs_to_gcs.py
index 6acc517..9bcf9d4 100644
--- a/airflow/contrib/operators/gcs_to_gcs.py
+++ b/airflow/contrib/operators/gcs_to_gcs.py
@@ -99,7 +99,7 @@ class GoogleCloudStorageToGoogleCloudStorageOperator(BaseOperator):
             for source_object in objects:
                 if self.destination_object:
                     destination_object = "{}/{}".format(self.destination_object,
-                                                        source_object)
+                                                        source_object[wildcard_position:])
                 else:
                     destination_object = source_object
                 self.log.info('Executing copy of gs://{0}/{1} to '
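
A worked illustration of what the one-line change does (paths are made up;
wildcard_position is the index of the '*' in the source_object, as in the
surrounding code):

    source_object = 'sales/2018/*'        # wildcard source
    destination_object = 'backup/2018'    # destination prefix
    wildcard_position = source_object.index('*')   # 11

    matched = 'sales/2018/april/orders.csv'
    # Before the fix: 'backup/2018/sales/2018/april/orders.csv' (source prefix duplicated)
    # After the fix:  'backup/2018/april/orders.csv'
    renamed = '{}/{}'.format(destination_object, matched[wildcard_position:])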


[17/50] incubator-airflow git commit: [AIRFLOW-1313] Add vertica_to_mysql operator

Posted by fo...@apache.org.
[AIRFLOW-1313] Add vertica_to_mysql operator

Closes #2370 from juise/master


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/c3aa8e31
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/c3aa8e31
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/c3aa8e31

Branch: refs/heads/v1-10-test
Commit: c3aa8e31fae4edeff8abe6d38e573ef16583496c
Parents: c5d3576
Author: Alexander Petrovsky <as...@gmail.com>
Authored: Sun Apr 29 00:05:23 2018 -0700
Committer: r39132 <si...@yahoo.com>
Committed: Sun Apr 29 00:05:23 2018 -0700

----------------------------------------------------------------------
 airflow/contrib/operators/vertica_to_mysql.py   | 142 +++++++++++++++++++
 scripts/ci/requirements.txt                     |   1 +
 .../contrib/operators/test_vertica_to_mysql.py  |  87 ++++++++++++
 3 files changed, 230 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c3aa8e31/airflow/contrib/operators/vertica_to_mysql.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/vertica_to_mysql.py b/airflow/contrib/operators/vertica_to_mysql.py
new file mode 100644
index 0000000..9b14e6c
--- /dev/null
+++ b/airflow/contrib/operators/vertica_to_mysql.py
@@ -0,0 +1,142 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+
+from airflow.contrib.hooks.vertica_hook import VerticaHook
+from airflow.hooks.mysql_hook import MySqlHook
+from airflow.models import BaseOperator
+from airflow.utils.decorators import apply_defaults
+
+from contextlib import closing
+
+import unicodecsv as csv
+from tempfile import NamedTemporaryFile
+
+
+class VerticaToMySqlTransfer(BaseOperator):
+    """
+    Moves data from Vertica to MySQL.
+
+    :param sql: SQL query to execute against the Vertica database
+    :type sql: str
+    :param vertica_conn_id: source Vertica connection
+    :type vertica_conn_id: str
+    :param mysql_table: target MySQL table, use dot notation to target a
+        specific database
+    :type mysql_table: str
+    :param mysql_conn_id: source mysql connection
+    :type mysql_conn_id: str
+    :param mysql_preoperator: sql statement to run against MySQL prior to
+        import, typically use to truncate of delete in place of the data
+        coming in, allowing the task to be idempotent (running the task
+        twice won't double load data)
+    :type mysql_preoperator: str
+    :param mysql_postoperator: sql statement to run against MySQL after the
+        import, typically used to move data from staging to production
+        and issue cleanup commands.
+    :type mysql_postoperator: str
+    :param bulk_load: flag to use bulk_load option.  This loads MySQL directly
+        from a tab-delimited text file using the LOAD DATA LOCAL INFILE command.
+        This option requires an extra connection parameter for the
+        destination MySQL connection: {'local_infile': true}.
+    :type bulk_load: bool
+    """
+
+    template_fields = ('sql', 'mysql_table', 'mysql_preoperator',
+        'mysql_postoperator')
+    template_ext = ('.sql',)
+    ui_color = '#a0e08c'
+
+    @apply_defaults
+    def __init__(
+            self,
+            sql,
+            mysql_table,
+            vertica_conn_id='vertica_default',
+            mysql_conn_id='mysql_default',
+            mysql_preoperator=None,
+            mysql_postoperator=None,
+            bulk_load=False,
+            *args, **kwargs):
+        super(VerticaToMySqlTransfer, self).__init__(*args, **kwargs)
+        self.sql = sql
+        self.mysql_table = mysql_table
+        self.mysql_conn_id = mysql_conn_id
+        self.mysql_preoperator = mysql_preoperator
+        self.mysql_postoperator = mysql_postoperator
+        self.vertica_conn_id = vertica_conn_id
+        self.bulk_load = bulk_load
+
+    def execute(self, context):
+        vertica = VerticaHook(vertica_conn_id=self.vertica_conn_id)
+        mysql = MySqlHook(mysql_conn_id=self.mysql_conn_id)
+
+        tmpfile = None
+        result = None
+
+        selected_columns = []
+
+        count = 0
+        with closing(vertica.get_conn()) as conn:
+            with closing(conn.cursor()) as cursor:
+                cursor.execute(self.sql)
+                selected_columns = [d.name for d in cursor.description]
+
+                if self.bulk_load:
+                    tmpfile = NamedTemporaryFile("w")
+
+                    logging.info("Selecting rows from Vertica to local file " + str(tmpfile.name) + "...")
+                    logging.info(self.sql)
+
+                    csv_writer = csv.writer(tmpfile, delimiter='\t', encoding='utf-8')
+                    for row in cursor.iterate():
+                        csv_writer.writerow(row)
+                        count += 1
+
+                    tmpfile.flush()
+                else:
+                    logging.info("Selecting rows from Vertica...")
+                    logging.info(self.sql)
+
+                    result = cursor.fetchall()
+                    count = len(result)
+
+                logging.info("Selected rows from Vertica " + str(count))
+
+        if self.mysql_preoperator:
+            logging.info("Running MySQL preoperator...")
+            mysql.run(self.mysql_preoperator)
+
+        try:
+            if self.bulk_load:
+                logging.info("Bulk inserting rows into MySQL...")
+                with closing(mysql.get_conn()) as conn:
+                    with closing(conn.cursor()) as cursor:
+                        cursor.execute("LOAD DATA LOCAL INFILE '%s' INTO TABLE %s LINES TERMINATED BY '\r\n' (%s)" % (tmpfile.name, self.mysql_table, ", ".join(selected_columns)))
+                        conn.commit()
+                tmpfile.close()
+            else:
+                logging.info("Inserting rows into MySQL...")
+                mysql.insert_rows(table=self.mysql_table, rows=result, target_fields=selected_columns)
+            logging.info("Inserted rows into MySQL " + str(count))
+        except:
+            logging.error("Inserted rows into MySQL 0")
+            raise
+
+        if self.mysql_postoperator:
+            logging.info("Running MySQL postoperator...")
+            mysql.run(self.mysql_postoperator)
+
+        logging.info("Done")

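For illustration, a minimal sketch of wiring the new operator into a DAG. The connection IDs, table names, and SQL below are assumptions for the example, not values taken from the commit:

    from datetime import datetime

    from airflow import DAG
    from airflow.contrib.operators.vertica_to_mysql import VerticaToMySqlTransfer

    dag = DAG(
        dag_id='example_vertica_to_mysql',
        start_date=datetime(2018, 5, 1),
        schedule_interval=None)

    transfer = VerticaToMySqlTransfer(
        task_id='vertica_to_mysql',
        sql='SELECT id, name FROM sales.customers',            # runs against Vertica
        mysql_table='staging.customers',                       # dot notation selects the MySQL database
        vertica_conn_id='vertica_default',
        mysql_conn_id='mysql_default',
        mysql_preoperator='TRUNCATE TABLE staging.customers',  # keeps reruns idempotent
        bulk_load=False,                                       # True requires {'local_infile': true} on the MySQL connection
        dag=dag)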
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c3aa8e31/scripts/ci/requirements.txt
----------------------------------------------------------------------
diff --git a/scripts/ci/requirements.txt b/scripts/ci/requirements.txt
index bba5d29..a2a78df 100644
--- a/scripts/ci/requirements.txt
+++ b/scripts/ci/requirements.txt
@@ -92,5 +92,6 @@ statsd
 thrift
 thrift_sasl
 unicodecsv
+vertica_python
 zdesk
 kubernetes

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c3aa8e31/tests/contrib/operators/test_vertica_to_mysql.py
----------------------------------------------------------------------
diff --git a/tests/contrib/operators/test_vertica_to_mysql.py b/tests/contrib/operators/test_vertica_to_mysql.py
new file mode 100644
index 0000000..5c05e3d
--- /dev/null
+++ b/tests/contrib/operators/test_vertica_to_mysql.py
@@ -0,0 +1,87 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import datetime
+
+import mock
+import unittest
+
+from airflow import DAG, configuration
+from airflow.contrib.operators.vertica_to_mysql import VerticaToMySqlTransfer
+
+
+def mock_get_conn():
+    commit_mock = mock.MagicMock(
+    )
+    cursor_mock = mock.MagicMock(
+        execute     = [],
+        fetchall    = [['1', '2', '3']],
+        description =  ['a', 'b', 'c'],
+        iterate     = [['1', '2', '3']],
+    )
+    conn_mock = mock.MagicMock(
+        commit      = commit_mock,
+        cursor      = cursor_mock,
+    )
+    return conn_mock
+
+
+class TestVerticaToMySqlTransfer(unittest.TestCase):
+    def setUp(self):
+        configuration.load_test_config()
+        args = {
+            'owner': 'airflow',
+            'start_date': datetime.datetime(2017, 1, 1)
+        }
+        self.dag = DAG('test_dag_id', default_args=args)
+
+    @mock.patch('airflow.contrib.operators.vertica_to_mysql.VerticaHook.get_conn', side_effect=mock_get_conn)
+    @mock.patch('airflow.contrib.operators.vertica_to_mysql.MySqlHook.get_conn', side_effect=mock_get_conn)
+    @mock.patch('airflow.contrib.operators.vertica_to_mysql.MySqlHook.insert_rows', return_value=True)
+    def test_select_insert_transfer(self, *args):
+        """
+        Test selecting rows from Vertica into memory and
+        then inserting them into MySQL
+        """
+        task = VerticaToMySqlTransfer(task_id='test_task_id',
+                                      sql='select a, b, c',
+                                      mysql_table='test_table',
+                                      vertica_conn_id='test_vertica_conn_id',
+                                      mysql_conn_id='test_mysql_conn_id',
+                                      params={},
+                                      bulk_load=False,
+                                      dag=self.dag)
+        task.execute(None)
+
+    @mock.patch('airflow.contrib.operators.vertica_to_mysql.VerticaHook.get_conn', side_effect=mock_get_conn)
+    @mock.patch('airflow.contrib.operators.vertica_to_mysql.MySqlHook.get_conn', side_effect=mock_get_conn)
+    def test_select_bulk_insert_transfer(self, *args):
+        """
+        Test selecting rows from Vertica into a temporary file and
+        then bulk inserting them into MySQL
+        """
+        task = VerticaToMySqlTransfer(task_id='test_task_id',
+                                      sql='select a, b, c',
+                                      mysql_table='test_table',
+                                      vertica_conn_id='test_vertica_conn_id',
+                                      mysql_conn_id='test_mysql_conn_id',
+                                      params={},
+                                      bulk_load=True,
+                                      dag=self.dag)
+        task.execute(None)
+
+
+if __name__ == '__main__':
+    unittest.main()


[22/50] incubator-airflow git commit: closes apache/incubator-airflow#2047 *Closed for inactivity*

Posted by fo...@apache.org.
closes apache/incubator-airflow#2047 *Closed for inactivity*


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/06e90f49
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/06e90f49
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/06e90f49

Branch: refs/heads/v1-10-test
Commit: 06e90f49615896ce5c0f720f0e760c10762e8f4c
Parents: 8c8d140
Author: r39132 <si...@yahoo.com>
Authored: Sun Apr 29 20:18:18 2018 -0700
Committer: r39132 <si...@yahoo.com>
Committed: Sun Apr 29 20:18:18 2018 -0700

----------------------------------------------------------------------

----------------------------------------------------------------------



[37/50] incubator-airflow git commit: [AIRFLOW-2404] Add additional documentation for unqueued task

Posted by fo...@apache.org.
[AIRFLOW-2404] Add additional documentation for unqueued task

Closes #3286 from AetherUnbound/feature/task-not-
queued-doc


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/9a0c4e6c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/9a0c4e6c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/9a0c4e6c

Branch: refs/heads/v1-10-test
Commit: 9a0c4e6caeaddde19c2e104d959bdb36b46ab37a
Parents: 8c49e8d
Author: Matthew Bowden <bo...@spu.edu>
Authored: Wed May 2 08:12:49 2018 +0200
Committer: Fokko Driesprong <fo...@godatadriven.com>
Committed: Wed May 2 08:12:49 2018 +0200

----------------------------------------------------------------------
 airflow/www/views.py | 10 +++++++++-
 1 file changed, 9 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/9a0c4e6c/airflow/www/views.py
----------------------------------------------------------------------
diff --git a/airflow/www/views.py b/airflow/www/views.py
index ff24031..6e2f1fc 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -858,8 +858,16 @@ class Airflow(BaseView):
         no_failed_deps_result = [(
             "Unknown",
             dedent("""\
-            All dependencies are met but the task instance is not running. In most cases this just means that the task will probably be scheduled soon unless:<br/>
+            All dependencies are met but the task instance is not running.
+            In most cases this just means that the task will probably
+            be scheduled soon unless:<br/>
             - The scheduler is down or under heavy load<br/>
+            - The following configuration values may be limiting the number
+            of queueable processes:
+              <code>parallelism</code>,
+              <code>dag_concurrency</code>,
+              <code>max_active_dag_runs_per_dag</code>,
+              <code>non_pooled_task_slot_count</code><br/>
             {}
             <br/>
             If this task instance does not start soon please contact your Airflow """


[18/50] incubator-airflow git commit: [AIRFLOW-1960] Add support for secrets in kubernetes operator

Posted by fo...@apache.org.
[AIRFLOW-1960] Add support for secrets in kubernetes operator

Closes #3271 from ese/secrets-kubernetes-operator


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/72f15a10
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/72f15a10
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/72f15a10

Branch: refs/heads/v1-10-test
Commit: 72f15a108e5556970229cb68d3b0968ee18db46e
Parents: c3aa8e3
Author: Sergio Ballesteros <sn...@locolandia.net>
Authored: Sun Apr 29 11:54:55 2018 +0200
Committer: Fokko Driesprong <fo...@godatadriven.com>
Committed: Sun Apr 29 11:54:55 2018 +0200

----------------------------------------------------------------------
 .../kubernetes_request_factory.py               | 11 +++++--
 airflow/contrib/kubernetes/secret.py            |  5 +++-
 .../operators/kubernetes_pod_operator.py        | 31 ++++++++++++++++++--
 3 files changed, 41 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/72f15a10/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py b/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py
index 6e8632f..12d05ec 100644
--- a/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py
+++ b/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py
@@ -84,7 +84,10 @@ class KubernetesRequestFactory:
 
     @staticmethod
     def attach_volumes(pod, req):
-        req['spec']['volumes'] = pod.volumes
+        req['spec']['volumes'] = (
+            req['spec'].get('volumes', []))
+        if len(pod.volumes) > 0:
+            req['spec']['volumes'].extend(pod.volumes)
 
     @staticmethod
     def attach_volume_mounts(pod, req):
@@ -101,8 +104,10 @@ class KubernetesRequestFactory:
     def extract_volume_secrets(pod, req):
         vol_secrets = [s for s in pod.secrets if s.deploy_type == 'volume']
         if any(vol_secrets):
-            req['spec']['containers'][0]['volumeMounts'] = []
-            req['spec']['volumes'] = []
+            req['spec']['containers'][0]['volumeMounts'] = (
+                req['spec']['containers'][0].get('volumeMounts', []))
+            req['spec']['volumes'] = (
+                req['spec'].get('volumes', []))
         for idx, vol in enumerate(vol_secrets):
             vol_id = 'secretvol' + str(idx)
             req['spec']['containers'][0]['volumeMounts'].append({

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/72f15a10/airflow/contrib/kubernetes/secret.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/kubernetes/secret.py b/airflow/contrib/kubernetes/secret.py
index 23bfacc..5c1038c 100644
--- a/airflow/contrib/kubernetes/secret.py
+++ b/airflow/contrib/kubernetes/secret.py
@@ -25,7 +25,8 @@ class Secret:
         :param deploy_type: The type of secret deploy in Kubernetes, either `env` or
             `volume`
         :type deploy_type: ``str``
-        :param deploy_target: The environment variable to be created in the worker.
+        :param deploy_target: The environment variable to create when `deploy_type` is `env`,
+            or the file path where the secret is exposed when `deploy_type` is `volume`
         :type deploy_target: ``str``
         :param secret: Name of the secrets object in Kubernetes
         :type secret: ``str``
@@ -34,5 +35,7 @@ class Secret:
         """
         self.deploy_type = deploy_type
         self.deploy_target = deploy_target.upper()
+        if deploy_type == 'volume':
+            self.deploy_target = deploy_target
         self.secret = secret
         self.key = key

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/72f15a10/airflow/contrib/operators/kubernetes_pod_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/kubernetes_pod_operator.py b/airflow/contrib/operators/kubernetes_pod_operator.py
index bcc3cad..9e95d8b 100644
--- a/airflow/contrib/operators/kubernetes_pod_operator.py
+++ b/airflow/contrib/operators/kubernetes_pod_operator.py
@@ -27,6 +27,31 @@ ui_color = '#ffefeb'
 
 
 class KubernetesPodOperator(BaseOperator):
+    """
+    Execute a task in a Kubernetes Pod
+
+    :param image: Docker image name
+    :type image: str
+    :param namespace: the namespace in which to run the Pod
+    :type namespace: str
+    :param cmds: entrypoint of the container
+    :type cmds: list
+    :param arguments: arguments to the entrypoint.
+        The docker image's CMD is used if this is not provided.
+    :type arguments: list
+    :param labels: labels to apply to the Pod
+    :type labels: dict
+    :param startup_timeout_seconds: timeout in seconds to startup the pod
+    :type startup_timeout_seconds: int
+    :param name: name for the pod
+    :type name: str
+    :param secrets: Secrets to attach to the container
+    :type secrets: list
+    :param in_cluster: run kubernetes client with in_cluster configuration
+    :type in_cluster: bool
+    :param get_logs: fetch the stdout of the container as the logs of the task
+    """
+
     def execute(self, context):
         try:
 
@@ -42,6 +67,8 @@ class KubernetesPodOperator(BaseOperator):
                 labels=self.labels
             )
 
+            pod.secrets = self.secrets
+
             launcher = pod_launcher.PodLauncher(client)
             final_state = launcher.run_pod(
                 pod,
@@ -59,15 +86,14 @@ class KubernetesPodOperator(BaseOperator):
                  cmds,
                  arguments,
                  name,
+                 secrets=None,
                  in_cluster=False,
                  labels=None,
                  startup_timeout_seconds=120,
-                 kube_executor_config=None,
                  get_logs=True,
                  *args,
                  **kwargs):
         super(KubernetesPodOperator, self).__init__(*args, **kwargs)
-        self.kube_executor_config = kube_executor_config or {}
         self.image = image
         self.namespace = namespace
         self.cmds = cmds
@@ -75,5 +101,6 @@ class KubernetesPodOperator(BaseOperator):
         self.labels = labels or {}
         self.startup_timeout_seconds = startup_timeout_seconds
         self.name = name
+        self.secrets = secrets or []
         self.in_cluster = in_cluster
         self.get_logs = get_logs

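For illustration, a minimal sketch of the new `secrets` parameter. The image, namespace, secret name, and keys are assumptions for the example, not values taken from the commit; the `Secret` call follows the docstring above (deploy_type, deploy_target, secret, key):

    from datetime import datetime

    from airflow import DAG
    from airflow.contrib.kubernetes.secret import Secret
    from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator

    dag = DAG(
        dag_id='example_pod_with_secrets',
        start_date=datetime(2018, 5, 1),
        schedule_interval=None)

    # expose the 'sql_alchemy_conn' key of the Kubernetes secret 'airflow-secrets'
    # as the SQL_CONN environment variable inside the container (env targets are upper-cased)
    secret_env = Secret('env', 'sql_conn', 'airflow-secrets', 'sql_alchemy_conn')

    # mount the same key as a file below /etc/sql_conn inside the container
    secret_vol = Secret('volume', '/etc/sql_conn', 'airflow-secrets', 'sql_alchemy_conn')

    pod_task = KubernetesPodOperator(
        task_id='pod-with-secrets',
        name='pod-with-secrets',
        namespace='default',
        image='ubuntu:16.04',
        cmds=['bash', '-cx'],
        arguments=['echo $SQL_CONN && ls /etc/sql_conn'],
        secrets=[secret_env, secret_vol],
        in_cluster=False,
        get_logs=True,
        dag=dag)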

[24/50] incubator-airflow git commit: [AIRFLOW-2390] Resolve FlaskWTFDeprecationWarning

Posted by fo...@apache.org.
[AIRFLOW-2390] Resolve FlaskWTFDeprecationWarning

- Replace soon to be deprecated `flask_wtf.Form`
class with `from flask_wtf import FlaskForm`

Closes #3278 from kaxil/AIRFLOW-2390


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/ae48fce9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/ae48fce9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/ae48fce9

Branch: refs/heads/v1-10-test
Commit: ae48fce9b097e48f09b3b8829b3b80dea21c58e9
Parents: e29562e
Author: Kaxil Naik <ka...@gmail.com>
Authored: Mon Apr 30 08:35:54 2018 +0200
Committer: Fokko Driesprong <fo...@godatadriven.com>
Committed: Mon Apr 30 08:35:54 2018 +0200

----------------------------------------------------------------------
 airflow/www/forms.py      | 10 +++++-----
 airflow/www_rbac/forms.py | 10 +++++-----
 2 files changed, 10 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/ae48fce9/airflow/www/forms.py
----------------------------------------------------------------------
diff --git a/airflow/www/forms.py b/airflow/www/forms.py
index 1904e72..d6fbe13 100644
--- a/airflow/www/forms.py
+++ b/airflow/www/forms.py
@@ -7,9 +7,9 @@
 # to you under the Apache License, Version 2.0 (the
 # "License"); you may not use this file except in compliance
 # with the License.  You may obtain a copy of the License at
-# 
+#
 #   http://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 # Unless required by applicable law or agreed to in writing,
 # software distributed under the License is distributed on an
 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -25,16 +25,16 @@ from __future__ import unicode_literals
 from airflow.utils import timezone
 from flask_admin.form import DateTimePickerWidget
 from wtforms import DateTimeField, SelectField
-from flask_wtf import Form
+from flask_wtf import FlaskForm
 
 
-class DateTimeForm(Form):
+class DateTimeForm(FlaskForm):
     # Date filter form needed for gantt and graph view
     execution_date = DateTimeField(
         "Execution date", widget=DateTimePickerWidget())
 
 
-class DateTimeWithNumRunsForm(Form):
+class DateTimeWithNumRunsForm(FlaskForm):
     # Date time and number of runs form for tree view, task duration
     # and landing times
     base_date = DateTimeField(

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/ae48fce9/airflow/www_rbac/forms.py
----------------------------------------------------------------------
diff --git a/airflow/www_rbac/forms.py b/airflow/www_rbac/forms.py
index 63d27e6..61a7ce2 100644
--- a/airflow/www_rbac/forms.py
+++ b/airflow/www_rbac/forms.py
@@ -7,9 +7,9 @@
 # to you under the Apache License, Version 2.0 (the
 # "License"); you may not use this file except in compliance
 # with the License.  You may obtain a copy of the License at
-# 
+#
 #   http://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 # Unless required by applicable law or agreed to in writing,
 # software distributed under the License is distributed on an
 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -30,20 +30,20 @@ from flask_appbuilder.fieldwidgets import (BS3TextFieldWidget, BS3TextAreaFieldW
                                            BS3PasswordFieldWidget, Select2Widget,
                                            DateTimePickerWidget)
 from flask_babel import lazy_gettext
-from flask_wtf import Form
+from flask_wtf import FlaskForm
 
 from wtforms import validators
 from wtforms.fields import (IntegerField, SelectField, TextAreaField, PasswordField,
                             StringField, DateTimeField, BooleanField)
 
 
-class DateTimeForm(Form):
+class DateTimeForm(FlaskForm):
     # Date filter form needed for gantt and graph view
     execution_date = DateTimeField(
         "Execution date", widget=DateTimePickerWidget())
 
 
-class DateTimeWithNumRunsForm(Form):
+class DateTimeWithNumRunsForm(FlaskForm):
     # Date time and number of runs form for tree view, task duration
     # and landing times
     base_date = DateTimeField(


[36/50] incubator-airflow git commit: closes apache/incubator-airflow#2337 *No longer a bug*

Posted by fo...@apache.org.
closes apache/incubator-airflow#2337 *No longer a bug*


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/8c49e8d9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/8c49e8d9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/8c49e8d9

Branch: refs/heads/v1-10-test
Commit: 8c49e8d9bcd905dcd7a40d467bac94a687d85d75
Parents: fef7d30
Author: r39132 <si...@yahoo.com>
Authored: Tue May 1 22:41:32 2018 -0700
Committer: r39132 <si...@yahoo.com>
Committed: Tue May 1 22:41:32 2018 -0700

----------------------------------------------------------------------

----------------------------------------------------------------------



[48/50] incubator-airflow git commit: [AIRFLOW-1812] Update logging example

Posted by fo...@apache.org.
[AIRFLOW-1812] Update logging example

The logging has changed, therefore we should also
update the UPDATING.md guide.

Closes #2784 from Fokko/AIRFLOW-1812-update-
logging-example


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/97ab9e76
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/97ab9e76
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/97ab9e76

Branch: refs/heads/v1-10-test
Commit: 97ab9e762c1f325317fd8003e2846e931137a1e6
Parents: 3a28ceb
Author: Fokko Driesprong <fo...@godatadriven.com>
Authored: Thu May 3 22:58:44 2018 -0700
Committer: r39132 <si...@yahoo.com>
Committed: Thu May 3 22:58:44 2018 -0700

----------------------------------------------------------------------
 UPDATING.md | 115 +++++++++++++++++++++++++++++++++++++++++++++++++++++--
 1 file changed, 112 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/97ab9e76/UPDATING.md
----------------------------------------------------------------------
diff --git a/UPDATING.md b/UPDATING.md
index 8006876..defd95b 100644
--- a/UPDATING.md
+++ b/UPDATING.md
@@ -61,9 +61,9 @@ Dataflow job labeling is now supported in Dataflow{Java,Python}Operator with a d
 to 2.2.0 or greater.
 
 ### Redshift to S3 Operator
-With Airflow 1.9 or lower, Unload operation always included header row. In order to include header row, 
+With Airflow 1.9 or lower, Unload operation always included header row. In order to include header row,
 we need to turn off parallel unload. It is preferred to perform unload operation using all nodes so that it is
-faster for larger tables. So, parameter called `include_header` is added and default is set to False. 
+faster for larger tables. So, parameter called `include_header` is added and default is set to False.
 Header row will be added only if this parameter is set True and also in that case parallel will be automatically turned off (`PARALLEL OFF`)  
 
 ### Google cloud connection string
@@ -119,7 +119,116 @@ logging_config_class = my.path.default_local_settings.LOGGING_CONFIG
 
 The logging configuration file needs to be on the `PYTHONPATH`, for example `$AIRFLOW_HOME/config`. This directory is loaded by default. Any directory may be added to the `PYTHONPATH`, this might be handy when the config is in another directory or a volume is mounted in case of Docker.
 
-The config can be taken from `airflow/config_templates/airflow_local_settings.py` as a starting point. Copy the contents to `${AIRFLOW_HOME}/config/airflow_local_settings.py`,  and alter the config as is preferred. 
+The config can be taken from `airflow/config_templates/airflow_local_settings.py` as a starting point. Copy the contents to `${AIRFLOW_HOME}/config/airflow_local_settings.py`,  and alter the config as is preferred.
+
+```
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import os
+
+from airflow import configuration as conf
+
+# TODO: Logging format and level should be configured
+# in this file instead of from airflow.cfg. Currently
+# there are other log format and level configurations in
+# settings.py and cli.py. Please see AIRFLOW-1455.
+
+LOG_LEVEL = conf.get('core', 'LOGGING_LEVEL').upper()
+LOG_FORMAT = conf.get('core', 'log_format')
+
+BASE_LOG_FOLDER = conf.get('core', 'BASE_LOG_FOLDER')
+PROCESSOR_LOG_FOLDER = conf.get('scheduler', 'child_process_log_directory')
+
+FILENAME_TEMPLATE = '{{ ti.dag_id }}/{{ ti.task_id }}/{{ ts }}/{{ try_number }}.log'
+PROCESSOR_FILENAME_TEMPLATE = '{{ filename }}.log'
+
+DEFAULT_LOGGING_CONFIG = {
+    'version': 1,
+    'disable_existing_loggers': False,
+    'formatters': {
+        'airflow.task': {
+            'format': LOG_FORMAT,
+        },
+        'airflow.processor': {
+            'format': LOG_FORMAT,
+        },
+    },
+    'handlers': {
+        'console': {
+            'class': 'logging.StreamHandler',
+            'formatter': 'airflow.task',
+            'stream': 'ext://sys.stdout'
+        },
+        'file.task': {
+            'class': 'airflow.utils.log.file_task_handler.FileTaskHandler',
+            'formatter': 'airflow.task',
+            'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
+            'filename_template': FILENAME_TEMPLATE,
+        },
+        'file.processor': {
+            'class': 'airflow.utils.log.file_processor_handler.FileProcessorHandler',
+            'formatter': 'airflow.processor',
+            'base_log_folder': os.path.expanduser(PROCESSOR_LOG_FOLDER),
+            'filename_template': PROCESSOR_FILENAME_TEMPLATE,
+        }
+        # When using s3 or gcs, provide a customized LOGGING_CONFIG
+        # in airflow_local_settings within your PYTHONPATH, see UPDATING.md
+        # for details
+        # 's3.task': {
+        #     'class': 'airflow.utils.log.s3_task_handler.S3TaskHandler',
+        #     'formatter': 'airflow.task',
+        #     'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
+        #     's3_log_folder': S3_LOG_FOLDER,
+        #     'filename_template': FILENAME_TEMPLATE,
+        # },
+        # 'gcs.task': {
+        #     'class': 'airflow.utils.log.gcs_task_handler.GCSTaskHandler',
+        #     'formatter': 'airflow.task',
+        #     'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
+        #     'gcs_log_folder': GCS_LOG_FOLDER,
+        #     'filename_template': FILENAME_TEMPLATE,
+        # },
+    },
+    'loggers': {
+        '': {
+            'handlers': ['console'],
+            'level': LOG_LEVEL
+        },
+        'airflow': {
+            'handlers': ['console'],
+            'level': LOG_LEVEL,
+            'propagate': False,
+        },
+        'airflow.processor': {
+            'handlers': ['file.processor'],
+            'level': LOG_LEVEL,
+            'propagate': True,
+        },
+        'airflow.task': {
+            'handlers': ['file.task'],
+            'level': LOG_LEVEL,
+            'propagate': False,
+        },
+        'airflow.task_runner': {
+            'handlers': ['file.task'],
+            'level': LOG_LEVEL,
+            'propagate': True,
+        },
+    }
+}
+```
 
 To customize the logging (for example, use logging rotate), define one or more of the logging handles that [Python has to offer](https://docs.python.org/3/library/logging.handlers.html). For more details about the Python logging, please refer to the [official logging documentation](https://docs.python.org/3/library/logging.html).
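
For illustration, one way to add log rotation on top of the config above. This is a sketch only; the handler name, file path, and sizes are assumptions, and it presumes the snippet is appended to the copied ${AIRFLOW_HOME}/config/airflow_local_settings.py with airflow.cfg pointing logging_config_class at it:

    import copy
    import os

    LOGGING_CONFIG = copy.deepcopy(DEFAULT_LOGGING_CONFIG)

    # a rotating file handler reusing the 'airflow.task' formatter defined above
    LOGGING_CONFIG['handlers']['file.rotating'] = {
        'class': 'logging.handlers.RotatingFileHandler',
        'formatter': 'airflow.task',
        'filename': os.path.expanduser('~/airflow/logs/airflow.log'),
        'maxBytes': 10 * 1024 * 1024,  # rotate after roughly 10 MB
        'backupCount': 5,              # keep five rotated files
    }

    # route the 'airflow' logger through the rotating handler instead of the console
    LOGGING_CONFIG['loggers']['airflow']['handlers'] = ['file.rotating']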
 


[13/50] incubator-airflow git commit: closes apache/incubator-airflow#2744 *Closed for inactivity.*

Posted by fo...@apache.org.
closes apache/incubator-airflow#2744 *Closed for inactivity.*


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/f67e9673
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/f67e9673
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/f67e9673

Branch: refs/heads/v1-10-test
Commit: f67e9673f5ced64aa3681cf9343a4b8b73f7903b
Parents: 2d588e9
Author: r39132 <si...@yahoo.com>
Authored: Sat Apr 28 23:30:32 2018 -0700
Committer: r39132 <si...@yahoo.com>
Committed: Sat Apr 28 23:30:32 2018 -0700

----------------------------------------------------------------------

----------------------------------------------------------------------



[29/50] incubator-airflow git commit: [AIRFLOW-1313] Fix license header

Posted by fo...@apache.org.
[AIRFLOW-1313] Fix license header

Closes #3281 from juise/master


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/a7e8f489
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/a7e8f489
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/a7e8f489

Branch: refs/heads/v1-10-test
Commit: a7e8f48963e38ed233a84b1439715fdbcb9ec619
Parents: 9a8e4f7
Author: Alexander Petrovsky <as...@gmail.com>
Authored: Mon Apr 30 09:11:22 2018 -0700
Committer: r39132 <si...@yahoo.com>
Committed: Mon Apr 30 09:11:22 2018 -0700

----------------------------------------------------------------------
 airflow/contrib/operators/vertica_to_mysql.py   | 23 +++++++++++--------
 .../contrib/operators/test_vertica_to_mysql.py  | 24 ++++++++++++--------
 2 files changed, 28 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7e8f489/airflow/contrib/operators/vertica_to_mysql.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/vertica_to_mysql.py b/airflow/contrib/operators/vertica_to_mysql.py
index 9b14e6c..2e9325d 100644
--- a/airflow/contrib/operators/vertica_to_mysql.py
+++ b/airflow/contrib/operators/vertica_to_mysql.py
@@ -1,16 +1,21 @@
 # -*- coding: utf-8 -*-
 #
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
 #
-# http://www.apache.org/licenses/LICENSE-2.0
+#   http://www.apache.org/licenses/LICENSE-2.0
 #
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
 
 import logging
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7e8f489/tests/contrib/operators/test_vertica_to_mysql.py
----------------------------------------------------------------------
diff --git a/tests/contrib/operators/test_vertica_to_mysql.py b/tests/contrib/operators/test_vertica_to_mysql.py
index 5c05e3d..615f111 100644
--- a/tests/contrib/operators/test_vertica_to_mysql.py
+++ b/tests/contrib/operators/test_vertica_to_mysql.py
@@ -1,17 +1,21 @@
 # -*- coding: utf-8 -*-
 #
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
 #
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
+#   http://www.apache.org/licenses/LICENSE-2.0
 #
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
 
 import datetime
 


[38/50] incubator-airflow git commit: [AIRFLOW-2406] Add Apache2 License Shield to Readme

Posted by fo...@apache.org.
[AIRFLOW-2406] Add Apache2 License Shield to Readme

Closes #3290 from
r39132/add_apache2_license_shield_to_readme


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/96d00da5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/96d00da5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/96d00da5

Branch: refs/heads/v1-10-test
Commit: 96d00da5b038d12b01aa538f0f49722435bedbac
Parents: 9a0c4e6
Author: r39132 <si...@yahoo.com>
Authored: Tue May 1 23:36:59 2018 -0700
Committer: r39132 <si...@yahoo.com>
Committed: Tue May 1 23:36:59 2018 -0700

----------------------------------------------------------------------
 README.md | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/96d00da5/README.md
----------------------------------------------------------------------
diff --git a/README.md b/README.md
index 86b4a8c..bdbfbf2 100644
--- a/README.md
+++ b/README.md
@@ -4,6 +4,7 @@
 [![Build Status](https://travis-ci.org/apache/incubator-airflow.svg?branch=master)](https://travis-ci.org/apache/incubator-airflow)
 [![Coverage Status](https://img.shields.io/codecov/c/github/apache/incubator-airflow/master.svg)](https://codecov.io/github/apache/incubator-airflow?branch=master)
 [![Documentation Status](https://readthedocs.org/projects/airflow/badge/?version=latest)](https://airflow.readthedocs.io/en/latest/?badge=latest)
+[![License](http://img.shields.io/:license-Apache%202-blue.svg)](http://www.apache.org/licenses/LICENSE-2.0.txt)
 [![Join the chat at https://gitter.im/apache/incubator-airflow](https://badges.gitter.im/apache/incubator-airflow.svg)](https://gitter.im/apache/incubator-airflow?utm_source=badge&utm_medium=badge&utm_campaign=pr-badge&utm_content=badge)
 
 _NOTE: The transition from 1.8.0 (or before) to 1.8.1 (or after) requires uninstalling Airflow before installing the new version. The package name was changed from `airflow` to `apache-airflow` as of version 1.8.1._


[03/50] incubator-airflow git commit: [AIRFLOW-2381] Fix the flaky ApiPasswordTests test

Posted by fo...@apache.org.
[AIRFLOW-2381] Fix the flaky ApiPasswordTests test

This test is in conflict with different tests
running in parallel. By calling a simple overview
page, the behaviour of checking the password is
still checked, but isn't dependent on a specific
dag being present in the database.

Closes #3269 from Fokko/AIRFLOW-2381


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/414a08e5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/414a08e5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/414a08e5

Branch: refs/heads/v1-10-test
Commit: 414a08e505e2ae86f4272bfc8b0d28d66ef6ab6a
Parents: 801fe7d
Author: Fokko Driesprong <fo...@godatadriven.com>
Authored: Fri Apr 27 16:43:40 2018 +0200
Committer: Bolke de Bruin <bo...@xs4all.nl>
Committed: Fri Apr 27 16:43:40 2018 +0200

----------------------------------------------------------------------
 .../api/experimental/test_password_endpoints.py  | 19 +++++--------------
 1 file changed, 5 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/414a08e5/tests/www/api/experimental/test_password_endpoints.py
----------------------------------------------------------------------
diff --git a/tests/www/api/experimental/test_password_endpoints.py b/tests/www/api/experimental/test_password_endpoints.py
index ecddff1..8131ff5 100644
--- a/tests/www/api/experimental/test_password_endpoints.py
+++ b/tests/www/api/experimental/test_password_endpoints.py
@@ -7,9 +7,9 @@
 # to you under the Apache License, Version 2.0 (the
 # "License"); you may not use this file except in compliance
 # with the License.  You may obtain a copy of the License at
-# 
+#
 #   http://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 # Unless required by applicable law or agreed to in writing,
 # software distributed under the License is distributed on an
 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -56,24 +56,15 @@ class ApiPasswordTests(unittest.TestCase):
 
     def test_authorized(self):
         with self.app.test_client() as c:
-            url_template = '/api/experimental/dags/{}/dag_runs'
-            response = c.post(
-                url_template.format('example_bash_operator'),
-                data=json.dumps(dict(run_id='my_run' + datetime.now().isoformat())),
-                content_type="application/json",
+            response = c.get(
+                '/api/experimental/pools',
                 headers={'Authorization': 'Basic aGVsbG86d29ybGQ='}  # hello:world
             )
             self.assertEqual(200, response.status_code)
 
     def test_unauthorized(self):
         with self.app.test_client() as c:
-            url_template = '/api/experimental/dags/{}/dag_runs'
-            response = c.post(
-                url_template.format('example_bash_operator'),
-                data=json.dumps(dict(run_id='my_run' + datetime.now().isoformat())),
-                content_type="application/json"
-            )
-
+            response = c.get('/api/experimental/pools')
             self.assertEqual(401, response.status_code)
 
     def tearDown(self):

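For reference, an equivalent manual check against a running webserver could look like the following; the host and port are assumptions, the hello:world credentials are the same pair the test encodes in its Authorization header, and the password auth backend must be enabled:

    curl -u hello:world http://localhost:8080/api/experimental/pools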

[15/50] incubator-airflow git commit: closes apache/incubator-airflow#2555 *Fixed by another PR.*

Posted by fo...@apache.org.
closes apache/incubator-airflow#2555 *Fixed by another PR.*


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/b34c2963
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/b34c2963
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/b34c2963

Branch: refs/heads/v1-10-test
Commit: b34c29635065a6acee14851758aecd4b4f4e9dfd
Parents: 99cf26d
Author: r39132 <si...@yahoo.com>
Authored: Sat Apr 28 23:35:19 2018 -0700
Committer: r39132 <si...@yahoo.com>
Committed: Sat Apr 28 23:35:19 2018 -0700

----------------------------------------------------------------------

----------------------------------------------------------------------



[16/50] incubator-airflow git commit: closes apache/incubator-airflow#3209 *PR in heavy need of squashing and cleanup.*

Posted by fo...@apache.org.
closes apache/incubator-airflow#3209 *PR in heavy need of squashing and cleanup.*


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/c5d35761
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/c5d35761
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/c5d35761

Branch: refs/heads/v1-10-test
Commit: c5d35761f4768f434527b3a26b101b8d87f7965f
Parents: b34c296
Author: r39132 <si...@yahoo.com>
Authored: Sat Apr 28 23:54:19 2018 -0700
Committer: r39132 <si...@yahoo.com>
Committed: Sat Apr 28 23:54:19 2018 -0700

----------------------------------------------------------------------

----------------------------------------------------------------------



[11/50] incubator-airflow git commit: closes apache/incubator-airflow#3276 *Messed up PR - hundreds of old commits.*

Posted by fo...@apache.org.
closes apache/incubator-airflow#3276 *Messed up PR - hundreds of old commits.*


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/e691acc1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/e691acc1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/e691acc1

Branch: refs/heads/v1-10-test
Commit: e691acc115683f1fc6881e6dcdbc9fdedffde667
Parents: e9b74b6
Author: r39132 <si...@yahoo.com>
Authored: Sat Apr 28 14:57:27 2018 -0700
Committer: r39132 <si...@yahoo.com>
Committed: Sat Apr 28 14:57:27 2018 -0700

----------------------------------------------------------------------

----------------------------------------------------------------------



[09/50] incubator-airflow git commit: [AIRFLOW-2370] Implement --use_random_password in create_user

Posted by fo...@apache.org.
[AIRFLOW-2370] Implement --use_random_password in create_user

Closes #3262 from wrp/passwords


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/5aa15868
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/5aa15868
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/5aa15868

Branch: refs/heads/v1-10-test
Commit: 5aa15868fd9276d82df33f8fe2e4e3cfc7f4d60a
Parents: 840930b
Author: William Pursell <wi...@wepay.com>
Authored: Sat Apr 28 20:59:59 2018 +0200
Committer: Fokko Driesprong <fo...@godatadriven.com>
Committed: Sat Apr 28 20:59:59 2018 +0200

----------------------------------------------------------------------
 airflow/bin/cli.py | 32 ++++++++++++++++++++------------
 tests/core.py      |  7 +++++++
 2 files changed, 27 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/5aa15868/airflow/bin/cli.py
----------------------------------------------------------------------
diff --git a/airflow/bin/cli.py b/airflow/bin/cli.py
index 975f481..8a92cfa 100644
--- a/airflow/bin/cli.py
+++ b/airflow/bin/cli.py
@@ -22,9 +22,10 @@ from __future__ import print_function
 import logging
 
 import os
-import socket
 import subprocess
 import textwrap
+import random
+import string
 from importlib import import_module
 
 import daemon
@@ -1209,27 +1210,28 @@ def create_user(args):
     }
     empty_fields = [k for k, v in fields.items() if not v]
     if empty_fields:
-        print('Missing arguments: {}.'.format(', '.join(empty_fields)))
-        sys.exit(0)
+        raise SystemExit('Required arguments are missing: {}.'.format(
+            ', '.join(empty_fields)))
 
     appbuilder = cached_appbuilder()
     role = appbuilder.sm.find_role(args.role)
     if not role:
-        print('{} is not a valid role.'.format(args.role))
-        sys.exit(0)
+        raise SystemExit('{} is not a valid role.'.format(args.role))
 
-    password = getpass.getpass('Password:')
-    password_confirmation = getpass.getpass('Repeat for confirmation:')
-    if password != password_confirmation:
-        print('Passwords did not match!')
-        sys.exit(0)
+    if args.use_random_password:
+        password = ''.join(random.choice(string.printable) for _ in range(16))
+    else:
+        password = getpass.getpass('Password:')
+        password_confirmation = getpass.getpass('Repeat for confirmation:')
+        if password != password_confirmation:
+            raise SystemExit('Passwords did not match!')
 
     user = appbuilder.sm.add_user(args.username, args.firstname, args.lastname,
                                   args.email, role, password)
     if user:
         print('{} user {} created.'.format(args.role, args.username))
     else:
-        print('Failed to create user.')
+        raise SystemExit('Failed to create user.')
 
 
 Arg = namedtuple(
@@ -1619,6 +1621,11 @@ class CLIFactory(object):
             ('-u', '--username',),
             help='Username of the user',
             type=str),
+        'use_random_password': Arg(
+            ('--use_random_password',),
+            help='Do not prompt for password.  Use random string instead',
+            default=False,
+            action='store_true'),
     }
     subparsers = (
         {
@@ -1759,7 +1766,8 @@ class CLIFactory(object):
         }, {
             'func': create_user,
             'help': "Create an admin account",
-            'args': ('role', 'username', 'email', 'firstname', 'lastname'),
+            'args': ('role', 'username', 'email', 'firstname', 'lastname',
+                     'use_random_password'),
         },
     )
     subparsers_dict = {sp['func'].__name__: sp for sp in subparsers}

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/5aa15868/tests/core.py
----------------------------------------------------------------------
diff --git a/tests/core.py b/tests/core.py
index 9cbae9d..6d18ffe 100644
--- a/tests/core.py
+++ b/tests/core.py
@@ -983,6 +983,13 @@ class CliTests(unittest.TestCase):
         args = self.parser.parse_args(['list_dags', '--report'])
         cli.list_dags(args)
 
+    def test_cli_create_user(self):
+        args = self.parser.parse_args([
+            'create_user', '-u', 'test', '-l', 'doe', '-f', 'jon',
+            '-e', 'jdoe@foo.com', '-r', 'Viewer', '--use_random_password'
+        ])
+        cli.create_user(args)
+
     def test_cli_list_tasks(self):
         for dag_id in self.dagbag.dags.keys():
             args = self.parser.parse_args(['list_tasks', dag_id])

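For reference, the new flag is exercised from the command line roughly as follows; the role, username, and email are illustrative:

    airflow create_user -r Viewer -u jdoe -e jdoe@example.com -f John -l Doe --use_random_password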

[14/50] incubator-airflow git commit: closes apache/incubator-airflow#2555 *Fixed by another PR.*

Posted by fo...@apache.org.
closes apache/incubator-airflow#2555 *Fixed by another PR.*


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/99cf26df
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/99cf26df
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/99cf26df

Branch: refs/heads/v1-10-test
Commit: 99cf26dfea575f5bd2d0e2e764416259d34c8a41
Parents: f67e967
Author: r39132 <si...@yahoo.com>
Authored: Sat Apr 28 23:35:10 2018 -0700
Committer: r39132 <si...@yahoo.com>
Committed: Sat Apr 28 23:35:10 2018 -0700

----------------------------------------------------------------------

----------------------------------------------------------------------



[33/50] incubator-airflow git commit: [AIRFLOW-1853] Show only the desired number of runs in tree view

Posted by fo...@apache.org.
[AIRFLOW-1853] Show only the desired number of runs in tree view

Previously, the "Number of runs" option was not
being respected for DAGs that were externally
triggered. Now, only the set number of runs is
shown regardless of DAG trigger type.

Also adjust www_rbac

Closes #3288 from
AetherUnbound/feature/AIRFLOW-1853


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/2d1b2aee
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/2d1b2aee
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/2d1b2aee

Branch: refs/heads/v1-10-test
Commit: 2d1b2aee91bc47cfe48a470e1618f026ece86a4d
Parents: a67c13e
Author: Matthew Bowden <bo...@spu.edu>
Authored: Tue May 1 22:03:24 2018 +0200
Committer: Fokko Driesprong <fo...@godatadriven.com>
Committed: Tue May 1 22:03:24 2018 +0200

----------------------------------------------------------------------
 airflow/www/views.py      | 4 ++++
 airflow/www_rbac/views.py | 4 ++++
 2 files changed, 8 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/2d1b2aee/airflow/www/views.py
----------------------------------------------------------------------
diff --git a/airflow/www/views.py b/airflow/www/views.py
index 5dda036..ff24031 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -1254,6 +1254,10 @@ class Airflow(BaseView):
             dr.execution_date: alchemy_to_dict(dr) for dr in dag_runs}
 
         dates = sorted(list(dag_runs.keys()))
+        # Only show the desired number of runs regardless of the trigger method
+        if len(dates) > num_runs:
+            dates = dates[-num_runs:]
+
         max_date = max(dates) if dates else None
 
         tis = dag.get_task_instances(

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/2d1b2aee/airflow/www_rbac/views.py
----------------------------------------------------------------------
diff --git a/airflow/www_rbac/views.py b/airflow/www_rbac/views.py
index f064c14..9636a58 100644
--- a/airflow/www_rbac/views.py
+++ b/airflow/www_rbac/views.py
@@ -894,6 +894,10 @@ class Airflow(AirflowBaseView):
             dr.execution_date: alchemy_to_dict(dr) for dr in dag_runs}
 
         dates = sorted(list(dag_runs.keys()))
+        # Only show the desired number of runs regardless of the trigger method
+        if len(dates) > num_runs:
+            dates = dates[-num_runs:]
+
         max_date = max(dates) if dates else None
 
         tis = dag.get_task_instances(


[49/50] incubator-airflow git commit: closes apache/incubator-airflow#2478 *Closed for inactivity.*

Posted by fo...@apache.org.
closes apache/incubator-airflow#2478 *Closed for inactivity.*


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/d1f7af39
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/d1f7af39
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/d1f7af39

Branch: refs/heads/v1-10-test
Commit: d1f7af393f2d0e27807d0fc31d8023544f5c2aac
Parents: 97ab9e7
Author: r39132 <si...@yahoo.com>
Authored: Thu May 3 23:02:59 2018 -0700
Committer: r39132 <si...@yahoo.com>
Committed: Thu May 3 23:02:59 2018 -0700

----------------------------------------------------------------------

----------------------------------------------------------------------



[41/50] incubator-airflow git commit: [AIRFLOW-2409] Supply password as a parameter

Posted by fo...@apache.org.
[AIRFLOW-2409] Supply password as a parameter

Supply the password as a parameter on the cli

Closes #3304 from Fokko/supply-password


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/2a079b95
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/2a079b95
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/2a079b95

Branch: refs/heads/v1-10-test
Commit: 2a079b953fe35049805079684fe43a1499694e4b
Parents: 71954a5
Author: Fokko Driesprong <fo...@godatadriven.com>
Authored: Thu May 3 08:32:51 2018 +0200
Committer: Fokko Driesprong <fo...@godatadriven.com>
Committed: Thu May 3 08:32:51 2018 +0200

----------------------------------------------------------------------
 airflow/bin/cli.py |  8 +++++++-
 tests/core.py      | 15 +++++++++++----
 2 files changed, 18 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/2a079b95/airflow/bin/cli.py
----------------------------------------------------------------------
diff --git a/airflow/bin/cli.py b/airflow/bin/cli.py
index f26cbe4..3097dce 100644
--- a/airflow/bin/cli.py
+++ b/airflow/bin/cli.py
@@ -1221,6 +1221,8 @@ def create_user(args):
 
     if args.use_random_password:
         password = ''.join(random.choice(string.printable) for _ in range(16))
+    elif args.password:
+        password = args.password
     else:
         password = getpass.getpass('Password:')
         password_confirmation = getpass.getpass('Repeat for confirmation:')
@@ -1622,6 +1624,10 @@ class CLIFactory(object):
             ('-u', '--username',),
             help='Username of the user',
             type=str),
+        'password': Arg(
+            ('-p', '--password',),
+            help='Password of the user',
+            type=str),
         'use_random_password': Arg(
             ('--use_random_password',),
             help='Do not prompt for password.  Use random string instead',
@@ -1768,7 +1774,7 @@ class CLIFactory(object):
             'func': create_user,
             'help': "Create an admin account",
             'args': ('role', 'username', 'email', 'firstname', 'lastname',
-                     'use_random_password'),
+                     'password', 'use_random_password'),
         },
     )
     subparsers_dict = {sp['func'].__name__: sp for sp in subparsers}

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/2a079b95/tests/core.py
----------------------------------------------------------------------
diff --git a/tests/core.py b/tests/core.py
index 6d18ffe..4cece16 100644
--- a/tests/core.py
+++ b/tests/core.py
@@ -7,9 +7,9 @@
 # to you under the Apache License, Version 2.0 (the
 # "License"); you may not use this file except in compliance
 # with the License.  You may obtain a copy of the License at
-# 
+#
 #   http://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 # Unless required by applicable law or agreed to in writing,
 # software distributed under the License is distributed on an
 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -983,13 +983,20 @@ class CliTests(unittest.TestCase):
         args = self.parser.parse_args(['list_dags', '--report'])
         cli.list_dags(args)
 
-    def test_cli_create_user(self):
+    def test_cli_create_user_random_password(self):
         args = self.parser.parse_args([
-            'create_user', '-u', 'test', '-l', 'doe', '-f', 'jon',
+            'create_user', '-u', 'test1', '-l', 'doe', '-f', 'jon',
             '-e', 'jdoe@foo.com', '-r', 'Viewer', '--use_random_password'
         ])
         cli.create_user(args)
 
+    def test_cli_create_user_supplied_password(self):
+        args = self.parser.parse_args([
+            'create_user', '-u', 'test2', '-l', 'doe', '-f', 'jon',
+            '-e', 'jdoe@apache.org', '-r', 'Viewer', '-p', 'test'
+        ])
+        cli.create_user(args)
+
     def test_cli_list_tasks(self):
         for dag_id in self.dagbag.dags.keys():
             args = self.parser.parse_args(['list_tasks', dag_id])


[50/50] incubator-airflow git commit: [AIRFLOW-1899] Fix Kubernetes tests

Posted by fo...@apache.org.
[AIRFLOW-1899] Fix Kubernetes tests

[AIRFLOW-1899] Add full deployment

- Make the home directory configurable
- Fix documentation
- Add licenses

[AIRFLOW-1899] Tests for the Kubernetes Executor

Add an integration test for the Kubernetes executor. This is done by
spinning up different versions of Kubernetes and running a DAG by
invoking the REST API.
Closes #3301 from Fokko/fix-kubernetes-executor
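
A hedged sketch of what the integration test below boils down to, assuming a
minikube cluster whose Airflow webserver is exposed as a NodePort on 30809
(the host address is hypothetical):

    import requests

    host = '192.168.99.100:30809'  # hypothetical output of `minikube ip`

    # Unpause the example DAG, then trigger a run via the experimental REST API
    requests.get('http://{}/api/experimental/'
                 'dags/example_python_operator/paused/false'.format(host))
    requests.post('http://{}/api/experimental/'
                  'dags/example_python_operator/dag_runs'.format(host), json={})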


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/16bae563
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/16bae563
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/16bae563

Branch: refs/heads/v1-10-test
Commit: 16bae5634df24132b37eb752fe816f51bf7e83ca
Parents: d1f7af3
Author: Fokko Driesprong <fo...@godatadriven.com>
Authored: Fri May 4 08:58:12 2018 +0200
Committer: Fokko Driesprong <fo...@godatadriven.com>
Committed: Fri May 4 08:58:12 2018 +0200

----------------------------------------------------------------------
 .gitignore                                      |   1 -
 .travis.yml                                     |   3 -
 airflow/config_templates/default_airflow.cfg    |   8 +
 airflow/configuration.py                        |   9 +-
 .../contrib/executors/kubernetes_executor.py    |  80 ++---
 airflow/contrib/kubernetes/kube_client.py       |   4 +-
 .../contrib/kubernetes/worker_configuration.py  |  19 +-
 airflow/jobs.py                                 |   4 +-
 airflow/www_rbac/api/experimental/endpoints.py  |  27 +-
 scripts/ci/kubernetes/docker/Dockerfile         |   5 +
 scripts/ci/kubernetes/docker/airflow-init.sh    |  24 ++
 scripts/ci/kubernetes/docker/build.sh           |  11 +-
 scripts/ci/kubernetes/kube/airflow.yaml         | 102 +-----
 scripts/ci/kubernetes/kube/configmaps.yaml      | 359 +++++++++++++++++++
 scripts/ci/kubernetes/kube/deploy.sh            |   6 +
 scripts/ci/kubernetes/kube/postgres.yaml        |   1 +
 scripts/ci/kubernetes/kube/secrets.yaml         |  25 ++
 scripts/ci/kubernetes/kube/volumes.yaml         |   1 +
 scripts/ci/travis_script.sh                     |   2 +-
 setup.cfg                                       |  13 +-
 tests/cli/test_cli.py                           | 101 +++++-
 tests/contrib/kubernetes/__init__.py            |  14 -
 tests/contrib/minikube/__init__.py              |  18 +
 .../minikube/test_kubernetes_executor.py        |  97 +++++
 .../minikube/test_kubernetes_pod_operator.py    |  98 +++++
 tests/contrib/minikube_tests/__init__.py        |  18 -
 .../minikube_tests/integration/__init__.py      |  13 -
 .../integration/airflow_controller.py           | 166 ---------
 .../test_kubernetes_executor_integration.py     |  67 ----
 .../test_kubernetes_pod_operator.py             |  93 -----
 .../www_rbac/api/experimental/test_endpoints.py |  21 +-
 31 files changed, 866 insertions(+), 544 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/16bae563/.gitignore
----------------------------------------------------------------------
diff --git a/.gitignore b/.gitignore
index 04c408f..a42962f 100644
--- a/.gitignore
+++ b/.gitignore
@@ -137,4 +137,3 @@ rat-results.txt
 *.generated
 *.tar.gz
 scripts/ci/kubernetes/kube/.generated/airflow.yaml
-

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/16bae563/.travis.yml
----------------------------------------------------------------------
diff --git a/.travis.yml b/.travis.yml
index 6d29a7a..77033df 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -80,9 +80,6 @@ matrix:
       env: TOX_ENV=py27-backend_postgres KUBERNETES_VERSION=v1.8.0
     - python: "2.7"
       env: TOX_ENV=py35-backend_postgres KUBERNETES_VERSION=v1.9.0
-  allow_failures:
-    - env: TOX_ENV=py27-backend_postgres KUBERNETES_VERSION=v1.8.0
-    - env: TOX_ENV=py35-backend_postgres KUBERNETES_VERSION=v1.9.0
 cache:
   directories:
     - $HOME/.wheelhouse/

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/16bae563/airflow/config_templates/default_airflow.cfg
----------------------------------------------------------------------
diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg
index fa5eea0..6da4287 100644
--- a/airflow/config_templates/default_airflow.cfg
+++ b/airflow/config_templates/default_airflow.cfg
@@ -1,3 +1,5 @@
+# -*- coding: utf-8 -*-
+#
 # Licensed to the Apache Software Foundation (ASF) under one
 # or more contributor license agreements.  See the NOTICE file
 # distributed with this work for additional information
@@ -502,6 +504,7 @@ hide_sensitive_variable_fields = True
 
 [elasticsearch]
 elasticsearch_host =
+
 [kubernetes]
 # The repository and tag of the Kubernetes Image for the Worker to Run
 worker_container_repository =
@@ -550,6 +553,11 @@ image_pull_secrets =
 # Should be supplied in the format: key-name-1:key-path-1,key-name-2:key-path-2
 gcp_service_account_keys =
 
+# Use the service account kubernetes gives to pods to connect to kubernetes cluster.
+# It’s intended for clients that expect to be running inside a pod running on kubernetes.
+# It will raise an exception if called from a process not running in a kubernetes environment.
+in_cluster = True
+
 [kubernetes_secrets]
 # The scheduler mounts the following secrets into your workers as they are launched by the
 # scheduler. You may define as many secrets as needed and the kubernetes launcher will parse the

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/16bae563/airflow/configuration.py
----------------------------------------------------------------------
diff --git a/airflow/configuration.py b/airflow/configuration.py
index 130356c..20ef067 100644
--- a/airflow/configuration.py
+++ b/airflow/configuration.py
@@ -7,9 +7,9 @@
 # to you under the Apache License, Version 2.0 (the
 # "License"); you may not use this file except in compliance
 # with the License.  You may obtain a copy of the License at
-# 
+#
 #   http://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 # Unless required by applicable law or agreed to in writing,
 # software distributed under the License is distributed on an
 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -442,7 +442,10 @@ if not os.path.isfile(AIRFLOW_CONFIG):
     )
     with open(AIRFLOW_CONFIG, 'w') as f:
         cfg = parameterized_config(DEFAULT_CONFIG)
-        f.write(cfg.split(TEMPLATE_START)[-1].strip())
+        cfg = cfg.split(TEMPLATE_START)[-1].strip()
+        if six.PY2:
+            cfg = cfg.encode('utf8')
+        f.write(cfg)
 
 log.info("Reading the config from %s", AIRFLOW_CONFIG)
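
A hedged illustration of the Python 2 branch above, assuming a unicode
template string: writing unicode to a file opened in text mode can raise
UnicodeEncodeError once non-ASCII characters appear (see the new utf-8
coding line added to the template), so the string is encoded first.

    import six

    cfg = u'# -*- coding: utf-8 -*-\n# ...'
    if six.PY2:
        cfg = cfg.encode('utf8')
    with open('/tmp/airflow.cfg', 'w') as f:  # hypothetical path
        f.write(cfg)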
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/16bae563/airflow/contrib/executors/kubernetes_executor.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/executors/kubernetes_executor.py b/airflow/contrib/executors/kubernetes_executor.py
index cdce95f..17b2908 100644
--- a/airflow/contrib/executors/kubernetes_executor.py
+++ b/airflow/contrib/executors/kubernetes_executor.py
@@ -23,6 +23,7 @@ from uuid import uuid4
 import kubernetes
 from kubernetes import watch, client
 from kubernetes.client.rest import ApiException
+from airflow.configuration import conf
 from airflow.contrib.kubernetes.pod_launcher import PodLauncher
 from airflow.contrib.kubernetes.kube_client import get_kube_client
 from airflow.contrib.kubernetes.worker_configuration import WorkerConfiguration
@@ -87,20 +88,6 @@ class KubeConfig:
     core_section = 'core'
     kubernetes_section = 'kubernetes'
 
-    @staticmethod
-    def safe_get(section, option, default):
-        try:
-            return configuration.get(section, option)
-        except AirflowConfigException:
-            return default
-
-    @staticmethod
-    def safe_getboolean(section, option, default):
-        try:
-            return configuration.getboolean(section, option)
-        except AirflowConfigException:
-            return default
-
     def __init__(self):
         configuration_dict = configuration.as_dict(display_sensitive=True)
         self.core_configuration = configuration_dict['core']
@@ -114,40 +101,37 @@ class KubeConfig:
             self.kubernetes_section, 'worker_container_tag')
         self.kube_image = '{}:{}'.format(
             self.worker_container_repository, self.worker_container_tag)
-        self.delete_worker_pods = self.safe_getboolean(
-            self.kubernetes_section, 'delete_worker_pods', True)
+        self.delete_worker_pods = conf.getboolean(
+            self.kubernetes_section, 'delete_worker_pods')
 
-        self.worker_service_account_name = self.safe_get(
-            self.kubernetes_section, 'worker_service_account_name', 'default')
-        self.image_pull_secrets = self.safe_get(
-            self.kubernetes_section, 'image_pull_secrets', '')
+        self.worker_service_account_name = conf.get(
+            self.kubernetes_section, 'worker_service_account_name')
+        self.image_pull_secrets = conf.get(self.kubernetes_section, 'image_pull_secrets')
 
         # NOTE: `git_repo` and `git_branch` must be specified together as a pair
         # The http URL of the git repository to clone from
-        self.git_repo = self.safe_get(self.kubernetes_section, 'git_repo', None)
+        self.git_repo = conf.get(self.kubernetes_section, 'git_repo')
         # The branch of the repository to be checked out
-        self.git_branch = self.safe_get(self.kubernetes_section, 'git_branch', None)
+        self.git_branch = conf.get(self.kubernetes_section, 'git_branch')
         # Optionally, the directory in the git repository containing the dags
-        self.git_subpath = self.safe_get(self.kubernetes_section, 'git_subpath', '')
+        self.git_subpath = conf.get(self.kubernetes_section, 'git_subpath')
 
         # Optionally a user may supply a `git_user` and `git_password` for private
         # repositories
-        self.git_user = self.safe_get(self.kubernetes_section, 'git_user', None)
-        self.git_password = self.safe_get(self.kubernetes_section, 'git_password', None)
+        self.git_user = conf.get(self.kubernetes_section, 'git_user')
+        self.git_password = conf.get(self.kubernetes_section, 'git_password')
 
         # NOTE: The user may optionally use a volume claim to mount a PV containing
         # DAGs directly
-        self.dags_volume_claim = self.safe_get(self.kubernetes_section,
-                                               'dags_volume_claim', None)
+        self.dags_volume_claim = conf.get(self.kubernetes_section, 'dags_volume_claim')
 
         # This prop may optionally be set for PV Claims and is used to write logs
-        self.logs_volume_claim = self.safe_get(
-            self.kubernetes_section, 'logs_volume_claim', None)
+        self.logs_volume_claim = conf.get(self.kubernetes_section, 'logs_volume_claim')
 
         # This prop may optionally be set for PV Claims and is used to locate DAGs
         # on a SubPath
-        self.dags_volume_subpath = self.safe_get(
-            self.kubernetes_section, 'dags_volume_subpath', None)
+        self.dags_volume_subpath = conf.get(
+            self.kubernetes_section, 'dags_volume_subpath')
 
         # This prop may optionally be set for PV Claims and is used to write logs
         self.base_log_folder = configuration.get(self.core_section, 'base_log_folder')
@@ -156,36 +140,32 @@ class KubeConfig:
         # that if your
         # cluster has RBAC enabled, your scheduler may need service account permissions to
         # create, watch, get, and delete pods in this namespace.
-        self.kube_namespace = self.safe_get(self.kubernetes_section, 'namespace',
-                                            'default')
+        self.kube_namespace = conf.get(self.kubernetes_section, 'namespace')
         # The Kubernetes Namespace in which pods will be created by the executor. Note
         # that if your
         # cluster has RBAC enabled, your workers may need service account permissions to
         # interact with cluster components.
-        self.executor_namespace = self.safe_get(self.kubernetes_section, 'namespace',
-                                                'default')
+        self.executor_namespace = conf.get(self.kubernetes_section, 'namespace')
         # Task secrets managed by KubernetesExecutor.
-        self.gcp_service_account_keys = self.safe_get(
-            self.kubernetes_section, 'gcp_service_account_keys', None)
+        self.gcp_service_account_keys = conf.get(self.kubernetes_section,
+                                                 'gcp_service_account_keys')
 
         # If the user is using the git-sync container to clone their repository via git,
         # allow them to specify repository, tag, and pod name for the init container.
-        self.git_sync_container_repository = self.safe_get(
-            self.kubernetes_section, 'git_sync_container_repository',
-            'gcr.io/google-containers/git-sync-amd64')
+        self.git_sync_container_repository = conf.get(
+            self.kubernetes_section, 'git_sync_container_repository')
 
-        self.git_sync_container_tag = self.safe_get(
-            self.kubernetes_section, 'git_sync_container_tag', 'v2.0.5')
+        self.git_sync_container_tag = conf.get(
+            self.kubernetes_section, 'git_sync_container_tag')
         self.git_sync_container = '{}:{}'.format(
             self.git_sync_container_repository, self.git_sync_container_tag)
 
-        self.git_sync_init_container_name = self.safe_get(
-            self.kubernetes_section, 'git_sync_init_container_name', 'git-sync-clone')
+        self.git_sync_init_container_name = conf.get(
+            self.kubernetes_section, 'git_sync_init_container_name')
 
         # The worker pod may optionally have a  valid Airflow config loaded via a
         # configmap
-        self.airflow_configmap = self.safe_get(self.kubernetes_section,
-                                               'airflow_configmap', None)
+        self.airflow_configmap = conf.get(self.kubernetes_section, 'airflow_configmap')
 
         self._validate()
 
@@ -272,7 +252,7 @@ class KubernetesJobWatcher(multiprocessing.Process, LoggingMixin, object):
             self.watcher_queue.put((pod_id, State.FAILED, labels, resource_version))
         elif status == 'Succeeded':
             self.log.info('Event: %s Succeeded', pod_id)
-            self.watcher_queue.put((pod_id, State.SUCCESS, labels, resource_version))
+            self.watcher_queue.put((pod_id, None, labels, resource_version))
         elif status == 'Running':
             self.log.info('Event: %s is Running', pod_id)
         else:
@@ -552,7 +532,8 @@ class KubernetesExecutor(BaseExecutor, LoggingMixin):
         self.log.debug('Start with worker_uuid: %s', self.worker_uuid)
         # always need to reset resource version since we don't know
         # when we last started, note for behavior below
-        # https://github.com/kubernetes-client/python/blob/master/kubernetes/docs/CoreV1Api.md#list_namespaced_pod
+        # https://github.com/kubernetes-client/python/blob/master/kubernetes/docs
+        # /CoreV1Api.md#list_namespaced_pod
         KubeResourceVersion.reset_resource_version(self._session)
         self.task_queue = Queue()
         self.result_queue = Queue()
@@ -610,8 +591,7 @@ class KubernetesExecutor(BaseExecutor, LoggingMixin):
             task_id=task_id,
             execution_date=ex_time
         ).one()
-
-        if item.state == State.RUNNING or item.state == State.QUEUED:
+        if state:
             item.state = state
             self._session.add(item)
             self._session.commit()
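
The safe_get/safe_getboolean helpers can be dropped because the [kubernetes]
options now ship with defaults in default_airflow.cfg, so the plain accessors
fall back to those defaults. A small sketch, assuming a standard configuration:

    from airflow.configuration import conf

    # Returns the value from airflow.cfg if set, otherwise the default
    # shipped in default_airflow.cfg (e.g. in_cluster = True).
    in_cluster = conf.getboolean('kubernetes', 'in_cluster')
    namespace = conf.get('kubernetes', 'namespace')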

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/16bae563/airflow/contrib/kubernetes/kube_client.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/kubernetes/kube_client.py b/airflow/contrib/kubernetes/kube_client.py
index d1a63a2..1d3cc9d 100644
--- a/airflow/contrib/kubernetes/kube_client.py
+++ b/airflow/contrib/kubernetes/kube_client.py
@@ -14,7 +14,7 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-
+from airflow.configuration import conf
 
 def _load_kube_config(in_cluster):
     from kubernetes import config, client
@@ -26,6 +26,6 @@ def _load_kube_config(in_cluster):
         return client.CoreV1Api()
 
 
-def get_kube_client(in_cluster=True):
+def get_kube_client(in_cluster=conf.getboolean('kubernetes', 'in_cluster')):
     # TODO: This should also allow people to point to a cluster.
     return _load_kube_config(in_cluster)
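
With this change get_kube_client() reads its default from the new
[kubernetes] in_cluster option instead of hard-coding True. A hedged usage
sketch; note that, as with any Python default argument, the config value is
evaluated once at import time:

    from airflow.contrib.kubernetes.kube_client import get_kube_client

    client = get_kube_client()           # default from [kubernetes] in_cluster
    local = get_kube_client(in_cluster=False)  # explicit override, e.g. minikube
    pods = client.list_namespaced_pod(namespace='default')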

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/16bae563/airflow/contrib/kubernetes/worker_configuration.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/kubernetes/worker_configuration.py b/airflow/contrib/kubernetes/worker_configuration.py
index cd2cb9f..ac4dacf 100644
--- a/airflow/contrib/kubernetes/worker_configuration.py
+++ b/airflow/contrib/kubernetes/worker_configuration.py
@@ -21,13 +21,18 @@ import six
 
 from airflow.contrib.kubernetes.pod import Pod, Resources
 from airflow.contrib.kubernetes.secret import Secret
+from airflow.utils.log.logging_mixin import LoggingMixin
 
 
-class WorkerConfiguration:
+class WorkerConfiguration(LoggingMixin):
     """Contains Kubernetes Airflow Worker configuration logic"""
 
     def __init__(self, kube_config):
         self.kube_config = kube_config
+        self.worker_airflow_home = self.kube_config.airflow_home
+        self.worker_airflow_dags = self.kube_config.dags_folder
+        self.worker_airflow_logs = self.kube_config.base_log_folder
+        super(WorkerConfiguration, self).__init__()
 
     def _get_init_containers(self, volume_mounts):
         """When using git to retrieve the DAGs, use the GitSync Init Container"""
@@ -79,7 +84,7 @@ class WorkerConfiguration:
             'AIRFLOW__CORE__EXECUTOR': 'LocalExecutor'
         }
         if self.kube_config.airflow_configmap:
-            env['AIRFLOW__CORE__AIRFLOW_HOME'] = self.kube_config.airflow_home
+            env['AIRFLOW__CORE__AIRFLOW_HOME'] = self.worker_airflow_home
         return env
 
     def _get_secrets(self):
@@ -129,19 +134,19 @@ class WorkerConfiguration:
         volume_mounts = [{
             'name': dags_volume_name,
             'mountPath': os.path.join(
-                self.kube_config.dags_folder,
+                self.worker_airflow_dags,
                 self.kube_config.git_subpath
             ),
             'readOnly': True
         }, {
             'name': logs_volume_name,
-            'mountPath': self.kube_config.base_log_folder
+            'mountPath': self.worker_airflow_logs
         }]
 
         # Mount the airflow.cfg file via a configmap the user has specified
         if self.kube_config.airflow_configmap:
             config_volume_name = 'airflow-config'
-            config_path = '{}/airflow.cfg'.format(self.kube_config.airflow_home)
+            config_path = '{}/airflow.cfg'.format(self.worker_airflow_home)
             volumes.append({
                 'name': config_volume_name,
                 'configMap': {
@@ -172,6 +177,10 @@ class WorkerConfiguration:
         annotations = {
             'iam.cloud.google.com/service-account': gcp_sa_key
         } if gcp_sa_key else {}
+        airflow_command = airflow_command.replace("-sd", "-i -sd")
+        airflow_path = airflow_command.split('-sd')[-1]
+        airflow_path = self.worker_airflow_home + airflow_path.split('/')[-1]
+        airflow_command = airflow_command.split('-sd')[0] + '-sd ' + airflow_path
 
         return Pod(
             namespace=namespace,

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/16bae563/airflow/jobs.py
----------------------------------------------------------------------
diff --git a/airflow/jobs.py b/airflow/jobs.py
index ecbfef8..e5a07b7 100644
--- a/airflow/jobs.py
+++ b/airflow/jobs.py
@@ -7,9 +7,9 @@
 # to you under the Apache License, Version 2.0 (the
 # "License"); you may not use this file except in compliance
 # with the License.  You may obtain a copy of the License at
-# 
+#
 #   http://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 # Unless required by applicable law or agreed to in writing,
 # software distributed under the License is distributed on an
 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/16bae563/airflow/www_rbac/api/experimental/endpoints.py
----------------------------------------------------------------------
diff --git a/airflow/www_rbac/api/experimental/endpoints.py b/airflow/www_rbac/api/experimental/endpoints.py
index 5bc0529..bddf0c1 100644
--- a/airflow/www_rbac/api/experimental/endpoints.py
+++ b/airflow/www_rbac/api/experimental/endpoints.py
@@ -7,9 +7,9 @@
 # to you under the Apache License, Version 2.0 (the
 # "License"); you may not use this file except in compliance
 # with the License.  You may obtain a copy of the License at
-# 
+#
 #   http://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 # Unless required by applicable law or agreed to in writing,
 # software distributed under the License is distributed on an
 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -26,6 +26,8 @@ from airflow.exceptions import AirflowException
 from airflow.utils.log.logging_mixin import LoggingMixin
 from airflow.utils import timezone
 from airflow.www_rbac.app import csrf
+from airflow import models
+from airflow.utils.db import create_session
 
 from flask import g, Blueprint, jsonify, request, url_for
 
@@ -112,6 +114,27 @@ def task_info(dag_id, task_id):
     return jsonify(fields)
 
 
+@api_experimental.route('/dags/<string:dag_id>/paused/<string:paused>', methods=['GET'])
+@requires_authentication
+def dag_paused(dag_id, paused):
+    """(Un)pauses a dag"""
+
+    DagModel = models.DagModel
+    with create_session() as session:
+        orm_dag = (
+            session.query(DagModel)
+                   .filter(DagModel.dag_id == dag_id).first()
+        )
+        if paused == 'true':
+            orm_dag.is_paused = True
+        else:
+            orm_dag.is_paused = False
+        session.merge(orm_dag)
+        session.commit()
+
+    return jsonify({'response': 'ok'})
+
+
 @api_experimental.route(
     '/dags/<string:dag_id>/dag_runs/<string:execution_date>/tasks/<string:task_id>',
     methods=['GET'])
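
The new endpoint above toggles a DAG's paused flag over HTTP. A hedged sketch
of calling it, assuming a webserver on localhost:8080 with the default
experimental-API auth backend:

    import requests

    # 'false' unpauses the DAG, 'true' pauses it; the endpoint answers {'response': 'ok'}
    resp = requests.get(
        'http://localhost:8080/api/experimental/'
        'dags/example_python_operator/paused/false')
    assert resp.status_code == 200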

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/16bae563/scripts/ci/kubernetes/docker/Dockerfile
----------------------------------------------------------------------
diff --git a/scripts/ci/kubernetes/docker/Dockerfile b/scripts/ci/kubernetes/docker/Dockerfile
index 967b698..6d2c62d 100644
--- a/scripts/ci/kubernetes/docker/Dockerfile
+++ b/scripts/ci/kubernetes/docker/Dockerfile
@@ -33,6 +33,9 @@ RUN apt-get update -y && apt-get install -y \
         unzip \
     && apt-get clean
 
+
+RUN pip install --upgrade pip
+
 # Since we install vanilla Airflow, we also want to have support for Postgres and Kubernetes
 RUN pip install -U setuptools && \
     pip install kubernetes && \
@@ -43,6 +46,8 @@ RUN pip install -U setuptools && \
 COPY airflow.tar.gz /tmp/airflow.tar.gz
 RUN pip install /tmp/airflow.tar.gz
 
+COPY airflow-init.sh /tmp/airflow-init.sh
+
 COPY bootstrap.sh /bootstrap.sh
 RUN chmod +x /bootstrap.sh
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/16bae563/scripts/ci/kubernetes/docker/airflow-init.sh
----------------------------------------------------------------------
diff --git a/scripts/ci/kubernetes/docker/airflow-init.sh b/scripts/ci/kubernetes/docker/airflow-init.sh
new file mode 100755
index 0000000..dc33625
--- /dev/null
+++ b/scripts/ci/kubernetes/docker/airflow-init.sh
@@ -0,0 +1,24 @@
+#!/usr/bin/env bash
+#
+#  Licensed to the Apache Software Foundation (ASF) under one   *
+#  or more contributor license agreements.  See the NOTICE file *
+#  distributed with this work for additional information        *
+#  regarding copyright ownership.  The ASF licenses this file   *
+#  to you under the Apache License, Version 2.0 (the            *
+#  "License"); you may not use this file except in compliance   *
+#  with the License.  You may obtain a copy of the License at   *
+#                                                               *
+#    http://www.apache.org/licenses/LICENSE-2.0                 *
+#                                                               *
+#  Unless required by applicable law or agreed to in writing,   *
+#  software distributed under the License is distributed on an  *
+#  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+#  KIND, either express or implied.  See the License for the    *
+#  specific language governing permissions and limitations      *
+#  under the License.
+
+cd /usr/local/lib/python2.7/dist-packages/airflow && \
+cp -R example_dags/* /root/airflow/dags/ && \
+airflow initdb && \
+alembic upgrade heads && \
+airflow create_user -u airflow -l airflow -f jon -e airflow@apache.org -r Admin -p airflow

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/16bae563/scripts/ci/kubernetes/docker/build.sh
----------------------------------------------------------------------
diff --git a/scripts/ci/kubernetes/docker/build.sh b/scripts/ci/kubernetes/docker/build.sh
index 6f14c4d..b93c6b1 100755
--- a/scripts/ci/kubernetes/docker/build.sh
+++ b/scripts/ci/kubernetes/docker/build.sh
@@ -27,7 +27,12 @@ if [ $? -eq 0 ]; then
   eval $ENVCONFIG
 fi
 
-cd $AIRFLOW_ROOT && python setup.py sdist && cp $AIRFLOW_ROOT/dist/*.tar.gz $DIRNAME/airflow.tar.gz && \
-cd $DIRNAME && \
-docker build --pull $DIRNAME --tag=${IMAGE}:${TAG} && \
+echo "Airflow directory $AIRFLOW_ROOT"
+echo "Airflow Docker directory $DIRNAME"
+
+cd $AIRFLOW_ROOT
+python setup.py sdist -q
+echo "Copy distro $AIRFLOW_ROOT/dist/*.tar.gz ${DIRNAME}/airflow.tar.gz"
+cp $AIRFLOW_ROOT/dist/*.tar.gz ${DIRNAME}/airflow.tar.gz
+cd $DIRNAME && docker build --pull $DIRNAME --tag=${IMAGE}:${TAG}
 rm $DIRNAME/airflow.tar.gz

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/16bae563/scripts/ci/kubernetes/kube/airflow.yaml
----------------------------------------------------------------------
diff --git a/scripts/ci/kubernetes/kube/airflow.yaml b/scripts/ci/kubernetes/kube/airflow.yaml
index 77566ae..09bbcd8 100644
--- a/scripts/ci/kubernetes/kube/airflow.yaml
+++ b/scripts/ci/kubernetes/kube/airflow.yaml
@@ -61,7 +61,7 @@ spec:
           - "bash"
         args:
           - "-cx"
-          - "cd /usr/local/lib/python2.7/dist-packages/airflow && cp -R example_dags/* /root/airflow/dags/ && airflow initdb && alembic upgrade heads"
+          - "./tmp/airflow-init.sh"
       containers:
       - name: webserver
         image: airflow
@@ -88,20 +88,20 @@ spec:
           mountPath: /root/airflow/dags
         - name: airflow-logs
           mountPath: /root/airflow/logs
-        readinessProbe:
-          initialDelaySeconds: 5
-          timeoutSeconds: 5
-          periodSeconds: 5
-          httpGet:
-            path: /admin
-            port: 8080
-        livenessProbe:
-          initialDelaySeconds: 5
-          timeoutSeconds: 5
-          failureThreshold: 5
-          httpGet:
-            path: /admin
-            port: 8080
+#        readinessProbe:
+#          initialDelaySeconds: 5
+#          timeoutSeconds: 5
+#          periodSeconds: 5
+#          httpGet:
+#            path: /login
+#            port: 8080
+#        livenessProbe:
+#          initialDelaySeconds: 5
+#          timeoutSeconds: 5
+#          failureThreshold: 5
+#          httpGet:
+#            path: /login
+#            port: 8080
       - name: scheduler
         image: airflow
         imagePullPolicy: IfNotPresent
@@ -146,76 +146,4 @@ spec:
       nodePort: 30809
   selector:
     name: airflow
----
-apiVersion: v1
-kind: Secret
-metadata:
-  name: airflow-secrets
-type: Opaque
-data:
-  # The sql_alchemy_conn value is a base64 encoded represenation of this connection string:
-  # postgresql+psycopg2://root:root@postgres-airflow:5432/airflow
-  sql_alchemy_conn: cG9zdGdyZXNxbCtwc3ljb3BnMjovL3Jvb3Q6cm9vdEBwb3N0Z3Jlcy1haXJmbG93OjU0MzIvYWlyZmxvdwo=
----
-apiVersion: v1
-kind: ConfigMap
-metadata:
-  name: airflow-configmap
-data:
-  airflow.cfg: |
-    [core]
-    airflow_home = /root/airflow
-    dags_folder = /root/airflow/dags
-    base_log_folder = /root/airflow/logs
-    logging_level = INFO
-    executor = KubernetesExecutor
-    parallelism = 32
-    plugins_folder = /root/airflow/plugins
-    sql_alchemy_conn = $SQL_ALCHEMY_CONN
-
-    [scheduler]
-    dag_dir_list_interval = 300
-    child_process_log_directory = /root/airflow/logs/scheduler
-    # Task instances listen for external kill signal (when you clear tasks
-    # from the CLI or the UI), this defines the frequency at which they should
-    # listen (in seconds).
-    job_heartbeat_sec = 5
-    max_threads = 16
-
-    # The scheduler constantly tries to trigger new tasks (look at the
-    # scheduler section in the docs for more information). This defines
-    # how often the scheduler should run (in seconds).
-    scheduler_heartbeat_sec = 5
-
-    # after how much time should the scheduler terminate in seconds
-    # -1 indicates to run continuously (see also num_runs)
-    run_duration = -1
-
-    # after how much time a new DAGs should be picked up from the filesystem
-    min_file_process_interval = 0
-
-    statsd_on = False
-    statsd_host = localhost
-    statsd_port = 8125
-    statsd_prefix = airflow
-
-    # How many seconds to wait between file-parsing loops to prevent the logs from being spammed.
-    min_file_parsing_loop_time = 1
-
-    print_stats_interval = 30
-    scheduler_zombie_task_threshold = 300
-    max_tis_per_query = 0
-    authenticate = False
-
-    [kubernetes]
-    airflow_configmap = airflow-configmap
-    worker_container_repository = airflow
-    worker_container_tag = latest
-    delete_worker_pods = True
-    git_repo = https://github.com/grantnicholas/testdags.git
-    git_branch = master
-    dags_volume_claim = airflow-dags
-    logs_volume_claim = airflow-logs
 
-    [kubernetes_secrets]
-    SQL_ALCHEMY_CONN = airflow-secrets=sql_alchemy_conn

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/16bae563/scripts/ci/kubernetes/kube/configmaps.yaml
----------------------------------------------------------------------
diff --git a/scripts/ci/kubernetes/kube/configmaps.yaml b/scripts/ci/kubernetes/kube/configmaps.yaml
new file mode 100644
index 0000000..ddba098
--- /dev/null
+++ b/scripts/ci/kubernetes/kube/configmaps.yaml
@@ -0,0 +1,359 @@
+#  Licensed to the Apache Software Foundation (ASF) under one   *
+#  or more contributor license agreements.  See the NOTICE file *
+#  distributed with this work for additional information        *
+#  regarding copyright ownership.  The ASF licenses this file   *
+#  to you under the Apache License, Version 2.0 (the            *
+#  "License"); you may not use this file except in compliance   *
+#  with the License.  You may obtain a copy of the License at   *
+#                                                               *
+#    http://www.apache.org/licenses/LICENSE-2.0                 *
+#                                                               *
+#  Unless required by applicable law or agreed to in writing,   *
+#  software distributed under the License is distributed on an  *
+#  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+#  KIND, either express or implied.  See the License for the    *
+#  specific language governing permissions and limitations      *
+#  under the License.                                           *
+apiVersion: v1
+kind: ConfigMap
+metadata:
+  name: airflow-configmap
+data:
+  airflow.cfg: |
+    [core]
+    airflow_home = /root/airflow
+    dags_folder = /root/airflow/dags
+    base_log_folder = /root/airflow/logs
+    logging_level = INFO
+    executor = KubernetesExecutor
+    parallelism = 32
+    load_examples = True
+    plugins_folder = /root/airflow/plugins
+    sql_alchemy_conn = $SQL_ALCHEMY_CONN
+
+    [scheduler]
+    dag_dir_list_interval = 300
+    child_process_log_directory = /root/airflow/logs/scheduler
+    # Task instances listen for external kill signal (when you clear tasks
+    # from the CLI or the UI), this defines the frequency at which they should
+    # listen (in seconds).
+    job_heartbeat_sec = 5
+    max_threads = 2
+
+    # The scheduler constantly tries to trigger new tasks (look at the
+    # scheduler section in the docs for more information). This defines
+    # how often the scheduler should run (in seconds).
+    scheduler_heartbeat_sec = 5
+
+    # after how much time should the scheduler terminate in seconds
+    # -1 indicates to run continuously (see also num_runs)
+    run_duration = -1
+
+    # after how much time a new DAGs should be picked up from the filesystem
+    min_file_process_interval = 0
+
+    statsd_on = False
+    statsd_host = localhost
+    statsd_port = 8125
+    statsd_prefix = airflow
+
+    # How many seconds to wait between file-parsing loops to prevent the logs from being spammed.
+    min_file_parsing_loop_time = 1
+
+    print_stats_interval = 30
+    scheduler_zombie_task_threshold = 300
+    max_tis_per_query = 0
+    authenticate = False
+
+    # Turn off scheduler catchup by setting this to False.
+    # Default behavior is unchanged and
+    # Command Line Backfills still work, but the scheduler
+    # will not do scheduler catchup if this is False,
+    # however it can be set on a per DAG basis in the
+    # DAG definition (catchup)
+    catchup_by_default = True
+
+    [webserver]
+    # The base url of your website as airflow cannot guess what domain or
+    # cname you are using. This is used in automated emails that
+    # airflow sends to point links to the right web server
+    base_url = http://localhost:8080
+
+    # The ip specified when starting the web server
+    web_server_host = 0.0.0.0
+
+    # The port on which to run the web server
+    web_server_port = 8080
+
+    # Paths to the SSL certificate and key for the web server. When both are
+    # provided SSL will be enabled. This does not change the web server port.
+    web_server_ssl_cert =
+    web_server_ssl_key =
+
+    # Number of seconds the webserver waits before killing gunicorn master that doesn't respond
+    web_server_master_timeout = 120
+
+    # Number of seconds the gunicorn webserver waits before timing out on a worker
+    web_server_worker_timeout = 120
+
+    # Number of workers to refresh at a time. When set to 0, worker refresh is
+    # disabled. When nonzero, airflow periodically refreshes webserver workers by
+    # bringing up new ones and killing old ones.
+    worker_refresh_batch_size = 1
+
+    # Number of seconds to wait before refreshing a batch of workers.
+    worker_refresh_interval = 30
+
+    # Secret key used to run your flask app
+    secret_key = temporary_key
+
+    # Number of workers to run the Gunicorn web server
+    workers = 4
+
+    # The worker class gunicorn should use. Choices include
+    # sync (default), eventlet, gevent
+    worker_class = sync
+
+    # Log files for the gunicorn webserver. '-' means log to stderr.
+    access_logfile = -
+    error_logfile = -
+
+    # Expose the configuration file in the web server
+    expose_config = False
+
+    # Set to true to turn on authentication:
+    # https://airflow.incubator.apache.org/security.html#web-authentication
+    authenticate = False
+
+    # Filter the list of dags by owner name (requires authentication to be enabled)
+    filter_by_owner = False
+
+    # Filtering mode. Choices include user (default) and ldapgroup.
+    # Ldap group filtering requires using the ldap backend
+    #
+    # Note that the ldap server needs the "memberOf" overlay to be set up
+    # in order to use the ldapgroup mode.
+    owner_mode = user
+
+    # Default DAG view.  Valid values are:
+    # tree, graph, duration, gantt, landing_times
+    dag_default_view = tree
+
+    # Default DAG orientation. Valid values are:
+    # LR (Left->Right), TB (Top->Bottom), RL (Right->Left), BT (Bottom->Top)
+    dag_orientation = LR
+
+    # Puts the webserver in demonstration mode; blurs the names of Operators for
+    # privacy.
+    demo_mode = False
+
+    # The amount of time (in secs) webserver will wait for initial handshake
+    # while fetching logs from other worker machine
+    log_fetch_timeout_sec = 5
+
+    # By default, the webserver shows paused DAGs. Flip this to hide paused
+    # DAGs by default
+    hide_paused_dags_by_default = False
+
+    # Consistent page size across all listing views in the UI
+    page_size = 100
+
+    # Use FAB-based webserver with RBAC feature
+    rbac = True
+
+    [smtp]
+    # If you want airflow to send emails on retries, failure, and you want to use
+    # the airflow.utils.email.send_email_smtp function, you have to configure an
+    # smtp server here
+    smtp_host = localhost
+    smtp_starttls = True
+    smtp_ssl = False
+    # Uncomment and set the user/pass settings if you want to use SMTP AUTH
+    # smtp_user = airflow
+    # smtp_password = airflow
+    smtp_port = 25
+    smtp_mail_from = airflow@example.com
+
+    [kubernetes]
+    airflow_configmap = airflow-configmap
+    worker_container_repository = airflow
+    worker_container_tag = latest
+    delete_worker_pods = True
+    git_repo = https://github.com/apache/incubator-airflow.git
+    git_branch = master
+    git_subpath = airflow/example_dags/
+    git_user =
+    git_password =
+    dags_volume_claim = airflow-dags
+    logs_volume_claim = airflow-logs
+    in_cluster = True
+    namespace = default
+    gcp_service_account_keys =
+
+    # For cloning DAGs from git repositories into volumes: https://github.com/kubernetes/git-sync
+    git_sync_container_repository = gcr.io/google-containers/git-sync-amd64
+    git_sync_container_tag = v2.0.5
+    git_sync_init_container_name = git-sync-clone
+
+    [kubernetes_secrets]
+    SQL_ALCHEMY_CONN = airflow-secrets=sql_alchemy_conn
+
+    [hive]
+    # Default mapreduce queue for HiveOperator tasks
+    default_hive_mapred_queue =
+
+    [celery]
+    # This section only applies if you are using the CeleryExecutor in
+    # [core] section above
+
+    # The app name that will be used by celery
+    celery_app_name = airflow.executors.celery_executor
+
+    # The concurrency that will be used when starting workers with the
+    # "airflow worker" command. This defines the number of task instances that
+    # a worker will take, so size up your workers based on the resources on
+    # your worker box and the nature of your tasks
+    worker_concurrency = 16
+
+    # When you start an airflow worker, airflow starts a tiny web server
+    # subprocess to serve the workers local log files to the airflow main
+    # web server, who then builds pages and sends them to users. This defines
+    # the port on which the logs are served. It needs to be unused, and open
+    # visible from the main web server to connect into the workers.
+    worker_log_server_port = 8793
+
+    # The Celery broker URL. Celery supports RabbitMQ, Redis and experimentally
+    # a sqlalchemy database. Refer to the Celery documentation for more
+    # information.
+    # http://docs.celeryproject.org/en/latest/userguide/configuration.html#broker-settings
+    broker_url = sqla+mysql://airflow:airflow@localhost:3306/airflow
+
+    # The Celery result_backend. When a job finishes, it needs to update the
+    # metadata of the job. Therefore it will post a message on a message bus,
+    # or insert it into a database (depending of the backend)
+    # This status is used by the scheduler to update the state of the task
+    # The use of a database is highly recommended
+    # http://docs.celeryproject.org/en/latest/userguide/configuration.html#task-result-backend-settings
+    result_backend = db+mysql://airflow:airflow@localhost:3306/airflow
+
+    # Celery Flower is a sweet UI for Celery. Airflow has a shortcut to start
+    # it `airflow flower`. This defines the IP that Celery Flower runs on
+    flower_host = 0.0.0.0
+
+    # The root URL for Flower
+    # Ex: flower_url_prefix = /flower
+    flower_url_prefix =
+
+    # This defines the port that Celery Flower runs on
+    flower_port = 5555
+
+    # Default queue that tasks get assigned to and that worker listen on.
+    default_queue = default
+
+    # Import path for celery configuration options
+    celery_config_options = airflow.config_templates.default_celery.DEFAULT_CELERY_CONFIG
+
+    [celery_broker_transport_options]
+    # The visibility timeout defines the number of seconds to wait for the worker
+    # to acknowledge the task before the message is redelivered to another worker.
+    # Make sure to increase the visibility timeout to match the time of the longest
+    # ETA you're planning to use. Especially important in case of using Redis or SQS
+    visibility_timeout = 21600
+
+    # In case of using SSL
+    ssl_active = False
+    ssl_key =
+    ssl_cert =
+    ssl_cacert =
+
+    [dask]
+    # This section only applies if you are using the DaskExecutor in
+    # [core] section above
+
+    # The IP address and port of the Dask cluster's scheduler.
+    cluster_address = 127.0.0.1:8786
+    # TLS/ SSL settings to access a secured Dask scheduler.
+    tls_ca =
+    tls_cert =
+    tls_key =
+
+    [ldap]
+    # set this to ldaps://<your.ldap.server>:<port>
+    uri =
+    user_filter = objectClass=*
+    user_name_attr = uid
+    group_member_attr = memberOf
+    superuser_filter =
+    data_profiler_filter =
+    bind_user = cn=Manager,dc=example,dc=com
+    bind_password = insecure
+    basedn = dc=example,dc=com
+    cacert = /etc/ca/ldap_ca.crt
+    search_scope = LEVEL
+
+    [mesos]
+    # Mesos master address which MesosExecutor will connect to.
+    master = localhost:5050
+
+    # The framework name which Airflow scheduler will register itself as on mesos
+    framework_name = Airflow
+
+    # Number of cpu cores required for running one task instance using
+    # 'airflow run <dag_id> <task_id> <execution_date> --local -p <pickle_id>'
+    # command on a mesos slave
+    task_cpu = 1
+
+    # Memory in MB required for running one task instance using
+    # 'airflow run <dag_id> <task_id> <execution_date> --local -p <pickle_id>'
+    # command on a mesos slave
+    task_memory = 256
+
+    # Enable framework checkpointing for mesos
+    # See http://mesos.apache.org/documentation/latest/slave-recovery/
+    checkpoint = False
+
+    # Failover timeout in milliseconds.
+    # When checkpointing is enabled and this option is set, Mesos waits
+    # until the configured timeout for
+    # the MesosExecutor framework to re-register after a failover. Mesos
+    # shuts down running tasks if the
+    # MesosExecutor framework fails to re-register within this timeframe.
+    # failover_timeout = 604800
+
+    # Enable framework authentication for mesos
+    # See http://mesos.apache.org/documentation/latest/configuration/
+    authenticate = False
+
+    # Mesos credentials, if authentication is enabled
+    # default_principal = admin
+    # default_secret = admin
+
+    # Optional Docker Image to run on slave before running the command
+    # This image should be accessible from mesos slave i.e mesos slave
+    # should be able to pull this docker image before executing the command.
+    # docker_image_slave = puckel/docker-airflow
+
+    [kerberos]
+    ccache = /tmp/airflow_krb5_ccache
+    # gets augmented with fqdn
+    principal = airflow
+    reinit_frequency = 3600
+    kinit_path = kinit
+    keytab = airflow.keytab
+
+    [cli]
+    api_client = airflow.api.client.json_client
+    endpoint_url = http://localhost:8080
+
+    [api]
+    auth_backend = airflow.api.auth.backend.default
+
+    [github_enterprise]
+    api_rev = v3
+
+    [admin]
+    # UI to hide sensitive variable fields when set to True
+    hide_sensitive_variable_fields = True
+
+    [elasticsearch]
+    elasticsearch_host =

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/16bae563/scripts/ci/kubernetes/kube/deploy.sh
----------------------------------------------------------------------
diff --git a/scripts/ci/kubernetes/kube/deploy.sh b/scripts/ci/kubernetes/kube/deploy.sh
index a5adcf8..e585d87 100755
--- a/scripts/ci/kubernetes/kube/deploy.sh
+++ b/scripts/ci/kubernetes/kube/deploy.sh
@@ -21,8 +21,14 @@ IMAGE=${1:-airflow/ci}
 TAG=${2:-latest}
 DIRNAME=$(cd "$(dirname "$0")"; pwd)
 
+kubectl delete -f $DIRNAME/postgres.yaml
+kubectl delete -f $DIRNAME/airflow.yaml
+kubectl delete -f $DIRNAME/secrets.yaml
+
 kubectl apply -f $DIRNAME/postgres.yaml
 kubectl apply -f $DIRNAME/volumes.yaml
+kubectl apply -f $DIRNAME/secrets.yaml
+kubectl apply -f $DIRNAME/configmaps.yaml
 kubectl apply -f $DIRNAME/airflow.yaml
 
 # wait for up to 10 minutes for everything to be deployed

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/16bae563/scripts/ci/kubernetes/kube/postgres.yaml
----------------------------------------------------------------------
diff --git a/scripts/ci/kubernetes/kube/postgres.yaml b/scripts/ci/kubernetes/kube/postgres.yaml
index 67a0635..1130921 100644
--- a/scripts/ci/kubernetes/kube/postgres.yaml
+++ b/scripts/ci/kubernetes/kube/postgres.yaml
@@ -30,6 +30,7 @@ spec:
       containers:
         - name: postgres
           image: postgres
+          imagePullPolicy: IfNotPresent
           ports:
             - containerPort: 5432
               protocol: TCP

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/16bae563/scripts/ci/kubernetes/kube/secrets.yaml
----------------------------------------------------------------------
diff --git a/scripts/ci/kubernetes/kube/secrets.yaml b/scripts/ci/kubernetes/kube/secrets.yaml
new file mode 100644
index 0000000..4d533b3
--- /dev/null
+++ b/scripts/ci/kubernetes/kube/secrets.yaml
@@ -0,0 +1,25 @@
+#  Licensed to the Apache Software Foundation (ASF) under one   *
+#  or more contributor license agreements.  See the NOTICE file *
+#  distributed with this work for additional information        *
+#  regarding copyright ownership.  The ASF licenses this file   *
+#  to you under the Apache License, Version 2.0 (the            *
+#  "License"); you may not use this file except in compliance   *
+#  with the License.  You may obtain a copy of the License at   *
+#                                                               *
+#    http://www.apache.org/licenses/LICENSE-2.0                 *
+#                                                               *
+#  Unless required by applicable law or agreed to in writing,   *
+#  software distributed under the License is distributed on an  *
+#  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+#  KIND, either express or implied.  See the License for the    *
+#  specific language governing permissions and limitations      *
+#  under the License.                                           *
+apiVersion: v1
+kind: Secret
+metadata:
+  name: airflow-secrets
+type: Opaque
+data:
+  # The sql_alchemy_conn value is a base64 encoded representation of this connection string:
+  # postgresql+psycopg2://root:root@postgres-airflow:5432/airflow
+  sql_alchemy_conn: cG9zdGdyZXNxbCtwc3ljb3BnMjovL3Jvb3Q6cm9vdEBwb3N0Z3Jlcy1haXJmbG93OjU0MzIvYWlyZmxvdwo=

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/16bae563/scripts/ci/kubernetes/kube/volumes.yaml
----------------------------------------------------------------------
diff --git a/scripts/ci/kubernetes/kube/volumes.yaml b/scripts/ci/kubernetes/kube/volumes.yaml
index 073e98c..58ad368 100644
--- a/scripts/ci/kubernetes/kube/volumes.yaml
+++ b/scripts/ci/kubernetes/kube/volumes.yaml
@@ -62,3 +62,4 @@ spec:
   resources:
     requests:
       storage: 2Gi
+

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/16bae563/scripts/ci/travis_script.sh
----------------------------------------------------------------------
diff --git a/scripts/ci/travis_script.sh b/scripts/ci/travis_script.sh
index 8766e94..52571cc 100755
--- a/scripts/ci/travis_script.sh
+++ b/scripts/ci/travis_script.sh
@@ -26,7 +26,7 @@ then
   tox -e $TOX_ENV
 else
   KUBERNETES_VERSION=${KUBERNETES_VERSION} $DIRNAME/kubernetes/setup_kubernetes.sh && \
-  tox -e $TOX_ENV -- tests.contrib.minikube_tests \
+  tox -e $TOX_ENV -- tests.contrib.minikube \
                      --with-coverage \
                      --cover-erase \
                      --cover-html \

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/16bae563/setup.cfg
----------------------------------------------------------------------
diff --git a/setup.cfg b/setup.cfg
index 7b3479c..622cc13 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -1,4 +1,3 @@
-#
 # Licensed to the Apache Software Foundation (ASF) under one
 # or more contributor license agreements.  See the NOTICE file
 # distributed with this work for additional information
@@ -6,16 +5,15 @@
 # to you under the Apache License, Version 2.0 (the
 # "License"); you may not use this file except in compliance
 # with the License.  You may obtain a copy of the License at
-# 
+#
 #   http://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 # Unless required by applicable law or agreed to in writing,
 # software distributed under the License is distributed on an
 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-
 [metadata]
 name = Airflow
 summary = Airflow is a system to programmatically author, schedule and monitor data pipelines.
@@ -29,8 +27,11 @@ packages = airflow
 
 [build_sphinx]
 source-dir = docs/
-build-dir  = docs/_build
-all_files  = 1
+build-dir = docs/_build
+all_files = 1
 
 [upload_sphinx]
 upload-dir = docs/_build/html
+
+[easy_install]
+

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/16bae563/tests/cli/test_cli.py
----------------------------------------------------------------------
diff --git a/tests/cli/test_cli.py b/tests/cli/test_cli.py
index 9c79567..34c82bc 100644
--- a/tests/cli/test_cli.py
+++ b/tests/cli/test_cli.py
@@ -7,9 +7,9 @@
 # to you under the Apache License, Version 2.0 (the
 # "License"); you may not use this file except in compliance
 # with the License.  You may obtain a copy of the License at
-# 
+#
 #   http://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 # Unless required by applicable law or agreed to in writing,
 # software distributed under the License is distributed on an
 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -22,15 +22,84 @@ import unittest
 
 from mock import patch, Mock, MagicMock
 from time import sleep
-
 import psutil
-
+from argparse import Namespace
 from airflow import settings
-from airflow.bin.cli import get_num_ready_workers_running
+from airflow.bin.cli import get_num_ready_workers_running, run, get_dag
+from airflow.models import TaskInstance
+from airflow.utils import timezone
+from airflow.utils.state import State
+from airflow.settings import Session
+from airflow import models
+
+import os
+
+dag_folder_path = '/'.join(os.path.realpath(__file__).split('/')[:-1])
+
+TEST_DAG_FOLDER = os.path.join(
+    os.path.dirname(dag_folder_path), 'dags')
+TEST_DAG_ID = 'unit_tests'
+
+
+def reset(dag_id):
+    session = Session()
+    tis = session.query(models.TaskInstance).filter_by(dag_id=dag_id)
+    tis.delete()
+    session.commit()
+    session.close()
+
+
+def create_mock_args(
+    task_id,
+    dag_id,
+    subdir,
+    execution_date,
+    task_params=None,
+    dry_run=False,
+    queue=None,
+    pool=None,
+    priority_weight_total=None,
+    retries=0,
+    local=True,
+    mark_success=False,
+    ignore_all_dependencies=False,
+    ignore_depends_on_past=False,
+    ignore_dependencies=False,
+    force=False,
+    run_as_user=None,
+    executor_config={},
+    cfg_path=None,
+    pickle=None,
+    raw=None,
+    interactive=None,
+):
+    args = MagicMock(spec=Namespace)
+    args.task_id = task_id
+    args.dag_id = dag_id
+    args.subdir = subdir
+    args.task_params = task_params
+    args.execution_date = execution_date
+    args.dry_run = dry_run
+    args.queue = queue
+    args.pool = pool
+    args.priority_weight_total = priority_weight_total
+    args.retries = retries
+    args.local = local
+    args.run_as_user = run_as_user
+    args.executor_config = executor_config
+    args.cfg_path = cfg_path
+    args.pickle = pickle
+    args.raw = raw
+    args.mark_success = mark_success
+    args.ignore_all_dependencies = ignore_all_dependencies
+    args.ignore_depends_on_past = ignore_depends_on_past
+    args.ignore_dependencies = ignore_dependencies
+    args.force = force
+    args.interactive = interactive
+    return args
 
 
 class TestCLI(unittest.TestCase):
-
     def setUp(self):
         self.gunicorn_master_proc = Mock(pid=None)
         self.children = MagicMock()
@@ -74,3 +143,23 @@ class TestCLI(unittest.TestCase):
             "webserver terminated with return code {} in debug mode".format(return_code))
         p.terminate()
         p.wait()
+
+    def test_local_run(self):
+        args = create_mock_args(
+            task_id='print_the_context',
+            dag_id='example_python_operator',
+            subdir='/root/dags/example_python_operator.py',
+            interactive=True,
+            execution_date=timezone.parse('2018-04-27T08:39:51.298439+00:00')
+        )
+
+        reset(args.dag_id)
+
+        with patch('argparse.Namespace', args) as mock_args:
+            run(mock_args)
+            dag = get_dag(mock_args)
+            task = dag.get_task(task_id=args.task_id)
+            ti = TaskInstance(task, args.execution_date)
+            ti.refresh_from_db()
+            state = ti.current_state()
+            self.assertEqual(state, State.SUCCESS)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/16bae563/tests/contrib/kubernetes/__init__.py
----------------------------------------------------------------------
diff --git a/tests/contrib/kubernetes/__init__.py b/tests/contrib/kubernetes/__init__.py
deleted file mode 100644
index 759b563..0000000
--- a/tests/contrib/kubernetes/__init__.py
+++ /dev/null
@@ -1,14 +0,0 @@
-# -*- coding: utf-8 -*-
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/16bae563/tests/contrib/minikube/__init__.py
----------------------------------------------------------------------
diff --git a/tests/contrib/minikube/__init__.py b/tests/contrib/minikube/__init__.py
new file mode 100644
index 0000000..114d189
--- /dev/null
+++ b/tests/contrib/minikube/__init__.py
@@ -0,0 +1,18 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/16bae563/tests/contrib/minikube/test_kubernetes_executor.py
----------------------------------------------------------------------
diff --git a/tests/contrib/minikube/test_kubernetes_executor.py b/tests/contrib/minikube/test_kubernetes_executor.py
new file mode 100644
index 0000000..9827bc8
--- /dev/null
+++ b/tests/contrib/minikube/test_kubernetes_executor.py
@@ -0,0 +1,97 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+
+import unittest
+from subprocess import check_call, check_output
+
+import requests
+import time
+import six
+
+try:
+    check_call(["kubectl", "get", "pods"])
+except Exception as e:
+    raise unittest.SkipTest(
+        "Kubernetes integration tests require a minikube cluster;"
+        "Skipping tests {}".format(e)
+    )
+
+
+class KubernetesExecutorTest(unittest.TestCase):
+
+    def test_integration_run_dag(self):
+        host_ip = check_output(['minikube', 'ip'])
+        if six.PY3:
+            host_ip = host_ip.decode('UTF-8')
+        host = '{}:30809'.format(host_ip.strip())
+
+        #  Enable the dag
+        result = requests.get(
+            'http://{}/api/experimental/'
+            'dags/example_python_operator/paused/false'.format(host)
+        )
+        self.assertEqual(result.status_code, 200, "Could not enable DAG")
+
+        # Trigger a new dagrun
+        result = requests.post(
+            'http://{}/api/experimental/'
+            'dags/example_python_operator/dag_runs'.format(host),
+            json={}
+        )
+        self.assertEqual(result.status_code, 200, "Could not trigger a DAG-run")
+
+        time.sleep(1)
+
+        result = requests.get(
+            'http://{}/api/experimental/latest_runs'.format(host)
+        )
+        self.assertEqual(result.status_code, 200, "Could not get the latest DAG-run")
+        result_json = result.json()
+
+        self.assertGreater(len(result_json['items']), 0)
+
+        execution_date = result_json['items'][0]['execution_date']
+        print("Found the job with execution date {}".format(execution_date))
+
+        tries = 0
+        state = ''
+        # Wait 100 seconds for the operator to complete
+        while tries < 20:
+            time.sleep(5)
+
+            # Trigger a new dagrun
+            result = requests.get(
+                'http://{}/api/experimental/dags/example_python_operator/'
+                'dag_runs/{}/tasks/print_the_context'.format(host, execution_date)
+            )
+            self.assertEqual(result.status_code, 200, "Could not get the status")
+            result_json = result.json()
+            state = result_json['state']
+            print("Attempt {}: Current state of operator is {}".format(tries, state))
+
+            if state == 'success':
+                break
+            tries += 1
+
+        self.assertEqual(state, 'success')
+
+        # Maybe check if we can retrieve the logs, but then we need to extend the API
+
+
+if __name__ == '__main__':
+    unittest.main()
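
The polling loop in test_integration_run_dag above can be factored into a small
helper. A minimal sketch, assuming the same experimental REST endpoints and a
webserver reachable at `host`; the helper name and the 5-second interval are
illustrative, not part of this patch:

import time

import requests


def wait_for_task_state(host, dag_id, execution_date, task_id,
                        expected='success', tries=20, interval=5):
    """Poll the experimental API until the task reaches the expected state."""
    url = ('http://{}/api/experimental/dags/{}/'
           'dag_runs/{}/tasks/{}'.format(host, dag_id, execution_date, task_id))
    state = ''
    for attempt in range(tries):
        time.sleep(interval)
        result = requests.get(url)
        result.raise_for_status()
        state = result.json()['state']
        print("Attempt {}: current state is {}".format(attempt, state))
        if state == expected:
            break
    return state

With the test above this would be called as
wait_for_task_state(host, 'example_python_operator', execution_date, 'print_the_context').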

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/16bae563/tests/contrib/minikube/test_kubernetes_pod_operator.py
----------------------------------------------------------------------
diff --git a/tests/contrib/minikube/test_kubernetes_pod_operator.py b/tests/contrib/minikube/test_kubernetes_pod_operator.py
new file mode 100644
index 0000000..081fc04
--- /dev/null
+++ b/tests/contrib/minikube/test_kubernetes_pod_operator.py
@@ -0,0 +1,98 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import unittest
+from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator
+from airflow import AirflowException
+from subprocess import check_call
+import mock
+from airflow.contrib.kubernetes.pod_launcher import PodLauncher
+
+try:
+    check_call(["kubectl", "get", "pods"])
+except Exception as e:
+    raise unittest.SkipTest(
+        "Kubernetes integration tests require a minikube cluster;"
+        "Skipping tests {}".format(e)
+    )
+
+
+class KubernetesPodOperatorTest(unittest.TestCase):
+    def test_working_pod(self):
+        k = KubernetesPodOperator(
+            namespace='default',
+            image="ubuntu:16.04",
+            cmds=["bash", "-cx"],
+            arguments=["echo", "10"],
+            labels={"foo": "bar"},
+            name="test",
+            task_id="task"
+        )
+        k.execute(None)
+
+    def test_logging(self):
+        with mock.patch.object(PodLauncher, 'log') as mock_logger:
+            k = KubernetesPodOperator(
+                namespace='default',
+                image="ubuntu:16.04",
+                cmds=["bash", "-cx"],
+                arguments=["echo", "10"],
+                labels={"foo": "bar"},
+                name="test",
+                task_id="task",
+                get_logs=True
+            )
+            k.execute(None)
+            mock_logger.info.assert_any_call(b"+ echo\n")
+
+    def test_faulty_image(self):
+        bad_image_name = "foobar"
+        k = KubernetesPodOperator(
+            namespace='default',
+            image=bad_image_name,
+            cmds=["bash", "-cx"],
+            arguments=["echo", "10"],
+            labels={"foo": "bar"},
+            name="test",
+            task_id="task",
+            startup_timeout_seconds=5
+        )
+        with self.assertRaises(AirflowException) as cm:
+            k.execute(None)
+
+        print("exception: {}".format(cm))
+
+    def test_pod_failure(self):
+        """
+            Tests that the task fails when a pod reports a failure
+        """
+        bad_internal_command = "foobar"
+        k = KubernetesPodOperator(
+            namespace='default',
+            image="ubuntu:16.04",
+            cmds=["bash", "-cx"],
+            arguments=[bad_internal_command, "10"],
+            labels={"foo": "bar"},
+            name="test",
+            task_id="task"
+        )
+        with self.assertRaises(AirflowException):
+            k.execute(None)
+
+
+if __name__ == '__main__':
+    unittest.main()

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/16bae563/tests/contrib/minikube_tests/__init__.py
----------------------------------------------------------------------
diff --git a/tests/contrib/minikube_tests/__init__.py b/tests/contrib/minikube_tests/__init__.py
deleted file mode 100644
index 4067cc7..0000000
--- a/tests/contrib/minikube_tests/__init__.py
+++ /dev/null
@@ -1,18 +0,0 @@
-# -*- coding: utf-8 -*-
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-# 
-#   http://www.apache.org/licenses/LICENSE-2.0
-# 
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/16bae563/tests/contrib/minikube_tests/integration/__init__.py
----------------------------------------------------------------------
diff --git a/tests/contrib/minikube_tests/integration/__init__.py b/tests/contrib/minikube_tests/integration/__init__.py
deleted file mode 100644
index 9d7677a..0000000
--- a/tests/contrib/minikube_tests/integration/__init__.py
+++ /dev/null
@@ -1,13 +0,0 @@
-# -*- coding: utf-8 -*-
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/16bae563/tests/contrib/minikube_tests/integration/airflow_controller.py
----------------------------------------------------------------------
diff --git a/tests/contrib/minikube_tests/integration/airflow_controller.py b/tests/contrib/minikube_tests/integration/airflow_controller.py
deleted file mode 100644
index 5604652..0000000
--- a/tests/contrib/minikube_tests/integration/airflow_controller.py
+++ /dev/null
@@ -1,166 +0,0 @@
-# -*- coding: utf-8 -*-
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import subprocess
-import time
-
-
-class RunCommandError(Exception):
-    pass
-
-
-class TimeoutError(Exception):
-    pass
-
-
-class DagRunState:
-    SUCCESS = "success"
-    FAILED = "failed"
-    RUNNING = "running"
-
-
-def run_command(command):
-    process = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE,
-                               stderr=subprocess.PIPE)
-    stdout, stderr = process.communicate()
-    if process.returncode != 0:
-        raise RunCommandError(
-            "Error while running command: {}; Stdout: {}; Stderr: {}".format(
-                command, stdout, stderr
-            ))
-    return stdout, stderr
-
-
-def run_command_in_pod(pod_name, container_name, command):
-    return run_command("kubectl exec {pod_name} -c {container_name} -- {command}".format(
-        pod_name=pod_name, container_name=container_name, command=command
-    ))
-
-
-def get_scheduler_logs(airflow_pod=None):
-    airflow_pod = airflow_pod or _get_airflow_pod()
-
-    return run_command("kubectl logs {pod_name} scheduler"
-                       .format(pod_name=airflow_pod))
-
-
-def _unpause_dag(dag_id, airflow_pod=None):
-    airflow_pod = airflow_pod or _get_airflow_pod()
-    return run_command_in_pod(airflow_pod, "scheduler",
-                              "airflow unpause {dag_id}".format(dag_id=dag_id))
-
-
-def run_dag(dag_id, run_id, airflow_pod=None):
-    airflow_pod = airflow_pod or _get_airflow_pod()
-    _unpause_dag(dag_id, airflow_pod)
-    return run_command_in_pod(airflow_pod, "scheduler",
-                              "airflow trigger_dag {dag_id} -r {run_id}".format(
-                                  dag_id=dag_id, run_id=run_id
-                              ))
-
-
-def _get_pod_by_grep(grep_phrase):
-    stdout, stderr = run_command(
-        "kubectl get pods | grep {grep_phrase} | awk '{{print $1}}'".format(
-            grep_phrase=grep_phrase
-        ))
-    pod_name = stdout.strip()
-    return pod_name
-
-
-def _get_airflow_pod():
-    return _get_pod_by_grep("^airflow")
-
-
-def _get_postgres_pod():
-    return _get_pod_by_grep("^postgres")
-
-
-def _parse_state(stdout):
-    end_line = "(1 row)"
-    prev_line = None
-    for line in stdout.split("\n"):
-        if end_line in line:
-            return prev_line.strip()
-        prev_line = line
-
-    raise Exception("Unknown psql output: {}".format(stdout))
-
-
-def get_dag_run_table(postgres_pod=None):
-    postgres_pod = postgres_pod or _get_postgres_pod()
-    stdout, stderr = run_command_in_pod(
-        postgres_pod, "postgres",
-        """psql airflow -c "select * from dag_run" """
-    )
-    return stdout
-
-
-def get_task_instance_table(postgres_pod=None):
-    postgres_pod = postgres_pod or _get_postgres_pod()
-    stdout, stderr = run_command_in_pod(
-        postgres_pod, "postgres",
-        """psql airflow -c "select * from task_instance" """
-    )
-    return stdout
-
-
-def get_dag_run_state(dag_id, run_id, postgres_pod=None):
-    postgres_pod = postgres_pod or _get_postgres_pod()
-    stdout, stderr = run_command_in_pod(
-        postgres_pod, "postgres",
-        """psql airflow -c "select state from dag_run where dag_id='{dag_id}' and
-         run_id='{run_id}'" """.format(
-            dag_id=dag_id, run_id=run_id
-        )
-    )
-    return _parse_state(stdout)
-
-
-def dag_final_state(dag_id, run_id, postgres_pod=None, poll_interval=1, timeout=120):
-    postgres_pod = postgres_pod or _get_postgres_pod()
-    for _ in range(0, timeout / poll_interval):
-        dag_state = get_dag_run_state(dag_id, run_id, postgres_pod)
-        if dag_state != DagRunState.RUNNING:
-            capture_logs_for_failure(dag_state)
-            return dag_state
-        time.sleep(poll_interval)
-
-    raise TimeoutError(
-        "Timed out while waiting for DagRun with dag_id: {} run_id: {}".format(dag_id,
-                                                                               run_id))
-
-
-def _kill_pod(pod_name):
-    return run_command("kubectl delete pod {pod_name}".format(pod_name=pod_name))
-
-
-def kill_scheduler():
-    airflow_pod = _get_pod_by_grep("^airflow")
-    return _kill_pod(airflow_pod)
-
-
-def capture_logs_for_failure(state):
-    if state != DagRunState.SUCCESS:
-        stdout, stderr = get_scheduler_logs()
-        print("stdout:")
-        for line in stdout.split('\n'):
-            print(line)
-        print("stderr:")
-        for line in stderr.split('\n'):
-            print(line)
-        print("dag_run:")
-        print(get_dag_run_table())
-        print("task_instance")
-        print(get_task_instance_table())

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/16bae563/tests/contrib/minikube_tests/integration/test_kubernetes_executor_integration.py
----------------------------------------------------------------------
diff --git a/tests/contrib/minikube_tests/integration/test_kubernetes_executor_integration.py b/tests/contrib/minikube_tests/integration/test_kubernetes_executor_integration.py
deleted file mode 100644
index 602a717..0000000
--- a/tests/contrib/minikube_tests/integration/test_kubernetes_executor_integration.py
+++ /dev/null
@@ -1,67 +0,0 @@
-# -*- coding: utf-8 -*-
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import time
-import unittest
-from uuid import uuid4
-
-from tests.contrib.minikube_tests.integration.airflow_controller\
-    import DagRunState, RunCommandError, \
-    dag_final_state, get_dag_run_state, kill_scheduler, run_command, run_dag
-
-try:
-    run_command("kubectl get pods")
-except RunCommandError:
-    SKIP_KUBE = True
-else:
-    SKIP_KUBE = False
-
-
-class KubernetesExecutorTest(unittest.TestCase):
-    @unittest.skipIf(SKIP_KUBE,
-                     'Kubernetes integration tests are unsupported by this configuration')
-    def test_kubernetes_executor_dag_runs_successfully(self):
-        dag_id, run_id = "example_python_operator", uuid4().hex
-        run_dag(dag_id, run_id)
-        state = dag_final_state(dag_id, run_id, timeout=120)
-        self.assertEquals(state, DagRunState.SUCCESS)
-
-    @unittest.skipIf(SKIP_KUBE,
-                     'Kubernetes integration tests are unsupported by this configuration')
-    def test_start_dag_then_kill_scheduler_then_ensure_dag_succeeds(self):
-        dag_id, run_id = "example_python_operator", uuid4().hex
-        run_dag(dag_id, run_id)
-
-        self.assertEquals(get_dag_run_state(dag_id, run_id), DagRunState.RUNNING)
-
-        time.sleep(10)
-
-        kill_scheduler()
-
-        self.assertEquals(dag_final_state(dag_id, run_id, timeout=180),
-                          DagRunState.SUCCESS)
-
-    @unittest.skipIf(SKIP_KUBE,
-                     'Kubernetes integration tests are unsupported by this configuration')
-    def test_kubernetes_executor_config_works(self):
-        dag_id, run_id = "example_kubernetes_executor", uuid4().hex
-        run_dag(dag_id, run_id)
-
-        self.assertEquals(get_dag_run_state(dag_id, run_id), DagRunState.RUNNING)
-        self.assertEquals(dag_final_state(dag_id, run_id, timeout=300),
-                          DagRunState.SUCCESS)
-
-
-if __name__ == "__main__":
-    unittest.main()

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/16bae563/tests/contrib/minikube_tests/test_kubernetes_pod_operator.py
----------------------------------------------------------------------
diff --git a/tests/contrib/minikube_tests/test_kubernetes_pod_operator.py b/tests/contrib/minikube_tests/test_kubernetes_pod_operator.py
deleted file mode 100644
index 321f01f..0000000
--- a/tests/contrib/minikube_tests/test_kubernetes_pod_operator.py
+++ /dev/null
@@ -1,93 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-import unittest
-from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator
-from airflow import AirflowException
-from subprocess import check_call
-import mock
-from airflow.contrib.kubernetes.pod_launcher import PodLauncher
-
-try:
-    check_call(["kubectl", "get", "pods"])
-except Exception as e:
-    raise unittest.SkipTest(
-        "Kubernetes integration tests require a minikube cluster;"
-        "Skipping tests {}".format(e)
-    )
-
-
-class KubernetesPodOperatorTest(unittest.TestCase):
-    def test_working_pod(self):
-        k = KubernetesPodOperator(namespace='default',
-                                  image="ubuntu:16.04",
-                                  cmds=["bash", "-cx"],
-                                  arguments=["echo", "10"],
-                                  labels={"foo": "bar"},
-                                  name="test",
-                                  task_id="task"
-                                  )
-
-        k.execute(None)
-
-    def test_logging(self):
-        with mock.patch.object(PodLauncher, 'log') as mock_logger:
-            k = KubernetesPodOperator(namespace='default',
-                                      image="ubuntu:16.04",
-                                      cmds=["bash", "-cx"],
-                                      arguments=["echo", "10"],
-                                      labels={"foo": "bar"},
-                                      name="test",
-                                      task_id="task",
-                                      get_logs=True
-                                      )
-            k.execute(None)
-            mock_logger.info.assert_any_call("+ echo\n")
-
-    def test_faulty_image(self):
-        bad_image_name = "foobar"
-        k = KubernetesPodOperator(namespace='default',
-                                  image=bad_image_name,
-                                  cmds=["bash", "-cx"],
-                                  arguments=["echo", "10"],
-                                  labels={"foo": "bar"},
-                                  name="test",
-                                  task_id="task",
-                                  startup_timeout_seconds=5
-                                  )
-        with self.assertRaises(AirflowException) as cm:
-            k.execute(None),
-
-        print("exception: {}".format(cm))
-
-    def test_pod_failure(self):
-        """
-            Tests that the task fails when a pod reports a failure
-        """
-
-        bad_internal_command = "foobar"
-        k = KubernetesPodOperator(namespace='default',
-                                  image="ubuntu:16.04",
-                                  cmds=["bash", "-cx"],
-                                  arguments=[bad_internal_command, "10"],
-                                  labels={"foo": "bar"},
-                                  name="test",
-                                  task_id="task"
-                                  )
-
-        with self.assertRaises(AirflowException):
-            k.execute(None)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/16bae563/tests/www_rbac/api/experimental/test_endpoints.py
----------------------------------------------------------------------
diff --git a/tests/www_rbac/api/experimental/test_endpoints.py b/tests/www_rbac/api/experimental/test_endpoints.py
index 7bcbb8e..a19492e 100644
--- a/tests/www_rbac/api/experimental/test_endpoints.py
+++ b/tests/www_rbac/api/experimental/test_endpoints.py
@@ -7,9 +7,9 @@
 # to you under the Apache License, Version 2.0 (the
 # "License"); you may not use this file except in compliance
 # with the License.  You may obtain a copy of the License at
-# 
+#
 #   http://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 # Unless required by applicable law or agreed to in writing,
 # software distributed under the License is distributed on an
 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -77,6 +77,23 @@ class TestApiExperimental(unittest.TestCase):
         self.assertIn('error', response.data.decode('utf-8'))
         self.assertEqual(404, response.status_code)
 
+    def test_task_paused(self):
+        url_template = '/api/experimental/dags/{}/paused/{}'
+
+        response = self.app.get(
+            url_template.format('example_bash_operator', 'true')
+        )
+        self.assertIn('ok', response.data.decode('utf-8'))
+        self.assertEqual(200, response.status_code)
+
+        url_template = '/api/experimental/dags/{}/paused/{}'
+
+        response = self.app.get(
+            url_template.format('example_bash_operator', 'false')
+        )
+        self.assertIn('ok', response.data.decode('utf-8'))
+        self.assertEqual(200, response.status_code)
+
     def test_trigger_dag(self):
         url_template = '/api/experimental/dags/{}/dag_runs'
         response = self.app.post(
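
Outside the Flask test client, the paused endpoint added above can be exercised
with plain HTTP. A minimal sketch, assuming a webserver reachable at
localhost:8080 (host and port are placeholders, not part of this change):

import requests

BASE = 'http://localhost:8080/api/experimental'

# Pause the example DAG, then unpause it again, mirroring test_task_paused.
for flag in ('true', 'false'):
    resp = requests.get('{}/dags/example_bash_operator/paused/{}'.format(BASE, flag))
    resp.raise_for_status()
    print(flag, resp.json())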


[21/50] incubator-airflow git commit: closes apache/incubator-airflow#2539 *Closed for inactivity*

Posted by fo...@apache.org.
closes apache/incubator-airflow#2539 *Closed for inactivity*


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/8c8d1407
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/8c8d1407
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/8c8d1407

Branch: refs/heads/v1-10-test
Commit: 8c8d1407f8fb374f1f6d8f7788f880461519a430
Parents: ad28dec
Author: r39132 <si...@yahoo.com>
Authored: Sun Apr 29 20:16:51 2018 -0700
Committer: r39132 <si...@yahoo.com>
Committed: Sun Apr 29 20:16:51 2018 -0700

----------------------------------------------------------------------

----------------------------------------------------------------------



[31/50] incubator-airflow git commit: closes apache/incubator-airflow#2814 *Messed up PR - hundreds of old commits.*

Posted by fo...@apache.org.
closes apache/incubator-airflow#2814 *Messed up PR - hundreds of old commits.*


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/8921e7d7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/8921e7d7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/8921e7d7

Branch: refs/heads/v1-10-test
Commit: 8921e7d70c46027c5a3f4b686f70d893ec91eb89
Parents: f63a2b1
Author: r39132 <si...@yahoo.com>
Authored: Mon Apr 30 13:58:40 2018 -0700
Committer: r39132 <si...@yahoo.com>
Committed: Mon Apr 30 13:58:40 2018 -0700

----------------------------------------------------------------------

----------------------------------------------------------------------



[42/50] incubator-airflow git commit: [AIRFLOW-XXX] Fix wrong table header in scheduler.rst

Posted by fo...@apache.org.
[AIRFLOW-XXX] Fix wrong table header in scheduler.rst

Closes #3306 from sekikn/table_header


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/6f688464
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/6f688464
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/6f688464

Branch: refs/heads/v1-10-test
Commit: 6f6884641f6eca799f35190ef133329a00a1daaf
Parents: 2a079b9
Author: Kengo Seki <se...@apache.org>
Authored: Wed May 2 23:47:33 2018 -0700
Committer: r39132 <si...@yahoo.com>
Committed: Wed May 2 23:47:33 2018 -0700

----------------------------------------------------------------------
 docs/scheduler.rst | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/6f688464/docs/scheduler.rst
----------------------------------------------------------------------
diff --git a/docs/scheduler.rst b/docs/scheduler.rst
index 7e4e544..dfa0a42 100644
--- a/docs/scheduler.rst
+++ b/docs/scheduler.rst
@@ -45,7 +45,7 @@ a ``str``, or a ``datetime.timedelta`` object. Alternatively, you can also
 use one of these cron "preset":
 
 +--------------+----------------------------------------------------------------+---------------+
-| preset       | Run once a year at midnight of January 1                       | cron          |
+| preset       | meaning                                                        | cron          |
 +==============+================================================================+===============+
 | ``None``     | Don't schedule, use for exclusively "externally triggered"     |               |
 |              | DAGs                                                           |               |


[05/50] incubator-airflow git commit: closes apache/incubator-airflow#2586 *Closed for inactivity*

Posted by fo...@apache.org.
closes apache/incubator-airflow#2586 *Closed for inactivity*


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/172b73c1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/172b73c1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/172b73c1

Branch: refs/heads/v1-10-test
Commit: 172b73c155f555c49c340275793c0c0a0b09bc24
Parents: b922521
Author: r39132 <si...@yahoo.com>
Authored: Fri Apr 27 09:42:19 2018 -0700
Committer: r39132 <si...@yahoo.com>
Committed: Fri Apr 27 09:42:19 2018 -0700

----------------------------------------------------------------------

----------------------------------------------------------------------



[43/50] incubator-airflow git commit: [AIRFLOW-XXX] Add Reddit to Airflow users

Posted by fo...@apache.org.
[AIRFLOW-XXX] Add Reddit to Airflow users

Closes #3309 from seato/reddit_airflow


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/6b3f6cee
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/6b3f6cee
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/6b3f6cee

Branch: refs/heads/v1-10-test
Commit: 6b3f6cee118affee60b4b9319d6cf0d91c732bc4
Parents: 6f68846
Author: Richard Thai <ri...@gmail.com>
Authored: Thu May 3 21:23:09 2018 +0200
Committer: Fokko Driesprong <fo...@godatadriven.com>
Committed: Thu May 3 21:23:09 2018 +0200

----------------------------------------------------------------------
 README.md | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/6b3f6cee/README.md
----------------------------------------------------------------------
diff --git a/README.md b/README.md
index bdbfbf2..07371ab 100644
--- a/README.md
+++ b/README.md
@@ -203,6 +203,7 @@ Currently **officially** using Airflow:
 1. [Qubole](https://qubole.com) [[@msumit](https://github.com/msumit)]
 1. [Quizlet](https://quizlet.com) [[@quizlet](https://github.com/quizlet)]
 1. [Quora](https://www.quora.com/)
+1. [Reddit](https://www.reddit.com/) [[@reddit](https://github.com/reddit/)]
 1. [Robinhood](https://robinhood.com) [[@vineet-rh](https://github.com/vineet-rh)]
 1. [Scaleway](https://scaleway.com) [[@kdeldycke](https://github.com/kdeldycke)]
 1. [Sense360](https://github.com/Sense360) [[@kamilmroczek](https://github.com/KamilMroczek)]


[10/50] incubator-airflow git commit: [AIRFLOW-2266][AIRFLOW-2343] Remove google-cloud-dataflow dependency

Posted by fo...@apache.org.
[AIRFLOW-2266][AIRFLOW-2343] Remove google-cloud-dataflow dependency

This is because the latest release (2.4) of
apache-beam[gcp] is not available for Python 3.x.
Also, since we use Google's discovery-based API
for all Google Cloud related commands, we do not
need to import the google-cloud-dataflow package.

Closes #3273 from kaxil/patch-3
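
For reference, the discovery-based client mentioned above comes from
google-api-python-client rather than the google-cloud-dataflow package. A
minimal sketch, assuming application-default credentials and a placeholder
project id (neither is part of this patch):

from googleapiclient.discovery import build

# Build a Dataflow service object through the discovery API; no
# google-cloud-dataflow import is required for this.
service = build('dataflow', 'v1b3', cache_discovery=False)
jobs = service.projects().jobs().list(projectId='my-project').execute()
print(jobs.get('jobs', []))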


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/e9b74b68
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/e9b74b68
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/e9b74b68

Branch: refs/heads/v1-10-test
Commit: e9b74b68aa848b6ac5602f3e435cba38efde32d5
Parents: 5aa1586
Author: Kaxil Naik <ka...@gmail.com>
Authored: Sat Apr 28 21:24:15 2018 +0200
Committer: Fokko Driesprong <fo...@godatadriven.com>
Committed: Sat Apr 28 21:24:15 2018 +0200

----------------------------------------------------------------------
 setup.py | 4 +---
 1 file changed, 1 insertion(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/e9b74b68/setup.py
----------------------------------------------------------------------
diff --git a/setup.py b/setup.py
index b5461eb..fef8d8a 100644
--- a/setup.py
+++ b/setup.py
@@ -133,7 +133,6 @@ gcp_api = [
     'google-api-python-client>=1.5.0, <1.6.0',
     'oauth2client>=2.0.2, <2.1.0',
     'PyOpenSSL',
-    'google-cloud-dataflow>=2.2.0',
     'pandas-gbq'
 ]
 hdfs = ['snakebite>=2.7.8']
@@ -206,8 +205,7 @@ devel_all = (sendgrid + devel + all_dbs + doc + samba + s3 + slack + crypto + or
 # Snakebite & Google Cloud Dataflow are not Python 3 compatible :'(
 if PY3:
     devel_ci = [package for package in devel_all if package not in
-                ['snakebite>=2.7.8', 'snakebite[kerberos]>=2.7.8',
-                 'google-cloud-dataflow>=2.2.0']]
+                ['snakebite>=2.7.8', 'snakebite[kerberos]>=2.7.8']]
 else:
     devel_ci = devel_all
 


[28/50] incubator-airflow git commit: [AIRFLOW-2398] Add BounceX to list of current airflow users

Posted by fo...@apache.org.
[AIRFLOW-2398] Add BounceX to list of current airflow users

Closes #3282 from JoshFerge/master


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/9a8e4f77
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/9a8e4f77
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/9a8e4f77

Branch: refs/heads/v1-10-test
Commit: 9a8e4f77568783b95efb35b483e1eac10f021ea9
Parents: 19b3901
Author: Josh Ferge <jo...@bounceexchange.com>
Authored: Mon Apr 30 07:03:33 2018 -0700
Committer: r39132 <si...@yahoo.com>
Committed: Mon Apr 30 07:03:37 2018 -0700

----------------------------------------------------------------------
 README.md | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/9a8e4f77/README.md
----------------------------------------------------------------------
diff --git a/README.md b/README.md
index b504cbd..bd09e9c 100644
--- a/README.md
+++ b/README.md
@@ -105,6 +105,7 @@ Currently **officially** using Airflow:
 1. [Boda Telecom Suite - CE](https://github.com/bodastage/bts-ce) [[@erssebaggala](https://github.com/erssebaggala), [@bodastage](https://github.com/bodastage)]
 1. [Bodastage Solutions](http://bodastage.com) [[@erssebaggala](https://github.com/erssebaggala), [@bodastage](https://github.com/bodastage)]
 1. [Bonnier Broadcasting](http://www.bonnierbroadcasting.com) [[@wileeam](https://github.com/wileeam)]
+1. [BounceX](http://www.bouncex.com) [[@JoshFerge](https://github.com/JoshFerge), [@hudsonrio](https://github.com/hudsonrio), [@ronniekritou](https://github.com/ronniekritou)]
 1. [California Data Collaborative](https://github.com/California-Data-Collaborative) powered by [ARGO Labs](http://www.argolabs.org)
 1. [Carbonite](https://www.carbonite.com) [[@ajbosco](https://github.com/ajbosco)]
 1. [Celect](http://www.celect.com) [[@superdosh](https://github.com/superdosh) & [@chadcelect](https://github.com/chadcelect)]


[19/50] incubator-airflow git commit: closes apache/incubator-airflow#2930 *Fix belongs in SQLAlchemy.*

Posted by fo...@apache.org.
closes apache/incubator-airflow#2930 *Fix belongs in SQLAlchemy.*


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/3952e050
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/3952e050
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/3952e050

Branch: refs/heads/v1-10-test
Commit: 3952e050a1fe6191f031c2af92785b1823e8f3e3
Parents: 72f15a1
Author: r39132 <si...@yahoo.com>
Authored: Sun Apr 29 10:16:30 2018 -0700
Committer: r39132 <si...@yahoo.com>
Committed: Sun Apr 29 10:16:30 2018 -0700

----------------------------------------------------------------------

----------------------------------------------------------------------



[20/50] incubator-airflow git commit: [AIRFLOW-1933] Fix some typos

Posted by fo...@apache.org.
[AIRFLOW-1933] Fix some typos

Closes #2474 from Philippus/patch-1


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/ad28decc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/ad28decc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/ad28decc

Branch: refs/heads/v1-10-test
Commit: ad28decc74a87f6f9fbff57f200199860a2bfa81
Parents: 3952e05
Author: Philippus Baalman <ph...@gmail.com>
Authored: Sun Apr 29 20:08:48 2018 -0700
Committer: r39132 <si...@yahoo.com>
Committed: Sun Apr 29 20:08:48 2018 -0700

----------------------------------------------------------------------
 UPDATING.md | 22 +++++++++++-----------
 1 file changed, 11 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/ad28decc/UPDATING.md
----------------------------------------------------------------------
diff --git a/UPDATING.md b/UPDATING.md
index 609c8db..8006876 100644
--- a/UPDATING.md
+++ b/UPDATING.md
@@ -74,17 +74,17 @@ With Airflow 1.9 or lower, there were two connection strings for the Google Clou
 
 ### SSH Hook updates, along with new SSH Operator & SFTP Operator
 
-SSH Hook now uses Paramiko library to create ssh client connection, instead of sub-process based ssh command execution previously (<1.9.0), so this is backward incompatible.
+SSH Hook now uses the Paramiko library to create an ssh client connection, instead of the sub-process based ssh command execution previously (<1.9.0), so this is backward incompatible.
   - update SSHHook constructor
-  - use SSHOperator class in place of SSHExecuteOperator which is removed now. Refer test_ssh_operator.py for usage info.
-  - SFTPOperator is added to perform secure file transfer from serverA to serverB. Refer test_sftp_operator.py.py for usage info.
+  - use SSHOperator class in place of SSHExecuteOperator which is removed now. Refer to test_ssh_operator.py for usage info.
+  - SFTPOperator is added to perform secure file transfer from serverA to serverB. Refer to test_sftp_operator.py for usage info.
   - No updates are required if you are using ftpHook, it will continue to work as is.
 
 ### S3Hook switched to use Boto3
 
 The airflow.hooks.S3_hook.S3Hook has been switched to use boto3 instead of the older boto (a.k.a. boto2). This results in a few backwards incompatible changes to the following classes: S3Hook:
   - the constructors no longer accepts `s3_conn_id`. It is now called `aws_conn_id`.
-  - the default conneciton is now "aws_default" instead of "s3_default"
+  - the default connection is now "aws_default" instead of "s3_default"
   - the return type of objects returned by `get_bucket` is now boto3.s3.Bucket
   - the return type of `get_key`, and `get_wildcard_key` is now an boto3.S3.Object.
 
@@ -106,7 +106,7 @@ Once a logger has determined that a message needs to be processed, it is passed
 
 #### Changes in Airflow Logging
 
-Airflow's logging mechanism has been refactored to uses Python’s builtin `logging` module to perform logging of the application. By extending classes with the existing `LoggingMixin`, all the logging will go through a central logger. Also the `BaseHook` and `BaseOperator` already extends this class, so it is easily available to do logging.
+Airflow's logging mechanism has been refactored to use Python’s builtin `logging` module to perform logging of the application. By extending classes with the existing `LoggingMixin`, all the logging will go through a central logger. Also the `BaseHook` and `BaseOperator` already extend this class, so it is easily available to do logging.
 
 The main benefit is easier configuration of the logging by setting a single centralized python file. Disclaimer; there is still some inline configuration, but this will be removed eventually. The new logging class is defined by setting the dotted classpath in your `~/airflow/airflow.cfg` file:
 
@@ -153,7 +153,7 @@ The `file_task_handler` logger has been made more flexible. The default format c
 If you are logging to Google cloud storage, please see the [Google cloud platform documentation](https://airflow.incubator.apache.org/integration.html#gcp-google-cloud-platform) for logging instructions.
 
 If you are using S3, the instructions should be largely the same as the Google cloud platform instructions above. You will need a custom logging config. The `REMOTE_BASE_LOG_FOLDER` configuration key in your airflow config has been removed, therefore you will need to take the following steps:
- - Copy the logging configuration from [`airflow/config_templates/airflow_logging_settings.py`](https://github.com/apache/incubator-airflow/blob/master/airflow/config_templates/airflow_local_settings.py) and copy it.
+ - Copy the logging configuration from [`airflow/config_templates/airflow_logging_settings.py`](https://github.com/apache/incubator-airflow/blob/master/airflow/config_templates/airflow_local_settings.py).
  - Place it in a directory inside the Python import path `PYTHONPATH`. If you are using Python 2.7, ensuring that any `__init__.py` files exist so that it is importable.
  - Update the config by setting the path of `REMOTE_BASE_LOG_FOLDER` explicitly in the config. The `REMOTE_BASE_LOG_FOLDER` key is not used anymore.
  - Set the `logging_config_class` to the filename and dict. For example, if you place `custom_logging_config.py` on the base of your pythonpath, you will need to set `logging_config_class = custom_logging_config.LOGGING_CONFIG` in your config as Airflow 1.8.
@@ -180,12 +180,12 @@ supported and will be removed entirely in Airflow 2.0
   Note that JSON serialization is stricter than pickling, so if you want to e.g. pass
   raw bytes through XCom you must encode them using an encoding like base64.
   By default pickling is still enabled until Airflow 2.0. To disable it
-  Set enable_xcom_pickling = False in your Airflow config.
+  set enable_xcom_pickling = False in your Airflow config.
 
 ## Airflow 1.8.1
 
 The Airflow package name was changed from `airflow` to `apache-airflow` during this release. You must uninstall
-previously installed version of Airflow before installing 1.8.1.
+a previously installed version of Airflow before installing 1.8.1.
 
 ## Airflow 1.8
 
@@ -202,12 +202,12 @@ Systemd unit files have been updated. If you use systemd please make sure to upd
 Airflow 1.7.1 has issues with being able to over subscribe to a pool, ie. more slots could be used than were
 available. This is fixed in Airflow 1.8.0, but due to past issue jobs may fail to start although their
 dependencies are met after an upgrade. To workaround either temporarily increase the amount of slots above
-the the amount of queued tasks or use a new pool.
+the amount of queued tasks or use a new pool.
 
 ### Less forgiving scheduler on dynamic start_date
 Using a dynamic start_date (e.g. `start_date = datetime.now()`) is not considered a best practice. The 1.8.0 scheduler
 is less forgiving in this area. If you encounter DAGs not being scheduled you can try using a fixed start_date and
-renaming your dag. The last step is required to make sure you start with a clean slate, otherwise the old schedule can
+renaming your DAG. The last step is required to make sure you start with a clean slate, otherwise the old schedule can
 interfere.
 
 ### New and updated scheduler options
@@ -243,7 +243,7 @@ By default the scheduler will fill any missing interval DAG Runs between the las
 This setting changes that behavior to only execute the latest interval. This can also be specified per DAG as
 `catchup = False / True`. Command line backfills will still work.
 
-### Faulty Dags do not show an error in the Web UI
+### Faulty DAGs do not show an error in the Web UI
 
 Due to changes in the way Airflow processes DAGs the Web UI does not show an error when processing a faulty DAG. To
 find processing errors go the `child_process_log_directory` which defaults to `<AIRFLOW_HOME>/scheduler/latest`.


[23/50] incubator-airflow git commit: closes apache/incubator-airflow#2970 *Closed for inactivity*

Posted by fo...@apache.org.
closes apache/incubator-airflow#2970 *Closed for inactivity*


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/e29562e3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/e29562e3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/e29562e3

Branch: refs/heads/v1-10-test
Commit: e29562e33a05c66bc640c56143f3a2ca4e0a0b98
Parents: 06e90f4
Author: r39132 <si...@yahoo.com>
Authored: Sun Apr 29 20:20:30 2018 -0700
Committer: r39132 <si...@yahoo.com>
Committed: Sun Apr 29 20:20:30 2018 -0700

----------------------------------------------------------------------

----------------------------------------------------------------------



[04/50] incubator-airflow git commit: closes apache/incubator-airflow#1933 *Closed for inactivity*

Posted by fo...@apache.org.
closes apache/incubator-airflow#1933 *Closed for inactivity*


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/b9225215
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/b9225215
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/b9225215

Branch: refs/heads/v1-10-test
Commit: b9225215873251c061accd89349b82b99ad5d905
Parents: 414a08e
Author: r39132 <si...@yahoo.com>
Authored: Fri Apr 27 09:39:13 2018 -0700
Committer: r39132 <si...@yahoo.com>
Committed: Fri Apr 27 09:39:13 2018 -0700

----------------------------------------------------------------------

----------------------------------------------------------------------



[06/50] incubator-airflow git commit: closes apache/incubator-airflow#2827 *Closed for inactivity*

Posted by fo...@apache.org.
closes apache/incubator-airflow#2827 *Closed for inactivity*


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/67c0099e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/67c0099e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/67c0099e

Branch: refs/heads/v1-10-test
Commit: 67c0099e97ae5d679f838d404bbbc52cefdf5201
Parents: 172b73c
Author: r39132 <si...@yahoo.com>
Authored: Fri Apr 27 09:46:51 2018 -0700
Committer: r39132 <si...@yahoo.com>
Committed: Fri Apr 27 09:46:51 2018 -0700

----------------------------------------------------------------------

----------------------------------------------------------------------



[02/50] incubator-airflow git commit: [AIRFLOW-2378] Add Groupon to list of current users

Posted by fo...@apache.org.
[AIRFLOW-2378] Add Groupon to list of current users

Closes #3267 from stevencasey/add_groupon


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/801fe7db
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/801fe7db
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/801fe7db

Branch: refs/heads/v1-10-test
Commit: 801fe7dbdcc74988fe937e013d7f019df87c44e5
Parents: ae63246
Author: steven casey <26...@users.noreply.github.com>
Authored: Thu Apr 26 18:47:28 2018 -0700
Committer: r39132 <si...@yahoo.com>
Committed: Thu Apr 26 18:47:28 2018 -0700

----------------------------------------------------------------------
 README.md | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/801fe7db/README.md
----------------------------------------------------------------------
diff --git a/README.md b/README.md
index 5e5193c..b504cbd 100644
--- a/README.md
+++ b/README.md
@@ -140,6 +140,7 @@ Currently **officially** using Airflow:
 1. [GovTech GDS](https://gds-gov.tech) [[@chrissng](https://github.com/chrissng) & [@datagovsg](https://github.com/datagovsg)]
 1. [Grand Rounds](https://www.grandrounds.com/) [[@richddr](https://github.com/richddr), [@timz1290](https://github.com/timz1290), [@wenever](https://github.com/@wenever), & [@runongirlrunon](https://github.com/runongirlrunon)]
 1. [Groupalia](http://es.groupalia.com) [[@jesusfcr](https://github.com/jesusfcr)]
+1. [Groupon](https://groupon.com) [[@stevencasey](https://github.com/stevencasey)]
 1. [Gusto](https://gusto.com) [[@frankhsu](https://github.com/frankhsu)]
 1. [Handshake](https://joinhandshake.com/) [[@mhickman](https://github.com/mhickman)]
 1. [Handy](http://www.handy.com/careers/73115?gh_jid=73115&gh_src=o5qcxn) [[@marcintustin](https://github.com/marcintustin) / [@mtustin-handy](https://github.com/mtustin-handy)]


[39/50] incubator-airflow git commit: [AIRFLOW-2394] default cmds and arguments in kubernetes operator

Posted by fo...@apache.org.
[AIRFLOW-2394] default cmds and arguments in kubernetes operator

Commands and arguments passed to the docker image in the kubernetes operator

Closes #3289 from ese/k8soperator


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/12ab796b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/12ab796b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/12ab796b

Branch: refs/heads/v1-10-test
Commit: 12ab796b11c001f5cc7c5bd294616200b4159dea
Parents: 96d00da
Author: Sergio Ballesteros <sn...@locolandia.net>
Authored: Wed May 2 15:43:40 2018 +0200
Committer: Fokko Driesprong <fo...@godatadriven.com>
Committed: Wed May 2 15:43:51 2018 +0200

----------------------------------------------------------------------
 airflow/contrib/kubernetes/pod_generator.py     |  6 ++---
 .../operators/kubernetes_pod_operator.py        | 23 ++++++++++-------
 docs/kubernetes.rst                             | 26 +++++++++-----------
 3 files changed, 28 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/12ab796b/airflow/contrib/kubernetes/pod_generator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/kubernetes/pod_generator.py b/airflow/contrib/kubernetes/pod_generator.py
index d75ba22..78d3926 100644
--- a/airflow/contrib/kubernetes/pod_generator.py
+++ b/airflow/contrib/kubernetes/pod_generator.py
@@ -26,11 +26,9 @@ class PodGenerator:
 
     def __init__(self, kube_config=None):
         self.kube_config = kube_config
-        self.env_vars = {}
         self.volumes = []
         self.volume_mounts = []
         self.init_containers = []
-        self.secrets = []
 
     def add_init_container(self,
                            name,
@@ -129,8 +127,8 @@ class PodGenerator:
             cmds=cmds,
             args=arguments,
             labels=labels,
-            envs=self.env_vars,
-            secrets={},
+            envs={},
+            secrets=[],
             # service_account_name=self.kube_config.worker_service_account_name,
             # image_pull_secrets=self.kube_config.image_pull_secrets,
             init_containers=worker_init_container_spec,

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/12ab796b/airflow/contrib/operators/kubernetes_pod_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/kubernetes_pod_operator.py b/airflow/contrib/operators/kubernetes_pod_operator.py
index 06c0c5a..32ad582 100644
--- a/airflow/contrib/operators/kubernetes_pod_operator.py
+++ b/airflow/contrib/operators/kubernetes_pod_operator.py
@@ -30,11 +30,13 @@ class KubernetesPodOperator(BaseOperator):
     """
     Execute a task in a Kubernetes Pod
 
-    :param image: Docker image name
+    :param image: Docker image you wish to launch. Defaults to dockerhub.io,
+        but fully qualified URLs will point to custom repositories
     :type image: str
-    :param: namespace: namespace name where run the Pod
+    :param namespace: the namespace to run within kubernetes
     :type: namespace: str
-    :param cmds: entrypoint of the container
+    :param cmds: entrypoint of the container.
+        The docker image's entrypoint is used if this is not provided.
     :type cmds: list
     :param arguments: arguments of to the entrypoint.
         The docker image's CMD is used if this is not provided.
@@ -43,15 +45,18 @@ class KubernetesPodOperator(BaseOperator):
     :type labels: dict
     :param startup_timeout_seconds: timeout in seconds to startup the pod
     :type startup_timeout_seconds: int
-    :param name: name for the pod
+    :param name: name of the task you want to run,
+        will be used to generate a pod id
     :type name: str
     :param env_vars: Environment variables initialized in the container
     :type env_vars: dict
-    :param secrets: Secrets to attach to the container
+    :param secrets: Kubernetes secrets to inject in the container,
+        They can be exposed as environment vars or files in a volume.
     :type secrets: list
     :param in_cluster: run kubernetes client with in_cluster configuration
     :type in_cluster: bool
     :param get_logs: get the stdout of the container as logs of the tasks
+    :type get_logs: bool
     """
 
     def execute(self, context):
@@ -85,9 +90,9 @@ class KubernetesPodOperator(BaseOperator):
     def __init__(self,
                  namespace,
                  image,
-                 cmds,
-                 arguments,
                  name,
+                 cmds=None,
+                 arguments=None,
                  env_vars=None,
                  secrets=None,
                  in_cluster=False,
@@ -99,8 +104,8 @@ class KubernetesPodOperator(BaseOperator):
         super(KubernetesPodOperator, self).__init__(*args, **kwargs)
         self.image = image
         self.namespace = namespace
-        self.cmds = cmds
-        self.arguments = arguments
+        self.cmds = cmds or []
+        self.arguments = arguments or []
         self.labels = labels or {}
         self.startup_timeout_seconds = startup_timeout_seconds
         self.name = name
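
With cmds and arguments now defaulting to None, a task can rely entirely on the
image's own entrypoint and CMD. A minimal sketch (image, name and task_id are
illustrative):

from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator

# No cmds/arguments given: the container runs the image's entrypoint and CMD.
run_image_default = KubernetesPodOperator(
    namespace='default',
    image='ubuntu:16.04',
    name='entrypoint-only',
    task_id='entrypoint_only_task',
)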

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/12ab796b/docs/kubernetes.rst
----------------------------------------------------------------------
diff --git a/docs/kubernetes.rst b/docs/kubernetes.rst
index 1dd77cb..4b83fc0 100644
--- a/docs/kubernetes.rst
+++ b/docs/kubernetes.rst
@@ -1,5 +1,5 @@
 Kubernetes Executor
-===================
+^^^^^^^^^^^^^^^^^^^
 
 The kubernetes executor is introduced in Apache Airflow 1.10.0. The Kubernetes executor will create a new pod for every task instance.
 
@@ -9,31 +9,29 @@ Example helm charts are available at `scripts/ci/kubernetes/kube/{airflow,volume
 
 
 Kubernetes Operator
-===================
+^^^^^^^^^^^^^^^^^^^
 
+.. code:: python
 
+    from airflow.contrib.operators import KubernetesOperator
+    from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator
+    from airflow.contrib.kubernetes.secret import Secret
 
-.. code:: python
+    secret_file = Secret('volume', '/etc/sql_conn', 'airflow-secrets', 'sql_alchemy_conn')
+    secret_env  = Secret('env', 'SQL_CONN', 'airflow-secrets', 'sql_alchemy_conn')
 
-    from airflow.comtrib.operators import KubernetesOperator
     k = KubernetesPodOperator(namespace='default',
                               image="ubuntu:16.04",
                               cmds=["bash", "-cx"],
                               arguments=["echo", "10"],
                               labels={"foo": "bar"},
+                              secrets=[secret_file, secret_env],
                               name="test",
                               task_id="task"
                               )
 
 
+.. autoclass:: airflow.contrib.operators.kubernetes_pod_operator.KubernetesPodOperator
+
+.. autoclass:: airflow.contrib.kubernetes.secret.Secret
 
-=================================   ====================================
-Variable                            Description
-=================================   ====================================
-``@namespace``                      The namespace is your isolated work environment within kubernetes
-``@image``                          docker image you wish to launch. Defaults to dockerhub.io, but fully qualified URLS will point to custom repositories
-``@cmds``                           To start a task in a docker image, we need to tell it what to do. the cmds array is the space seperated bash command that will define the task completed by the container
-``arguments``                       arguments for your bash command
-``@labels``                         Labels are an important element of launching kubernetes pods, as it tells kubernetes what pods a service can route to. For example, if you launch 5 postgres pods with the label  {'postgres':'foo'} and create a postgres service with the same label, kubernetes will know that any time that service is queried, it can pick any of those 5 postgres instances as the endpoint for that service.
-``@name``                           name of the task you want to run, will be used to generate a pod id
-=================================   ====================================


[35/50] incubator-airflow git commit: [AIRFLOW-2400] Add Ability to set Environment Variables for K8s

Posted by fo...@apache.org.
[AIRFLOW-2400] Add Ability to set Environment Variables for K8s

[AIRFLOW-2400] Env Variables for K8s

Allow environment variables to be set for the
KubernetesPodOperator.

Fix typo

Fix documentation variable type

Closes #3284 from jkao/add-env-vars-
to-k8s-operator


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/fef7d304
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/fef7d304
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/fef7d304

Branch: refs/heads/v1-10-test
Commit: fef7d304d4aa2534e38da74921d482d4768991a6
Parents: 4428d1b
Author: Jeff Kao <je...@gmail.com>
Authored: Tue May 1 22:09:31 2018 +0200
Committer: Fokko Driesprong <fo...@godatadriven.com>
Committed: Tue May 1 22:09:31 2018 +0200

----------------------------------------------------------------------
 airflow/contrib/kubernetes/pod.py                    | 2 +-
 airflow/contrib/operators/kubernetes_pod_operator.py | 6 +++++-
 2 files changed, 6 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/fef7d304/airflow/contrib/kubernetes/pod.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/kubernetes/pod.py b/airflow/contrib/kubernetes/pod.py
index df60c8a..e5e9a4d 100644
--- a/airflow/contrib/kubernetes/pod.py
+++ b/airflow/contrib/kubernetes/pod.py
@@ -45,7 +45,7 @@ class Pod:
     :param image: The docker image
     :type image: str
     :param envs: A dict containing the environment variables
-    :type envs:s dict
+    :type envs: dict
     :param cmds: The command to be run on the pod
     :type cmds: list str
     :param secrets: Secrets to be launched to the pod

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/fef7d304/airflow/contrib/operators/kubernetes_pod_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/kubernetes_pod_operator.py b/airflow/contrib/operators/kubernetes_pod_operator.py
index 9e95d8b..06c0c5a 100644
--- a/airflow/contrib/operators/kubernetes_pod_operator.py
+++ b/airflow/contrib/operators/kubernetes_pod_operator.py
@@ -45,6 +45,8 @@ class KubernetesPodOperator(BaseOperator):
     :type startup_timeout_seconds: int
     :param name: name for the pod
     :type name: str
+    :param env_vars: Environment variables initialized in the container
+    :type env_vars: dict
     :param secrets: Secrets to attach to the container
     :type secrets: list
     :param in_cluster: run kubernetes client with in_cluster configuration
@@ -54,7 +56,6 @@ class KubernetesPodOperator(BaseOperator):
 
     def execute(self, context):
         try:
-
             client = kube_client.get_kube_client(in_cluster=self.in_cluster)
             gen = pod_generator.PodGenerator()
 
@@ -68,6 +69,7 @@ class KubernetesPodOperator(BaseOperator):
             )
 
             pod.secrets = self.secrets
+            pod.envs = self.env_vars
 
             launcher = pod_launcher.PodLauncher(client)
             final_state = launcher.run_pod(
@@ -86,6 +88,7 @@ class KubernetesPodOperator(BaseOperator):
                  cmds,
                  arguments,
                  name,
+                 env_vars=None,
                  secrets=None,
                  in_cluster=False,
                  labels=None,
@@ -101,6 +104,7 @@ class KubernetesPodOperator(BaseOperator):
         self.labels = labels or {}
         self.startup_timeout_seconds = startup_timeout_seconds
         self.name = name
+        self.env_vars = env_vars or {}
         self.secrets = secrets or []
         self.in_cluster = in_cluster
         self.get_logs = get_logs
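
For illustration (the variable name and values below are made up, not taken from the
patch), the new ``env_vars`` argument takes a plain dict and each entry becomes an
environment variable inside the launched pod::

    # Hypothetical example: MY_VALUE is injected into the container's environment.
    k = KubernetesPodOperator(namespace='default',
                              image="ubuntu:16.04",
                              cmds=["bash", "-cx"],
                              arguments=["echo $MY_VALUE"],
                              env_vars={"MY_VALUE": "value"},
                              name="test-env",
                              task_id="env_var_task"
                              )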


[45/50] incubator-airflow git commit: [AIRFLOW-2313] Add TTL parameters for Dataproc

Posted by fo...@apache.org.
[AIRFLOW-2313] Add TTL parameters for Dataproc

Add three optional parameters to
DataprocClusterCreateOperator that configure the
Cloud Dataproc Cluster Scheduled Deletion
feature.

Closes #3217 from ebartkus/dataproc-ttl
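
A usage sketch (the project, cluster name and values below are illustrative, not
taken from the patch)::

    import datetime

    from airflow.contrib.operators.dataproc_operator import DataprocClusterCreateOperator

    # Hypothetical project/cluster names and TTL values.
    create_cluster = DataprocClusterCreateOperator(
        task_id='create_dataproc_cluster',
        project_id='my-gcp-project',
        cluster_name='analytics-cluster',
        num_workers=2,
        zone='europe-west1-b',
        idle_delete_ttl=3600,                            # seconds of idleness before deletion
        auto_delete_time=datetime.datetime(2018, 6, 7),  # absolute deletion time, converted to UTC
        # auto_delete_ttl=7200,                          # ignored when auto_delete_time is set
    )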


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/b02820a7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/b02820a7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/b02820a7

Branch: refs/heads/v1-10-test
Commit: b02820a7afcb5205df39c4a639f1ceeb2c9c75ee
Parents: c5fa8cd
Author: ebartkus <eb...@trustpilot.com>
Authored: Thu May 3 23:10:49 2018 +0200
Committer: Fokko Driesprong <fo...@godatadriven.com>
Committed: Thu May 3 23:10:49 2018 +0200

----------------------------------------------------------------------
 airflow/contrib/hooks/gcp_dataproc_hook.py      |  2 +-
 airflow/contrib/operators/dataproc_operator.py  | 30 ++++++++-
 .../contrib/operators/test_dataproc_operator.py | 67 ++++++++++++++++++--
 3 files changed, 93 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b02820a7/airflow/contrib/hooks/gcp_dataproc_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/gcp_dataproc_hook.py b/airflow/contrib/hooks/gcp_dataproc_hook.py
index 7d95897..7849e17 100644
--- a/airflow/contrib/hooks/gcp_dataproc_hook.py
+++ b/airflow/contrib/hooks/gcp_dataproc_hook.py
@@ -197,7 +197,7 @@ class DataProcHook(GoogleCloudBaseHook):
     def __init__(self,
                  gcp_conn_id='google_cloud_default',
                  delegate_to=None,
-                 api_version='v1'):
+                 api_version='v1beta2'):
         super(DataProcHook, self).__init__(gcp_conn_id, delegate_to)
         self.api_version = api_version
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b02820a7/airflow/contrib/operators/dataproc_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/dataproc_operator.py b/airflow/contrib/operators/dataproc_operator.py
index ad0aa09..1614720 100644
--- a/airflow/contrib/operators/dataproc_operator.py
+++ b/airflow/contrib/operators/dataproc_operator.py
@@ -32,6 +32,7 @@ from airflow.models import BaseOperator
 from airflow.utils.decorators import apply_defaults
 from airflow.version import version
 from googleapiclient.errors import HttpError
+from airflow.utils import timezone
 
 
 class DataprocClusterCreateOperator(BaseOperator):
@@ -105,6 +106,16 @@ class DataprocClusterCreateOperator(BaseOperator):
     :type service_account: string
     :param service_account_scopes: The URIs of service account scopes to be included.
     :type service_account_scopes: list[string]
+    :param idle_delete_ttl: The longest duration that the cluster may stay idle.
+        Exceeding this threshold causes the cluster to be auto-deleted.
+        A duration in seconds.
+    :type idle_delete_ttl: int
+    :param auto_delete_time: The time when the cluster will be auto-deleted.
+    :type auto_delete_time: datetime
+    :param auto_delete_ttl: The life duration of the cluster; the cluster will be
+        auto-deleted at the end of this duration.
+        A duration in seconds. (If auto_delete_time is set, this parameter is ignored.)
+    :type auto_delete_ttl: int
     """
 
     template_fields = ['cluster_name', 'project_id', 'zone', 'region']
@@ -135,6 +146,9 @@ class DataprocClusterCreateOperator(BaseOperator):
                  delegate_to=None,
                  service_account=None,
                  service_account_scopes=None,
+                 idle_delete_ttl=None,
+                 auto_delete_time=None,
+                 auto_delete_ttl=None,
                  *args,
                  **kwargs):
 
@@ -163,6 +177,9 @@ class DataprocClusterCreateOperator(BaseOperator):
         self.region = region
         self.service_account = service_account
         self.service_account_scopes = service_account_scopes
+        self.idle_delete_ttl = idle_delete_ttl
+        self.auto_delete_time = auto_delete_time
+        self.auto_delete_ttl = auto_delete_ttl
 
     def _get_cluster_list_for_project(self, service):
         result = service.projects().regions().clusters().list(
@@ -261,7 +278,8 @@ class DataprocClusterCreateOperator(BaseOperator):
                     }
                 },
                 'secondaryWorkerConfig': {},
-                'softwareConfig': {}
+                'softwareConfig': {},
+                'lifecycleConfig': {}
             }
         }
         if self.num_preemptible_workers > 0:
@@ -294,6 +312,16 @@ class DataprocClusterCreateOperator(BaseOperator):
             cluster_data['config']['softwareConfig']['imageVersion'] = self.image_version
         if self.properties:
             cluster_data['config']['softwareConfig']['properties'] = self.properties
+        if self.idle_delete_ttl:
+            cluster_data['config']['lifecycleConfig']['idleDeleteTtl'] = \
+                "{}s".format(self.idle_delete_ttl)
+        if self.auto_delete_time:
+            utc_auto_delete_time = timezone.convert_to_utc(self.auto_delete_time)
+            cluster_data['config']['lifecycleConfig']['autoDeleteTime'] = \
+                utc_auto_delete_time.format('%Y-%m-%dT%H:%M:%S.%fZ', formatter='classic')
+        elif self.auto_delete_ttl:
+            cluster_data['config']['lifecycleConfig']['autoDeleteTtl'] = \
+                "{}s".format(self.auto_delete_ttl)
         if self.init_actions_uris:
             init_actions_dict = [
                 {
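
For hypothetical inputs of ``idle_delete_ttl=3600`` and
``auto_delete_time=datetime.datetime(2018, 6, 7)``, the ``lifecycleConfig`` section
built above would come out roughly as::

    # Hypothetical values; the formats follow _build_cluster_data() above.
    'lifecycleConfig': {
        'idleDeleteTtl': '3600s',
        'autoDeleteTime': '2018-06-07T00:00:00.000000Z'
    }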

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b02820a7/tests/contrib/operators/test_dataproc_operator.py
----------------------------------------------------------------------
diff --git a/tests/contrib/operators/test_dataproc_operator.py b/tests/contrib/operators/test_dataproc_operator.py
index e8cd1e5..d039cf1 100644
--- a/tests/contrib/operators/test_dataproc_operator.py
+++ b/tests/contrib/operators/test_dataproc_operator.py
@@ -7,9 +7,9 @@
 # to you under the Apache License, Version 2.0 (the
 # "License"); you may not use this file except in compliance
 # with the License.  You may obtain a copy of the License at
-# 
+#
 #   http://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 # Unless required by applicable law or agreed to in writing,
 # software distributed under the License is distributed on an
 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -69,6 +69,9 @@ SERVICE_ACCOUNT_SCOPES = [
     'https://www.googleapis.com/auth/bigquery',
     'https://www.googleapis.com/auth/bigtable.data'
 ]
+IDLE_DELETE_TTL = 321
+AUTO_DELETE_TIME = datetime.datetime(2017, 6, 7)
+AUTO_DELETE_TTL = 654
 DEFAULT_DATE = datetime.datetime(2017, 6, 6)
 REGION = 'test-region'
 MAIN_URI = 'test-uri'
@@ -102,8 +105,11 @@ class DataprocClusterCreateOperatorTest(unittest.TestCase):
                     worker_machine_type=WORKER_MACHINE_TYPE,
                     worker_disk_size=WORKER_DISK_SIZE,
                     num_preemptible_workers=NUM_PREEMPTIBLE_WORKERS,
-                    labels = deepcopy(labels),
-                    service_account_scopes = SERVICE_ACCOUNT_SCOPES
+                    labels=deepcopy(labels),
+                    service_account_scopes=SERVICE_ACCOUNT_SCOPES,
+                    idle_delete_ttl=IDLE_DELETE_TTL,
+                    auto_delete_time=AUTO_DELETE_TIME,
+                    auto_delete_ttl=AUTO_DELETE_TTL
                 )
              )
         self.dag = DAG(
@@ -136,6 +142,9 @@ class DataprocClusterCreateOperatorTest(unittest.TestCase):
             self.assertEqual(dataproc_operator.labels, self.labels[suffix])
             self.assertEqual(dataproc_operator.service_account_scopes,
                              SERVICE_ACCOUNT_SCOPES)
+            self.assertEqual(dataproc_operator.idle_delete_ttl, IDLE_DELETE_TTL)
+            self.assertEqual(dataproc_operator.auto_delete_time, AUTO_DELETE_TIME)
+            self.assertEqual(dataproc_operator.auto_delete_ttl, AUTO_DELETE_TTL)
 
     def test_get_init_action_timeout(self):
         for suffix, dataproc_operator in enumerate(self.dataproc_operators):
@@ -160,6 +169,10 @@ class DataprocClusterCreateOperatorTest(unittest.TestCase):
                 NETWORK_URI)
             self.assertEqual(cluster_data['config']['gceClusterConfig']['tags'],
                 TAGS)
+            self.assertEqual(cluster_data['config']['lifecycleConfig']['idleDeleteTtl'],
+                             "321s")
+            self.assertEqual(cluster_data['config']['lifecycleConfig']['autoDeleteTime'],
+                             "2017-06-07T00:00:00.000000Z")
             # test whether the default airflow-version label has been properly
             # set to the dataproc operator.
             merged_labels = {}
@@ -169,6 +182,52 @@ class DataprocClusterCreateOperatorTest(unittest.TestCase):
                                      cluster_data['labels']['airflow-version']))
             self.assertEqual(cluster_data['labels'], merged_labels)
 
+    def test_build_cluster_data_with_autoDeleteTime(self):
+        dataproc_operator = DataprocClusterCreateOperator(
+            task_id=TASK_ID,
+            cluster_name=CLUSTER_NAME,
+            project_id=PROJECT_ID,
+            num_workers=NUM_WORKERS,
+            zone=ZONE,
+            dag=self.dag,
+            auto_delete_time=AUTO_DELETE_TIME,
+        )
+        cluster_data = dataproc_operator._build_cluster_data()
+        self.assertEqual(cluster_data['config']['lifecycleConfig']['autoDeleteTime'],
+                         "2017-06-07T00:00:00.000000Z")
+
+    def test_build_cluster_data_with_autoDeleteTtl(self):
+        dataproc_operator = DataprocClusterCreateOperator(
+            task_id=TASK_ID,
+            cluster_name=CLUSTER_NAME,
+            project_id=PROJECT_ID,
+            num_workers=NUM_WORKERS,
+            zone=ZONE,
+            dag=self.dag,
+            auto_delete_ttl=AUTO_DELETE_TTL,
+        )
+        cluster_data = dataproc_operator._build_cluster_data()
+        self.assertEqual(cluster_data['config']['lifecycleConfig']['autoDeleteTtl'],
+                         "654s")
+
+    def test_build_cluster_data_with_autoDeleteTime_and_autoDeleteTtl(self):
+        dataproc_operator = DataprocClusterCreateOperator(
+            task_id=TASK_ID,
+            cluster_name=CLUSTER_NAME,
+            project_id=PROJECT_ID,
+            num_workers=NUM_WORKERS,
+            zone=ZONE,
+            dag=self.dag,
+            auto_delete_time=AUTO_DELETE_TIME,
+            auto_delete_ttl=AUTO_DELETE_TTL,
+        )
+        cluster_data = dataproc_operator._build_cluster_data()
+        if 'autoDeleteTtl' in cluster_data['config']['lifecycleConfig']:
+            self.fail("If 'auto_delete_time' and 'auto_delete_ttl' is set, " +
+                      "only `auto_delete_time` is used")
+        self.assertEqual(cluster_data['config']['lifecycleConfig']['autoDeleteTime'],
+                         "2017-06-07T00:00:00.000000Z")
+
     def test_cluster_name_log_no_sub(self):
         with patch('airflow.contrib.operators.dataproc_operator.DataProcHook') as mock_hook:
             mock_hook.return_value.get_conn = self.mock_conn