You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by fo...@apache.org on 2018/05/04 07:01:57 UTC
[01/50] incubator-airflow git commit: [AIRFLOW-2382] Fix wrong description for delimiter [Forced Update!]
Repository: incubator-airflow
Updated Branches:
refs/heads/v1-10-test 5495d2590 -> 16bae5634 (forced update)
[AIRFLOW-2382] Fix wrong description for delimiter
Fix misleading descriptions of the 'delimiter'
parameter in the S3ListOperator and
S3ToGoogleCloudStorageOperator docstrings.
Closes #3270 from sekikn/AIRFLOW-2382
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/ae63246a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/ae63246a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/ae63246a
Branch: refs/heads/v1-10-test
Commit: ae63246a1d580d77c366276300139165e4c984de
Parents: 36193fc
Author: Kengo Seki <se...@apache.org>
Authored: Thu Apr 26 18:45:12 2018 -0700
Committer: r39132 <si...@yahoo.com>
Committed: Thu Apr 26 18:45:12 2018 -0700
----------------------------------------------------------------------
airflow/contrib/operators/s3_list_operator.py | 12 +++++-------
airflow/contrib/operators/s3_to_gcs_operator.py | 6 ++----
2 files changed, 7 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/ae63246a/airflow/contrib/operators/s3_list_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/s3_list_operator.py b/airflow/contrib/operators/s3_list_operator.py
index 1dcbc60..dbb45fe 100644
--- a/airflow/contrib/operators/s3_list_operator.py
+++ b/airflow/contrib/operators/s3_list_operator.py
@@ -24,8 +24,7 @@ from airflow.utils.decorators import apply_defaults
class S3ListOperator(BaseOperator):
"""
- List all objects from the bucket with the given string prefix and delimiter
- in name.
+ List all objects from the bucket with the given string prefix in name.
This operator returns a python list with the name of objects which can be
used by `xcom` in the downstream task.
@@ -35,22 +34,21 @@ class S3ListOperator(BaseOperator):
:param prefix: Prefix string to filters the objects whose name begin with
such prefix
:type prefix: string
- :param delimiter: The delimiter by which you want to filter the objects.
- For e.g to lists the CSV files from in a directory in S3 you would use
- delimiter='.csv'.
+ :param delimiter: the delimiter marks key hierarchy.
:type delimiter: string
:param aws_conn_id: The connection ID to use when connecting to S3 storage.
:type aws_conn_id: string
**Example**:
- The following operator would list all the CSV files from the S3
+ The following operator would list all the files
+ (excluding subfolders) from the S3
``customers/2018/04/`` key in the ``data`` bucket. ::
s3_file = S3ListOperator(
task_id='list_3s_files',
bucket='data',
prefix='customers/2018/04/',
- delimiter='.csv',
+ delimiter='/',
aws_conn_id='aws_customers_conn'
)
"""
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/ae63246a/airflow/contrib/operators/s3_to_gcs_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/s3_to_gcs_operator.py b/airflow/contrib/operators/s3_to_gcs_operator.py
index d105596..5a2004d 100644
--- a/airflow/contrib/operators/s3_to_gcs_operator.py
+++ b/airflow/contrib/operators/s3_to_gcs_operator.py
@@ -37,12 +37,10 @@ class S3ToGoogleCloudStorageOperator(S3ListOperator):
:param prefix: Prefix string which filters objects whose name begin with
such prefix.
:type prefix: string
- :param delimiter: The delimiter by which you want to filter the objects on.
- E.g. to list CSV files from a S3 key you would do the following,
- `delimiter='.csv'`.
+ :param delimiter: the delimiter marks key hierarchy.
:type delimiter: string
:param aws_conn_id: The source S3 connection
- :type aws_conn_id: str
+ :type aws_conn_id: string
:param dest_gcs_conn_id: The destination connection ID to use
when connecting to Google Cloud Storage.
:type dest_gcs_conn_id: string
[32/50] incubator-airflow git commit: [AIRFLOW-2401] Document the use of variables in Jinja template
Posted by fo...@apache.org.
[AIRFLOW-2401] Document the use of variables in Jinja template
Closes #2847 from moe-nadal-ck/patch-1
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/a67c13e4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/a67c13e4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/a67c13e4
Branch: refs/heads/v1-10-test
Commit: a67c13e44cb96acacc79896b327bbaaae19707a1
Parents: 8921e7d
Author: Moe Nadal <mo...@creditkarma.com>
Authored: Mon Apr 30 15:05:47 2018 -0700
Committer: r39132 <si...@yahoo.com>
Committed: Mon Apr 30 15:06:10 2018 -0700
----------------------------------------------------------------------
docs/concepts.rst | 12 ++++++++++++
1 file changed, 12 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a67c13e4/docs/concepts.rst
----------------------------------------------------------------------
diff --git a/docs/concepts.rst b/docs/concepts.rst
index 89c25fe..c28b10f 100644
--- a/docs/concepts.rst
+++ b/docs/concepts.rst
@@ -427,6 +427,18 @@ The second call assumes ``json`` content and will be deserialized into
``bar``. Note that ``Variable`` is a sqlalchemy model and can be used
as such.
+You can use a variable from a jinja template with the syntax :
+
+.. code:: bash
+
+ echo {{ var.value.<variable_name> }}
+
+or if you need to deserialize a json object from the variable :
+
+.. code:: bash
+
+ echo {{ var.json.<variable_name> }}
+
Branching
=========
[47/50] incubator-airflow git commit: closes apache/incubator-airflow#2675 *Closed for inactivity.*
Posted by fo...@apache.org.
closes apache/incubator-airflow#2675 *Closed for inactivity.*
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/3a28ceb5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/3a28ceb5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/3a28ceb5
Branch: refs/heads/v1-10-test
Commit: 3a28ceb5c5c5d48e2cded54a010b6f3ca45ac84e
Parents: 02a88f6
Author: r39132 <si...@yahoo.com>
Authored: Thu May 3 22:52:39 2018 -0700
Committer: r39132 <si...@yahoo.com>
Committed: Thu May 3 22:52:39 2018 -0700
----------------------------------------------------------------------
----------------------------------------------------------------------
[34/50] incubator-airflow git commit: [AIRFLOW-XXX] Add Twine Labs as an Airflow user
Posted by fo...@apache.org.
[AIRFLOW-XXX] Add Twine Labs as an Airflow user
Closes #3287 from ivorpeles/twine_airflow
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/4428d1bb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/4428d1bb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/4428d1bb
Branch: refs/heads/v1-10-test
Commit: 4428d1bbbf943cc7868c74d13f53cf0e336dfec7
Parents: 2d1b2ae
Author: ivor <iv...@mail.utoronto.ca>
Authored: Tue May 1 22:07:47 2018 +0200
Committer: Fokko Driesprong <fo...@godatadriven.com>
Committed: Tue May 1 22:07:51 2018 +0200
----------------------------------------------------------------------
README.md | 1 +
1 file changed, 1 insertion(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/4428d1bb/README.md
----------------------------------------------------------------------
diff --git a/README.md b/README.md
index bd09e9c..86b4a8c 100644
--- a/README.md
+++ b/README.md
@@ -220,6 +220,7 @@ Currently **officially** using Airflow:
1. [Tictail](https://tictail.com/)
1. [Tile](https://tile.com/) [[@ranjanmanish](https://github.com/ranjanmanish)]
1. [Tokopedia](https://www.tokopedia.com/) [@topedmaria](https://github.com/topedmaria)
+1. [Twine Labs](https://www.twinelabs.com/) [[@ivorpeles](https://github.com/ivorpeles)]
1. [T2 Systems](http://t2systems.com) [[@unclaimedpants](https://github.com/unclaimedpants)]
1. [Ubisoft](https://www.ubisoft.com/) [[@Walkoss](https://github.com/Walkoss)]
1. [United Airlines](https://www.united.com/) [[@ilopezfr](https://github.com/ilopezfr)]
[07/50] incubator-airflow git commit: [AIRFLOW-2391] Fix to Flask 0.12.2
Posted by fo...@apache.org.
[AIRFLOW-2391] Fix to Flask 0.12.2
Flask 0.12.3 has compatibility issues with Airflow
that have not been fixed yet.
Until they are, pin the version to 0.12.2.
Closes #3277 from Fokko/fd-fix-master-ci
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/3368f425
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/3368f425
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/3368f425
Branch: refs/heads/v1-10-test
Commit: 3368f4258c2dcfbcdbaf631fa887a742f12720b8
Parents: 67c0099
Author: Fokko Driesprong <fo...@godatadriven.com>
Authored: Sat Apr 28 20:25:13 2018 +0200
Committer: Fokko Driesprong <fo...@godatadriven.com>
Committed: Sat Apr 28 20:25:13 2018 +0200
----------------------------------------------------------------------
setup.py | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3368f425/setup.py
----------------------------------------------------------------------
diff --git a/setup.py b/setup.py
index 742e01b..b5461eb 100644
--- a/setup.py
+++ b/setup.py
@@ -229,7 +229,7 @@ def do_setup():
'configparser>=3.5.0, <3.6.0',
'croniter>=0.3.17, <0.4',
'dill>=0.2.2, <0.3',
- 'flask>=0.12, <0.13',
+ 'flask==0.12.2',
'flask-appbuilder>=1.9.6, <2.0.0',
'flask-admin==1.4.1',
'flask-caching>=1.3.3, <1.4.0',
[44/50] incubator-airflow git commit: [AIRFLOW-2411] add dataproc_jars to templated_fields
Posted by fo...@apache.org.
[AIRFLOW-2411] add dataproc_jars to templated_fields
This commit makes it possible to use Jinja templates
when passing JAR file URIs to the DataProc operators
that require JAR files, specifically the DataProc
Hive, Pig, SparkSql, Spark, Hadoop and PySpark
operators.
Closes #3305 from mchalek/template-dataproc-jars
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/c5fa8cd4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/c5fa8cd4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/c5fa8cd4
Branch: refs/heads/v1-10-test
Commit: c5fa8cd411ca67889eafd109988d2472ccfbba10
Parents: 6b3f6ce
Author: Kevin McHale <mc...@gmail.com>
Authored: Thu May 3 22:55:10 2018 +0200
Committer: Fokko Driesprong <fo...@godatadriven.com>
Committed: Thu May 3 22:55:10 2018 +0200
----------------------------------------------------------------------
airflow/contrib/operators/dataproc_operator.py | 12 ++++++------
1 file changed, 6 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c5fa8cd4/airflow/contrib/operators/dataproc_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/dataproc_operator.py b/airflow/contrib/operators/dataproc_operator.py
index 56ebb91..ad0aa09 100644
--- a/airflow/contrib/operators/dataproc_operator.py
+++ b/airflow/contrib/operators/dataproc_operator.py
@@ -481,7 +481,7 @@ class DataProcPigOperator(BaseOperator):
:param region: The specified region where the dataproc cluster is created.
:type region: string
"""
- template_fields = ['query', 'variables', 'job_name', 'cluster_name']
+ template_fields = ['query', 'variables', 'job_name', 'cluster_name', 'dataproc_jars']
template_ext = ('.pg', '.pig',)
ui_color = '#0273d4'
@@ -561,7 +561,7 @@ class DataProcHiveOperator(BaseOperator):
:param region: The specified region where the dataproc cluster is created.
:type region: string
"""
- template_fields = ['query', 'variables', 'job_name', 'cluster_name']
+ template_fields = ['query', 'variables', 'job_name', 'cluster_name', 'dataproc_jars']
template_ext = ('.q',)
ui_color = '#0273d4'
@@ -642,7 +642,7 @@ class DataProcSparkSqlOperator(BaseOperator):
:param region: The specified region where the dataproc cluster is created.
:type region: string
"""
- template_fields = ['query', 'variables', 'job_name', 'cluster_name']
+ template_fields = ['query', 'variables', 'job_name', 'cluster_name', 'dataproc_jars']
template_ext = ('.q',)
ui_color = '#0273d4'
@@ -731,7 +731,7 @@ class DataProcSparkOperator(BaseOperator):
:type region: string
"""
- template_fields = ['arguments', 'job_name', 'cluster_name']
+ template_fields = ['arguments', 'job_name', 'cluster_name', 'dataproc_jars']
ui_color = '#0273d4'
@apply_defaults
@@ -821,7 +821,7 @@ class DataProcHadoopOperator(BaseOperator):
:type region: string
"""
- template_fields = ['arguments', 'job_name', 'cluster_name']
+ template_fields = ['arguments', 'job_name', 'cluster_name', 'dataproc_jars']
ui_color = '#0273d4'
@apply_defaults
@@ -911,7 +911,7 @@ class DataProcPySparkOperator(BaseOperator):
:type region: string
"""
- template_fields = ['arguments', 'job_name', 'cluster_name']
+ template_fields = ['arguments', 'job_name', 'cluster_name', 'dataproc_jars']
ui_color = '#0273d4'
@staticmethod
[12/50] incubator-airflow git commit: [AIRFLOW-1575] Add AWS Kinesis Firehose Hook for inserting batch records
Posted by fo...@apache.org.
[AIRFLOW-1575] Add AWS Kinesis Firehose Hook for inserting batch records
Closes #3275 from sid88in/feature/kinesis_hookv2
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/2d588e94
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/2d588e94
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/2d588e94
Branch: refs/heads/v1-10-test
Commit: 2d588e9433cd9a1a1381cf939f579f7d7e53330f
Parents: e691acc
Author: sid.gupta <si...@glassdoor.com>
Authored: Sat Apr 28 23:11:36 2018 -0700
Committer: r39132 <si...@yahoo.com>
Committed: Sat Apr 28 23:11:36 2018 -0700
----------------------------------------------------------------------
airflow/contrib/hooks/aws_firehose_hook.py | 52 ++++++++++++++++
tests/contrib/hooks/test_aws_firehose_hook.py | 70 ++++++++++++++++++++++
2 files changed, 122 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/2d588e94/airflow/contrib/hooks/aws_firehose_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/aws_firehose_hook.py b/airflow/contrib/hooks/aws_firehose_hook.py
new file mode 100644
index 0000000..cf7b2fc
--- /dev/null
+++ b/airflow/contrib/hooks/aws_firehose_hook.py
@@ -0,0 +1,52 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from airflow.contrib.hooks.aws_hook import AwsHook
+
+
+class AwsFirehoseHook(AwsHook):
+ """
+ Interact with AWS Kinesis Firehose.
+ :param delivery_stream: Name of the delivery stream
+ :type delivery_stream: str
+ :param region_name: AWS region name (example: us-east-1)
+ :type region_name: str
+ """
+
+ def __init__(self, delivery_stream, region_name=None, *args, **kwargs):
+ self.delivery_stream = delivery_stream
+ self.region_name = region_name
+ super(AwsFirehoseHook, self).__init__(*args, **kwargs)
+
+ def get_conn(self):
+ """
+ Returns AwsHook connection object.
+ """
+
+ self.conn = self.get_client_type('firehose', self.region_name)
+ return self.conn
+
+ def put_records(self, records):
+ """
+ Write batch records to Kinesis Firehose
+ """
+
+ firehose_conn = self.get_conn()
+
+ response = firehose_conn.put_record_batch(
+ DeliveryStreamName=self.delivery_stream,
+ Records=records
+ )
+
+ return response
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/2d588e94/tests/contrib/hooks/test_aws_firehose_hook.py
----------------------------------------------------------------------
diff --git a/tests/contrib/hooks/test_aws_firehose_hook.py b/tests/contrib/hooks/test_aws_firehose_hook.py
new file mode 100644
index 0000000..0a2c809
--- /dev/null
+++ b/tests/contrib/hooks/test_aws_firehose_hook.py
@@ -0,0 +1,70 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import unittest
+import uuid
+
+from airflow.contrib.hooks.aws_firehose_hook import AwsFirehoseHook
+
+try:
+ from moto import mock_kinesis
+except ImportError:
+ mock_kinesis = None
+
+
+class TestAwsFirehoseHook(unittest.TestCase):
+
+ @unittest.skipIf(mock_kinesis is None, 'mock_kinesis package not present')
+ @mock_kinesis
+ def test_get_conn_returns_a_boto3_connection(self):
+ hook = AwsFirehoseHook(aws_conn_id='aws_default',
+ delivery_stream="test_airflow", region_name="us-east-1")
+ self.assertIsNotNone(hook.get_conn())
+
+ @unittest.skipIf(mock_kinesis is None, 'mock_kinesis package not present')
+ @mock_kinesis
+ def test_insert_batch_records_kinesis_firehose(self):
+ hook = AwsFirehoseHook(aws_conn_id='aws_default',
+ delivery_stream="test_airflow", region_name="us-east-1")
+
+ response = hook.get_conn().create_delivery_stream(
+ DeliveryStreamName="test_airflow",
+ S3DestinationConfiguration={
+ 'RoleARN': 'arn:aws:iam::123456789012:role/firehose_delivery_role',
+ 'BucketARN': 'arn:aws:s3:::kinesis-test',
+ 'Prefix': 'airflow/',
+ 'BufferingHints': {
+ 'SizeInMBs': 123,
+ 'IntervalInSeconds': 124
+ },
+ 'CompressionFormat': 'UNCOMPRESSED',
+ }
+ )
+
+ stream_arn = response['DeliveryStreamARN']
+ self.assertEquals(
+ stream_arn, "arn:aws:firehose:us-east-1:123456789012:deliverystream/test_airflow")
+
+ records = [{"Data": str(uuid.uuid4())}
+ for _ in range(100)]
+
+ response = hook.put_records(records)
+
+ self.assertEquals(response['FailedPutCount'], 0)
+ self.assertEquals(response['ResponseMetadata']['HTTPStatusCode'], 200)
+
+
+if __name__ == '__main__':
+ unittest.main()
[46/50] incubator-airflow git commit: closes apache/incubator-airflow#2962 *Closed for inactivity.*
Posted by fo...@apache.org.
closes apache/incubator-airflow#2962 *Closed for inactivity.*
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/02a88f6f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/02a88f6f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/02a88f6f
Branch: refs/heads/v1-10-test
Commit: 02a88f6f6ff34b07630efe6157e1fec3bc3bb505
Parents: b02820a
Author: r39132 <si...@yahoo.com>
Authored: Thu May 3 22:51:30 2018 -0700
Committer: r39132 <si...@yahoo.com>
Committed: Thu May 3 22:51:30 2018 -0700
----------------------------------------------------------------------
----------------------------------------------------------------------
[30/50] incubator-airflow git commit: [AIRFLOW-2403] Fix License Headers
Posted by fo...@apache.org.
[AIRFLOW-2403] Fix License Headers
Closes #3285 from r39132/fix_license_headers
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/f63a2b1a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/f63a2b1a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/f63a2b1a
Branch: refs/heads/v1-10-test
Commit: f63a2b1a1f1c216e22d6c46ecbd56199a2ff398e
Parents: a7e8f48
Author: r39132 <si...@yahoo.com>
Authored: Mon Apr 30 13:12:51 2018 -0700
Committer: r39132 <si...@yahoo.com>
Committed: Mon Apr 30 13:12:51 2018 -0700
----------------------------------------------------------------------
airflow/contrib/hooks/aws_firehose_hook.py | 24 ++++++++++++---------
tests/contrib/hooks/test_aws_firehose_hook.py | 25 ++++++++++++----------
2 files changed, 28 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/f63a2b1a/airflow/contrib/hooks/aws_firehose_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/aws_firehose_hook.py b/airflow/contrib/hooks/aws_firehose_hook.py
index cf7b2fc..b273e22 100644
--- a/airflow/contrib/hooks/aws_firehose_hook.py
+++ b/airflow/contrib/hooks/aws_firehose_hook.py
@@ -1,16 +1,20 @@
-# -*- coding: utf-8 -*-
#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
#
-# http://www.apache.org/licenses/LICENSE-2.0
+# http://www.apache.org/licenses/LICENSE-2.0
#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
from airflow.contrib.hooks.aws_hook import AwsHook
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/f63a2b1a/tests/contrib/hooks/test_aws_firehose_hook.py
----------------------------------------------------------------------
diff --git a/tests/contrib/hooks/test_aws_firehose_hook.py b/tests/contrib/hooks/test_aws_firehose_hook.py
index 0a2c809..f22bcde 100644
--- a/tests/contrib/hooks/test_aws_firehose_hook.py
+++ b/tests/contrib/hooks/test_aws_firehose_hook.py
@@ -1,17 +1,20 @@
-# -*- coding: utf-8 -*-
#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
+# http://www.apache.org/licenses/LICENSE-2.0
#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
import unittest
import uuid
[25/50] incubator-airflow git commit: [AIRFLOW-2389] Create a pinot db api hook
Posted by fo...@apache.org.
[AIRFLOW-2389] Create a pinot db api hook
Closes #3274 from feng-tao/pinot_db_hook
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/700c0f48
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/700c0f48
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/700c0f48
Branch: refs/heads/v1-10-test
Commit: 700c0f488f5f08772983584b2b635973618548b5
Parents: ae48fce
Author: Tao feng <tf...@lyft.com>
Authored: Mon Apr 30 08:41:43 2018 +0200
Committer: Fokko Driesprong <fo...@godatadriven.com>
Committed: Mon Apr 30 08:41:43 2018 +0200
----------------------------------------------------------------------
airflow/contrib/hooks/pinot_hook.py | 105 ++++++++++++++++++++++++++++
docs/code.rst | 1 +
setup.py | 6 +-
tests/contrib/hooks/test_pinot_hook.py | 76 ++++++++++++++++++++
4 files changed, 186 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/700c0f48/airflow/contrib/hooks/pinot_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/pinot_hook.py b/airflow/contrib/hooks/pinot_hook.py
new file mode 100644
index 0000000..d731211
--- /dev/null
+++ b/airflow/contrib/hooks/pinot_hook.py
@@ -0,0 +1,105 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import six
+
+from pinotdb import connect
+
+from airflow.hooks.dbapi_hook import DbApiHook
+
+
+class PinotDbApiHook(DbApiHook):
+ """
+ Connect to pinot db(https://github.com/linkedin/pinot) to issue pql
+ """
+ conn_name_attr = 'pinot_broker_conn_id'
+ default_conn_name = 'pinot_broker_default'
+ supports_autocommit = False
+
+ def __init__(self, *args, **kwargs):
+ super(PinotDbApiHook, self).__init__(*args, **kwargs)
+
+ def get_conn(self):
+ """
+ Establish a connection to pinot broker through pinot dbqpi.
+ """
+ conn = self.get_connection(self.pinot_broker_conn_id)
+ pinot_broker_conn = connect(
+ host=conn.host,
+ port=conn.port,
+ path=conn.extra_dejson.get('endpoint', '/pql'),
+ scheme=conn.extra_dejson.get('schema', 'http')
+ )
+ self.log.info('Get the connection to pinot '
+ 'broker on {host}'.format(host=conn.host))
+ return pinot_broker_conn
+
+ def get_uri(self):
+ """
+ Get the connection uri for pinot broker.
+
+ e.g: http://localhost:9000/pql
+ """
+ conn = self.get_connection(getattr(self, self.conn_name_attr))
+ host = conn.host
+ if conn.port is not None:
+ host += ':{port}'.format(port=conn.port)
+ conn_type = 'http' if not conn.conn_type else conn.conn_type
+ endpoint = conn.extra_dejson.get('endpoint', 'pql')
+ return '{conn_type}://{host}/{endpoint}'.format(
+ conn_type=conn_type, host=host, endpoint=endpoint)
+
+ def get_records(self, sql):
+ """
+ Executes the sql and returns a set of records.
+
+ :param sql: the sql statement to be executed (str) or a list of
+ sql statements to execute
+ :type sql: str
+ """
+ if six.PY2:
+ sql = sql.encode('utf-8')
+
+ with self.get_conn() as cur:
+ cur.execute(sql)
+ return cur.fetchall()
+
+ def get_first(self, sql):
+ """
+ Executes the sql and returns the first resulting row.
+
+ :param sql: the sql statement to be executed (str) or a list of
+ sql statements to execute
+ :type sql: str or list
+ """
+ if six.PY2:
+ sql = sql.encode('utf-8')
+
+ with self.get_conn() as cur:
+ cur.execute(sql)
+ return cur.fetchone()
+
+ def set_autocommit(self, conn, autocommit):
+ raise NotImplementedError()
+
+ def get_pandas_df(self, sql, parameters=None):
+ raise NotImplementedError()
+
+ def insert_rows(self, table, rows, target_fields=None, commit_every=1000):
+ raise NotImplementedError()
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/700c0f48/docs/code.rst
----------------------------------------------------------------------
diff --git a/docs/code.rst b/docs/code.rst
index 16867f6..c979f26 100644
--- a/docs/code.rst
+++ b/docs/code.rst
@@ -365,6 +365,7 @@ Community contributed hooks
.. autoclass:: airflow.contrib.hooks.gcs_hook.GoogleCloudStorageHook
.. autoclass:: airflow.contrib.hooks.jenkins_hook.JenkinsHook
.. autoclass:: airflow.contrib.hooks.jira_hook.JiraHook
+.. autoclass:: airflow.contrib.hooks.pinot_hook.PinotDbApiHook
.. autoclass:: airflow.contrib.hooks.qubole_hook.QuboleHook
.. autoclass:: airflow.contrib.hooks.redis_hook.RedisHook
.. autoclass:: airflow.contrib.hooks.redshift_hook.RedshiftHook
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/700c0f48/setup.py
----------------------------------------------------------------------
diff --git a/setup.py b/setup.py
index fef8d8a..1842a14 100644
--- a/setup.py
+++ b/setup.py
@@ -151,6 +151,7 @@ mysql = ['mysqlclient>=1.3.6']
rabbitmq = ['librabbitmq>=1.6.1']
oracle = ['cx_Oracle>=5.1.2']
postgres = ['psycopg2-binary>=2.7.4']
+pinot = ['pinotdb>=0.1.1']
ssh = ['paramiko>=2.1.1', 'pysftp>=0.2.9']
salesforce = ['simple-salesforce>=0.72']
s3 = ['boto3>=1.7.0']
@@ -177,7 +178,7 @@ snowflake = ['snowflake-connector-python>=1.5.2',
'snowflake-sqlalchemy>=1.1.0']
zendesk = ['zdesk']
-all_dbs = postgres + mysql + hive + mssql + hdfs + vertica + cloudant + druid
+all_dbs = postgres + mysql + hive + mssql + hdfs + vertica + cloudant + druid + pinot
devel = [
'click',
'freezegun',
@@ -200,7 +201,7 @@ devel_hadoop = devel_minreq + hive + hdfs + webhdfs + kerberos
devel_all = (sendgrid + devel + all_dbs + doc + samba + s3 + slack + crypto + oracle +
docker + ssh + kubernetes + celery + azure + redis + gcp_api + datadog +
zendesk + jdbc + ldap + kerberos + password + webhdfs + jenkins +
- druid + snowflake + elasticsearch)
+ druid + pinot + snowflake + elasticsearch)
# Snakebite & Google Cloud Dataflow are not Python 3 compatible :'(
if PY3:
@@ -293,6 +294,7 @@ def do_setup():
'mysql': mysql,
'oracle': oracle,
'password': password,
+ 'pinot': pinot,
'postgres': postgres,
'qds': qds,
'rabbitmq': rabbitmq,
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/700c0f48/tests/contrib/hooks/test_pinot_hook.py
----------------------------------------------------------------------
diff --git a/tests/contrib/hooks/test_pinot_hook.py b/tests/contrib/hooks/test_pinot_hook.py
new file mode 100644
index 0000000..d63ee1c
--- /dev/null
+++ b/tests/contrib/hooks/test_pinot_hook.py
@@ -0,0 +1,76 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import mock
+import unittest
+
+from airflow.contrib.hooks.pinot_hook import PinotDbApiHook
+
+
+class TestPinotDbApiHook(unittest.TestCase):
+
+ def setUp(self):
+ super(TestPinotDbApiHook, self).setUp()
+ self.conn = conn = mock.MagicMock()
+ self.conn.host = 'host'
+ self.conn.port = '1000'
+ self.conn.conn_type = 'http'
+ self.conn.extra_dejson = {'endpoint': 'pql'}
+ self.cur = mock.MagicMock()
+ self.conn.__enter__.return_value = self.cur
+ self.conn.__exit__.return_value = None
+
+ class TestPinotDBApiHook(PinotDbApiHook):
+ def get_conn(self):
+ return conn
+
+ def get_connection(self, conn_id):
+ return conn
+
+ self.db_hook = TestPinotDBApiHook
+
+ def test_get_uri(self):
+ """
+ Test on getting a pinot connection uri
+ """
+ db_hook = self.db_hook()
+ self.assertEquals(db_hook.get_uri(), 'http://host:1000/pql')
+
+ def test_get_conn(self):
+ """
+ Test on getting a pinot connection
+ """
+ conn = self.db_hook().get_conn()
+ self.assertEqual(conn.host, 'host')
+ self.assertEqual(conn.port, '1000')
+ self.assertEqual(conn.conn_type, 'http')
+ self.assertEqual(conn.extra_dejson.get('endpoint'), 'pql')
+
+ def test_get_records(self):
+ statement = 'SQL'
+ result_sets = [('row1',), ('row2',)]
+ self.cur.fetchall.return_value = result_sets
+ self.assertEqual(result_sets, self.db_hook().get_records(statement))
+
+ def test_get_first(self):
+ statement = 'SQL'
+ result_sets = [('row1',), ('row2',)]
+ self.cur.fetchone.return_value = result_sets[0]
+ self.assertEqual(result_sets[0], self.db_hook().get_first(statement))
[26/50] incubator-airflow git commit: Revert "[AIRFLOW-2391] Fix to
Flask 0.12.2"
Posted by fo...@apache.org.
Revert "[AIRFLOW-2391] Fix to Flask 0.12.2"
This reverts commit 3368f4258c2dcfbcdbaf631fa887a742f12720b8.
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/0ff434a9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/0ff434a9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/0ff434a9
Branch: refs/heads/v1-10-test
Commit: 0ff434a9b7510fb9cb1e74a60c5ac02b61f087d5
Parents: 700c0f4
Author: Fokko Driesprong <fo...@godatadriven.com>
Authored: Mon Apr 30 10:35:05 2018 +0200
Committer: Fokko Driesprong <fo...@godatadriven.com>
Committed: Mon Apr 30 10:35:05 2018 +0200
----------------------------------------------------------------------
setup.py | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/0ff434a9/setup.py
----------------------------------------------------------------------
diff --git a/setup.py b/setup.py
index 1842a14..cc2d4f3 100644
--- a/setup.py
+++ b/setup.py
@@ -228,7 +228,7 @@ def do_setup():
'configparser>=3.5.0, <3.6.0',
'croniter>=0.3.17, <0.4',
'dill>=0.2.2, <0.3',
- 'flask==0.12.2',
+ 'flask>=0.12, <0.13',
'flask-appbuilder>=1.9.6, <2.0.0',
'flask-admin==1.4.1',
'flask-caching>=1.3.3, <1.4.0',
[27/50] incubator-airflow git commit: [AIRFLOW-2363] Fix return type
bug in TaskHandler
Posted by fo...@apache.org.
[AIRFLOW-2363] Fix return type bug in TaskHandler
Closes #3259 from
yrqls21/kevin_yang_fix_s3_logging
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/19b39012
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/19b39012
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/19b39012
Branch: refs/heads/v1-10-test
Commit: 19b3901284b9f7a9f9a898dd1a1e823e5109cfa1
Parents: 0ff434a
Author: Kevin Yang <ke...@airbnb.com>
Authored: Mon Apr 30 12:49:06 2018 +0200
Committer: Fokko Driesprong <fo...@godatadriven.com>
Committed: Mon Apr 30 12:49:20 2018 +0200
----------------------------------------------------------------------
airflow/bin/cli.py | 1 +
airflow/utils/log/gcs_task_handler.py | 11 ++++++-----
airflow/utils/log/s3_task_handler.py | 9 ++++-----
airflow/utils/log/wasb_task_handler.py | 9 ++++-----
tests/utils/log/test_s3_task_handler.py | 12 ++++++++++--
5 files changed, 25 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/19b39012/airflow/bin/cli.py
----------------------------------------------------------------------
diff --git a/airflow/bin/cli.py b/airflow/bin/cli.py
index 8a92cfa..f26cbe4 100644
--- a/airflow/bin/cli.py
+++ b/airflow/bin/cli.py
@@ -468,6 +468,7 @@ def run(args, dag=None):
else:
with redirect_stdout(ti.log, logging.INFO), redirect_stderr(ti.log, logging.WARN):
_run(args, dag, ti)
+ logging.shutdown()
@cli_utils.action_logging
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/19b39012/airflow/utils/log/gcs_task_handler.py
----------------------------------------------------------------------
diff --git a/airflow/utils/log/gcs_task_handler.py b/airflow/utils/log/gcs_task_handler.py
index d4a9871..8c34792 100644
--- a/airflow/utils/log/gcs_task_handler.py
+++ b/airflow/utils/log/gcs_task_handler.py
@@ -7,9 +7,9 @@
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
-#
+#
# http://www.apache.org/licenses/LICENSE-2.0
-#
+#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -113,13 +113,14 @@ class GCSTaskHandler(FileTaskHandler, LoggingMixin):
remote_log = self.gcs_read(remote_loc)
log = '*** Reading remote log from {}.\n{}\n'.format(
remote_loc, remote_log)
+ return log, {'end_of_log': True}
except Exception as e:
log = '*** Unable to read remote log from {}\n*** {}\n\n'.format(
remote_loc, str(e))
self.log.error(log)
- log += super(GCSTaskHandler, self)._read(ti, try_number)
-
- return log, {'end_of_log': True}
+ local_log, metadata = super(GCSTaskHandler, self)._read(ti, try_number)
+ log += local_log
+ return log, metadata
def gcs_read(self, remote_log_location):
"""
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/19b39012/airflow/utils/log/s3_task_handler.py
----------------------------------------------------------------------
diff --git a/airflow/utils/log/s3_task_handler.py b/airflow/utils/log/s3_task_handler.py
index f29a92f..07b9b3e 100644
--- a/airflow/utils/log/s3_task_handler.py
+++ b/airflow/utils/log/s3_task_handler.py
@@ -7,9 +7,9 @@
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
-#
+#
# http://www.apache.org/licenses/LICENSE-2.0
-#
+#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -111,10 +111,9 @@ class S3TaskHandler(FileTaskHandler, LoggingMixin):
remote_log = self.s3_read(remote_loc, return_error=True)
log = '*** Reading remote log from {}.\n{}\n'.format(
remote_loc, remote_log)
+ return log, {'end_of_log': True}
else:
- log = super(S3TaskHandler, self)._read(ti, try_number)
-
- return log, {'end_of_log': True}
+ return super(S3TaskHandler, self)._read(ti, try_number)
def s3_log_exists(self, remote_log_location):
"""
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/19b39012/airflow/utils/log/wasb_task_handler.py
----------------------------------------------------------------------
diff --git a/airflow/utils/log/wasb_task_handler.py b/airflow/utils/log/wasb_task_handler.py
index 1a9590d..a2a0c0d 100644
--- a/airflow/utils/log/wasb_task_handler.py
+++ b/airflow/utils/log/wasb_task_handler.py
@@ -7,9 +7,9 @@
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
-#
+#
# http://www.apache.org/licenses/LICENSE-2.0
-#
+#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -119,10 +119,9 @@ class WasbTaskHandler(FileTaskHandler, LoggingMixin):
remote_log = self.wasb_read(remote_loc, return_error=True)
log = '*** Reading remote log from {}.\n{}\n'.format(
remote_loc, remote_log)
+ return log, {'end_of_log': True}
else:
- log = super(WasbTaskHandler, self)._read(ti, try_number)
-
- return log, {'end_of_log': True}
+ return super(WasbTaskHandler, self)._read(ti, try_number)
def wasb_log_exists(self, remote_log_location):
"""
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/19b39012/tests/utils/log/test_s3_task_handler.py
----------------------------------------------------------------------
diff --git a/tests/utils/log/test_s3_task_handler.py b/tests/utils/log/test_s3_task_handler.py
index c287fbc..a5d5f15 100644
--- a/tests/utils/log/test_s3_task_handler.py
+++ b/tests/utils/log/test_s3_task_handler.py
@@ -7,9 +7,9 @@
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
-#
+#
# http://www.apache.org/licenses/LICENSE-2.0
-#
+#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -117,6 +117,14 @@ class TestS3TaskHandler(unittest.TestCase):
'Log line\n\n'], [{'end_of_log': True}])
)
+ def test_read_when_s3_log_missing(self):
+ log, metadata = self.s3_task_handler.read(self.ti)
+
+ self.assertEqual(1, len(log))
+ self.assertEqual(len(log), len(metadata))
+ self.assertIn('*** Log file does not exist:', log[0])
+ self.assertEqual({'end_of_log': True}, metadata[0])
+
def test_read_raises_return_error(self):
handler = self.s3_task_handler
url = 's3://nonexistentbucket/foo'
[40/50] incubator-airflow git commit: [AIRFLOW-2410][AIRFLOW-75] Set
the timezone in the RBAC Web UI
Posted by fo...@apache.org.
[AIRFLOW-2410][AIRFLOW-75] Set the timezone in the RBAC Web UI
SQLAlchemy does not know how to handle the
timestamp since it isn't
timezone-aware.
Closes #3303 from Fokko/AIRFLOW-2410
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/71954a52
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/71954a52
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/71954a52
Branch: refs/heads/v1-10-test
Commit: 71954a52fc13accf1130d3d2a00263d7ec369b02
Parents: 12ab796
Author: Fokko Driesprong <fo...@godatadriven.com>
Authored: Wed May 2 22:49:39 2018 +0200
Committer: Fokko Driesprong <fo...@godatadriven.com>
Committed: Wed May 2 22:49:39 2018 +0200
----------------------------------------------------------------------
airflow/utils/timezone.py | 19 +++++++++++++++++--
airflow/www_rbac/views.py | 12 ++++++------
2 files changed, 23 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/71954a52/airflow/utils/timezone.py
----------------------------------------------------------------------
diff --git a/airflow/utils/timezone.py b/airflow/utils/timezone.py
index af848df..6d49fbc 100644
--- a/airflow/utils/timezone.py
+++ b/airflow/utils/timezone.py
@@ -7,9 +7,9 @@
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
-#
+#
# http://www.apache.org/licenses/LICENSE-2.0
-#
+#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -64,6 +64,21 @@ def utcnow():
return d
+def utc_epoch():
+ """
+ Gets the epoch in the users timezone
+ :return:
+ """
+
+ # pendulum utcnow() is not used as that sets a TimezoneInfo object
+ # instead of a Timezone. This is not pickable and also creates issues
+ # when using replace()
+ d = dt.datetime(1970, 1, 1)
+ d = d.replace(tzinfo=utc)
+
+ return d
+
+
def convert_to_utc(value):
"""
Returns the datetime with the default timezone added if timezone
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/71954a52/airflow/www_rbac/views.py
----------------------------------------------------------------------
diff --git a/airflow/www_rbac/views.py b/airflow/www_rbac/views.py
index 9636a58..7c20a65 100644
--- a/airflow/www_rbac/views.py
+++ b/airflow/www_rbac/views.py
@@ -7,9 +7,9 @@
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
-#
+#
# http://www.apache.org/licenses/LICENSE-2.0
-#
+#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -880,7 +880,7 @@ class Airflow(AirflowBaseView):
base_date = dag.latest_execution_date or timezone.utcnow()
dates = dag.date_range(base_date, num=-abs(num_runs))
- min_date = dates[0] if dates else datetime(2000, 1, 1)
+ min_date = dates[0] if dates else timezone.utc_epoch()
DR = models.DagRun
dag_runs = (
@@ -1113,7 +1113,7 @@ class Airflow(AirflowBaseView):
base_date = dag.latest_execution_date or timezone.utcnow()
dates = dag.date_range(base_date, num=-abs(num_runs))
- min_date = dates[0] if dates else datetime(2000, 1, 1)
+ min_date = dates[0] if dates else timezone.utc_epoch()
root = request.args.get('root')
if root:
@@ -1216,7 +1216,7 @@ class Airflow(AirflowBaseView):
base_date = dag.latest_execution_date or timezone.utcnow()
dates = dag.date_range(base_date, num=-abs(num_runs))
- min_date = dates[0] if dates else datetime(2000, 1, 1)
+ min_date = dates[0] if dates else timezone.utc_epoch()
root = request.args.get('root')
if root:
@@ -1279,7 +1279,7 @@ class Airflow(AirflowBaseView):
base_date = dag.latest_execution_date or timezone.utcnow()
dates = dag.date_range(base_date, num=-abs(num_runs))
- min_date = dates[0] if dates else datetime(2000, 1, 1)
+ min_date = dates[0] if dates else timezone.utc_epoch()
root = request.args.get('root')
if root:
[08/50] incubator-airflow git commit: [AIRFLOW-2348] Strip path
prefix from the destination_object when source_object contains a wildcard
Posted by fo...@apache.org.
[AIRFLOW-2348] Strip path prefix from the destination_object when source_object contains a wildcard
Closes #3247 from csoulios/master
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/840930b4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/840930b4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/840930b4
Branch: refs/heads/v1-10-test
Commit: 840930b4d0c545adc24d8eb040974f88370e081b
Parents: 3368f42
Author: Christos Soulios <cs...@gmail.com>
Authored: Sat Apr 28 20:44:53 2018 +0200
Committer: Fokko Driesprong <fo...@godatadriven.com>
Committed: Sat Apr 28 20:44:53 2018 +0200
----------------------------------------------------------------------
airflow/contrib/operators/gcs_to_gcs.py | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/840930b4/airflow/contrib/operators/gcs_to_gcs.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/gcs_to_gcs.py b/airflow/contrib/operators/gcs_to_gcs.py
index 6acc517..9bcf9d4 100644
--- a/airflow/contrib/operators/gcs_to_gcs.py
+++ b/airflow/contrib/operators/gcs_to_gcs.py
@@ -99,7 +99,7 @@ class GoogleCloudStorageToGoogleCloudStorageOperator(BaseOperator):
for source_object in objects:
if self.destination_object:
destination_object = "{}/{}".format(self.destination_object,
- source_object)
+ source_object[wildcard_position:])
else:
destination_object = source_object
self.log.info('Executing copy of gs://{0}/{1} to '
[17/50] incubator-airflow git commit: [AIRFLOW-1313] Add
vertica_to_mysql operator
Posted by fo...@apache.org.
[AIRFLOW-1313] Add vertica_to_mysql operator
Closes #2370 from juise/master
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/c3aa8e31
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/c3aa8e31
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/c3aa8e31
Branch: refs/heads/v1-10-test
Commit: c3aa8e31fae4edeff8abe6d38e573ef16583496c
Parents: c5d3576
Author: Alexander Petrovsky <as...@gmail.com>
Authored: Sun Apr 29 00:05:23 2018 -0700
Committer: r39132 <si...@yahoo.com>
Committed: Sun Apr 29 00:05:23 2018 -0700
----------------------------------------------------------------------
airflow/contrib/operators/vertica_to_mysql.py | 142 +++++++++++++++++++
scripts/ci/requirements.txt | 1 +
.../contrib/operators/test_vertica_to_mysql.py | 87 ++++++++++++
3 files changed, 230 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c3aa8e31/airflow/contrib/operators/vertica_to_mysql.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/vertica_to_mysql.py b/airflow/contrib/operators/vertica_to_mysql.py
new file mode 100644
index 0000000..9b14e6c
--- /dev/null
+++ b/airflow/contrib/operators/vertica_to_mysql.py
@@ -0,0 +1,142 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+
+from airflow.contrib.hooks.vertica_hook import VerticaHook
+from airflow.hooks.mysql_hook import MySqlHook
+from airflow.models import BaseOperator
+from airflow.utils.decorators import apply_defaults
+
+from contextlib import closing
+
+import unicodecsv as csv
+from tempfile import NamedTemporaryFile
+
+
+class VerticaToMySqlTransfer(BaseOperator):
+ """
+ Moves data from Vertica to MySQL.
+
+ :param sql: SQL query to execute against the Vertica database
+ :type sql: str
+ :param vertica_conn_id: source Vertica connection
+ :type vertica_conn_id: str
+ :param mysql_table: target MySQL table, use dot notation to target a
+ specific database
+ :type mysql_table: str
+ :param mysql_conn_id: source mysql connection
+ :type mysql_conn_id: str
+ :param mysql_preoperator: sql statement to run against MySQL prior to
+ import, typically use to truncate of delete in place of the data
+ coming in, allowing the task to be idempotent (running the task
+ twice won't double load data)
+ :type mysql_preoperator: str
+ :param mysql_postoperator: sql statement to run against MySQL after the
+ import, typically used to move data from staging to production
+ and issue cleanup commands.
+ :type mysql_postoperator: str
+ :param bulk_load: flag to use bulk_load option. This loads MySQL directly
+ from a tab-delimited text file using the LOAD DATA LOCAL INFILE command.
+ This option requires an extra connection parameter for the
+ destination MySQL connection: {'local_infile': true}.
+ :type bulk_load: bool
+ """
+
+ template_fields = ('sql', 'mysql_table', 'mysql_preoperator',
+ 'mysql_postoperator')
+ template_ext = ('.sql',)
+ ui_color = '#a0e08c'
+
+ @apply_defaults
+ def __init__(
+ self,
+ sql,
+ mysql_table,
+ vertica_conn_id='vertica_default',
+ mysql_conn_id='mysql_default',
+ mysql_preoperator=None,
+ mysql_postoperator=None,
+ bulk_load=False,
+ *args, **kwargs):
+ super(VerticaToMySqlTransfer, self).__init__(*args, **kwargs)
+ self.sql = sql
+ self.mysql_table = mysql_table
+ self.mysql_conn_id = mysql_conn_id
+ self.mysql_preoperator = mysql_preoperator
+ self.mysql_postoperator = mysql_postoperator
+ self.vertica_conn_id = vertica_conn_id
+ self.bulk_load = bulk_load
+
+ def execute(self, context):
+ vertica = VerticaHook(vertica_conn_id=self.vertica_conn_id)
+ mysql = MySqlHook(mysql_conn_id=self.mysql_conn_id)
+
+ tmpfile = None
+ result = None
+
+ selected_columns = []
+
+ count = 0
+ with closing(vertica.get_conn()) as conn:
+ with closing(conn.cursor()) as cursor:
+ cursor.execute(self.sql)
+ selected_columns = [d.name for d in cursor.description]
+
+ if self.bulk_load:
+ tmpfile = NamedTemporaryFile("w")
+
+ logging.info("Selecting rows from Vertica to local file " + str(tmpfile.name) + "...")
+ logging.info(self.sql)
+
+ csv_writer = csv.writer(tmpfile, delimiter='\t', encoding='utf-8')
+ for row in cursor.iterate():
+ csv_writer.writerow(row)
+ count += 1
+
+ tmpfile.flush()
+ else:
+ logging.info("Selecting rows from Vertica...")
+ logging.info(self.sql)
+
+ result = cursor.fetchall()
+ count = len(result)
+
+ logging.info("Selected rows from Vertica " + str(count))
+
+ if self.mysql_preoperator:
+ logging.info("Running MySQL preoperator...")
+ mysql.run(self.mysql_preoperator)
+
+ try:
+ if self.bulk_load:
+ logging.info("Bulk inserting rows into MySQL...")
+ with closing(mysql.get_conn()) as conn:
+ with closing(conn.cursor()) as cursor:
+ cursor.execute("LOAD DATA LOCAL INFILE '%s' INTO TABLE %s LINES TERMINATED BY '\r\n' (%s)" % (tmpfile.name, self.mysql_table, ", ".join(selected_columns)))
+ conn.commit()
+ tmpfile.close()
+ else:
+ logging.info("Inserting rows into MySQL...")
+ mysql.insert_rows(table=self.mysql_table, rows=result, target_fields=selected_columns)
+ logging.info("Inserted rows into MySQL " + str(count))
+ except:
+ logging.error("Inserted rows into MySQL 0")
+ raise
+
+ if self.mysql_postoperator:
+ logging.info("Running MySQL postoperator...")
+ mysql.run(self.mysql_postoperator)
+
+ logging.info("Done")
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c3aa8e31/scripts/ci/requirements.txt
----------------------------------------------------------------------
diff --git a/scripts/ci/requirements.txt b/scripts/ci/requirements.txt
index bba5d29..a2a78df 100644
--- a/scripts/ci/requirements.txt
+++ b/scripts/ci/requirements.txt
@@ -92,5 +92,6 @@ statsd
thrift
thrift_sasl
unicodecsv
+vertica_python
zdesk
kubernetes
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c3aa8e31/tests/contrib/operators/test_vertica_to_mysql.py
----------------------------------------------------------------------
diff --git a/tests/contrib/operators/test_vertica_to_mysql.py b/tests/contrib/operators/test_vertica_to_mysql.py
new file mode 100644
index 0000000..5c05e3d
--- /dev/null
+++ b/tests/contrib/operators/test_vertica_to_mysql.py
@@ -0,0 +1,87 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import datetime
+
+import mock
+import unittest
+
+from airflow import DAG, configuration
+from airflow.contrib.operators.vertica_to_mysql import VerticaToMySqlTransfer
+
+
+def mock_get_conn():
+ commit_mock = mock.MagicMock(
+ )
+ cursor_mock = mock.MagicMock(
+ execute = [],
+ fetchall = [['1', '2', '3']],
+ description = ['a', 'b', 'c'],
+ iterate = [['1', '2', '3']],
+ )
+ conn_mock = mock.MagicMock(
+ commit = commit_mock,
+ cursor = cursor_mock,
+ )
+ return conn_mock
+
+
+class TestVerticaToMySqlTransfer(unittest.TestCase):
+ def setUp(self):
+ configuration.load_test_config()
+ args = {
+ 'owner': 'airflow',
+ 'start_date': datetime.datetime(2017, 1, 1)
+ }
+ self.dag = DAG('test_dag_id', default_args=args)
+
+ @mock.patch('airflow.contrib.operators.vertica_to_mysql.VerticaHook.get_conn', side_effect=mock_get_conn)
+ @mock.patch('airflow.contrib.operators.vertica_to_mysql.MySqlHook.get_conn', side_effect=mock_get_conn)
+ @mock.patch('airflow.contrib.operators.vertica_to_mysql.MySqlHook.insert_rows', return_value=True)
+ def test_select_insert_transfer(self, *args):
+ """
+ Test check selection from vertica into memory and
+ after that inserting into mysql
+ """
+ task = VerticaToMySqlTransfer(task_id='test_task_id',
+ sql='select a, b, c',
+ mysql_table='test_table',
+ vertica_conn_id='test_vertica_conn_id',
+ mysql_conn_id='test_mysql_conn_id',
+ params={},
+ bulk_load=False,
+ dag=self.dag)
+ task.execute(None)
+
+ @mock.patch('airflow.contrib.operators.vertica_to_mysql.VerticaHook.get_conn', side_effect=mock_get_conn)
+ @mock.patch('airflow.contrib.operators.vertica_to_mysql.MySqlHook.get_conn', side_effect=mock_get_conn)
+ def test_select_bulk_insert_transfer(self, *args):
+ """
+ Test check selection from vertica into temporary file and
+ after that bulk inserting into mysql
+ """
+ task = VerticaToMySqlTransfer(task_id='test_task_id',
+ sql='select a, b, c',
+ mysql_table='test_table',
+ vertica_conn_id='test_vertica_conn_id',
+ mysql_conn_id='test_mysql_conn_id',
+ params={},
+ bulk_load=True,
+ dag=self.dag)
+ task.execute(None)
+
+
+if __name__ == '__main__':
+ unittest.main()
[22/50] incubator-airflow git commit: closes
apache/incubator-airflow#2047 *Closed for inactivity*
Posted by fo...@apache.org.
closes apache/incubator-airflow#2047 *Closed for inactivity*
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/06e90f49
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/06e90f49
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/06e90f49
Branch: refs/heads/v1-10-test
Commit: 06e90f49615896ce5c0f720f0e760c10762e8f4c
Parents: 8c8d140
Author: r39132 <si...@yahoo.com>
Authored: Sun Apr 29 20:18:18 2018 -0700
Committer: r39132 <si...@yahoo.com>
Committed: Sun Apr 29 20:18:18 2018 -0700
----------------------------------------------------------------------
----------------------------------------------------------------------
[37/50] incubator-airflow git commit: [AIRFLOW-2404] Add additional
documentation for unqueued task
Posted by fo...@apache.org.
[AIRFLOW-2404] Add additional documentation for unqueued task
Closes #3286 from AetherUnbound/feature/task-not-queued-doc
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/9a0c4e6c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/9a0c4e6c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/9a0c4e6c
Branch: refs/heads/v1-10-test
Commit: 9a0c4e6caeaddde19c2e104d959bdb36b46ab37a
Parents: 8c49e8d
Author: Matthew Bowden <bo...@spu.edu>
Authored: Wed May 2 08:12:49 2018 +0200
Committer: Fokko Driesprong <fo...@godatadriven.com>
Committed: Wed May 2 08:12:49 2018 +0200
----------------------------------------------------------------------
airflow/www/views.py | 10 +++++++++-
1 file changed, 9 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/9a0c4e6c/airflow/www/views.py
----------------------------------------------------------------------
diff --git a/airflow/www/views.py b/airflow/www/views.py
index ff24031..6e2f1fc 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -858,8 +858,16 @@ class Airflow(BaseView):
no_failed_deps_result = [(
"Unknown",
dedent("""\
- All dependencies are met but the task instance is not running. In most cases this just means that the task will probably be scheduled soon unless:<br/>
+ All dependencies are met but the task instance is not running.
+ In most cases this just means that the task will probably
+ be scheduled soon unless:<br/>
- The scheduler is down or under heavy load<br/>
+ - The following configuration values may be limiting the number
+ of queueable processes:
+ <code>parallelism</code>,
+ <code>dag_concurrency</code>,
+ <code>max_active_dag_runs_per_dag</code>,
+ <code>non_pooled_task_slot_count</code><br/>
{}
<br/>
If this task instance does not start soon please contact your Airflow """
[18/50] incubator-airflow git commit: [AIRFLOW-1960] Add support for
secrets in kubernetes operator
Posted by fo...@apache.org.
[AIRFLOW-1960] Add support for secrets in kubernetes operator
Closes #3271 from ese/secrets-kubernetes-operator
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/72f15a10
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/72f15a10
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/72f15a10
Branch: refs/heads/v1-10-test
Commit: 72f15a108e5556970229cb68d3b0968ee18db46e
Parents: c3aa8e3
Author: Sergio Ballesteros <sn...@locolandia.net>
Authored: Sun Apr 29 11:54:55 2018 +0200
Committer: Fokko Driesprong <fo...@godatadriven.com>
Committed: Sun Apr 29 11:54:55 2018 +0200
----------------------------------------------------------------------
.../kubernetes_request_factory.py | 11 +++++--
airflow/contrib/kubernetes/secret.py | 5 +++-
.../operators/kubernetes_pod_operator.py | 31 ++++++++++++++++++--
3 files changed, 41 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/72f15a10/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py b/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py
index 6e8632f..12d05ec 100644
--- a/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py
+++ b/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py
@@ -84,7 +84,10 @@ class KubernetesRequestFactory:
@staticmethod
def attach_volumes(pod, req):
- req['spec']['volumes'] = pod.volumes
+ req['spec']['volumes'] = (
+ req['spec'].get('volumes', []))
+ if len(pod.volumes) > 0:
+ req['spec']['volumes'].extend(pod.volumes)
@staticmethod
def attach_volume_mounts(pod, req):
@@ -101,8 +104,10 @@ class KubernetesRequestFactory:
def extract_volume_secrets(pod, req):
vol_secrets = [s for s in pod.secrets if s.deploy_type == 'volume']
if any(vol_secrets):
- req['spec']['containers'][0]['volumeMounts'] = []
- req['spec']['volumes'] = []
+ req['spec']['containers'][0]['volumeMounts'] = (
+ req['spec']['containers'][0].get('volumeMounts', []))
+ req['spec']['volumes'] = (
+ req['spec'].get('volumes', []))
for idx, vol in enumerate(vol_secrets):
vol_id = 'secretvol' + str(idx)
req['spec']['containers'][0]['volumeMounts'].append({
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/72f15a10/airflow/contrib/kubernetes/secret.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/kubernetes/secret.py b/airflow/contrib/kubernetes/secret.py
index 23bfacc..5c1038c 100644
--- a/airflow/contrib/kubernetes/secret.py
+++ b/airflow/contrib/kubernetes/secret.py
@@ -25,7 +25,8 @@ class Secret:
:param deploy_type: The type of secret deploy in Kubernetes, either `env` or
`volume`
:type deploy_type: ``str``
- :param deploy_target: The environment variable to be created in the worker.
+ :param deploy_target: The environment variable when `deploy_type` `env` or
+ file path when `deploy_type` `volume` where expose secret
:type deploy_target: ``str``
:param secret: Name of the secrets object in Kubernetes
:type secret: ``str``
@@ -34,5 +35,7 @@ class Secret:
"""
self.deploy_type = deploy_type
self.deploy_target = deploy_target.upper()
+ if deploy_type == 'volume':
+ self.deploy_target = deploy_target
self.secret = secret
self.key = key
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/72f15a10/airflow/contrib/operators/kubernetes_pod_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/kubernetes_pod_operator.py b/airflow/contrib/operators/kubernetes_pod_operator.py
index bcc3cad..9e95d8b 100644
--- a/airflow/contrib/operators/kubernetes_pod_operator.py
+++ b/airflow/contrib/operators/kubernetes_pod_operator.py
@@ -27,6 +27,31 @@ ui_color = '#ffefeb'
class KubernetesPodOperator(BaseOperator):
+ """
+ Execute a task in a Kubernetes Pod
+
+ :param image: Docker image name
+ :type image: str
+ :param: namespace: namespace name where run the Pod
+ :type: namespace: str
+ :param cmds: entrypoint of the container
+ :type cmds: list
+ :param arguments: arguments of to the entrypoint.
+ The docker image's CMD is used if this is not provided.
+ :type arguments: list
+ :param labels: labels to apply to the Pod
+ :type labels: dict
+ :param startup_timeout_seconds: timeout in seconds to startup the pod
+ :type startup_timeout_seconds: int
+ :param name: name for the pod
+ :type name: str
+ :param secrets: Secrets to attach to the container
+ :type secrets: list
+ :param in_cluster: run kubernetes client with in_cluster configuration
+ :type in_cluster: bool
+ :param get_logs: get the stdout of the container as logs of the tasks
+ """
+
def execute(self, context):
try:
@@ -42,6 +67,8 @@ class KubernetesPodOperator(BaseOperator):
labels=self.labels
)
+ pod.secrets = self.secrets
+
launcher = pod_launcher.PodLauncher(client)
final_state = launcher.run_pod(
pod,
@@ -59,15 +86,14 @@ class KubernetesPodOperator(BaseOperator):
cmds,
arguments,
name,
+ secrets=None,
in_cluster=False,
labels=None,
startup_timeout_seconds=120,
- kube_executor_config=None,
get_logs=True,
*args,
**kwargs):
super(KubernetesPodOperator, self).__init__(*args, **kwargs)
- self.kube_executor_config = kube_executor_config or {}
self.image = image
self.namespace = namespace
self.cmds = cmds
@@ -75,5 +101,6 @@ class KubernetesPodOperator(BaseOperator):
self.labels = labels or {}
self.startup_timeout_seconds = startup_timeout_seconds
self.name = name
+ self.secrets = secrets or []
self.in_cluster = in_cluster
self.get_logs = get_logs
[24/50] incubator-airflow git commit: [AIRFLOW-2390] Resolve
FlaskWTFDeprecationWarning
Posted by fo...@apache.org.
[AIRFLOW-2390] Resolve FlaskWTFDeprecationWarning
- Replace the soon-to-be-deprecated `flask_wtf.Form`
class with `flask_wtf.FlaskForm` (`from flask_wtf import FlaskForm`)
Closes #3278 from kaxil/AIRFLOW-2390
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/ae48fce9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/ae48fce9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/ae48fce9
Branch: refs/heads/v1-10-test
Commit: ae48fce9b097e48f09b3b8829b3b80dea21c58e9
Parents: e29562e
Author: Kaxil Naik <ka...@gmail.com>
Authored: Mon Apr 30 08:35:54 2018 +0200
Committer: Fokko Driesprong <fo...@godatadriven.com>
Committed: Mon Apr 30 08:35:54 2018 +0200
----------------------------------------------------------------------
airflow/www/forms.py | 10 +++++-----
airflow/www_rbac/forms.py | 10 +++++-----
2 files changed, 10 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/ae48fce9/airflow/www/forms.py
----------------------------------------------------------------------
diff --git a/airflow/www/forms.py b/airflow/www/forms.py
index 1904e72..d6fbe13 100644
--- a/airflow/www/forms.py
+++ b/airflow/www/forms.py
@@ -7,9 +7,9 @@
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
-#
+#
# http://www.apache.org/licenses/LICENSE-2.0
-#
+#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -25,16 +25,16 @@ from __future__ import unicode_literals
from airflow.utils import timezone
from flask_admin.form import DateTimePickerWidget
from wtforms import DateTimeField, SelectField
-from flask_wtf import Form
+from flask_wtf import FlaskForm
-class DateTimeForm(Form):
+class DateTimeForm(FlaskForm):
# Date filter form needed for gantt and graph view
execution_date = DateTimeField(
"Execution date", widget=DateTimePickerWidget())
-class DateTimeWithNumRunsForm(Form):
+class DateTimeWithNumRunsForm(FlaskForm):
# Date time and number of runs form for tree view, task duration
# and landing times
base_date = DateTimeField(
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/ae48fce9/airflow/www_rbac/forms.py
----------------------------------------------------------------------
diff --git a/airflow/www_rbac/forms.py b/airflow/www_rbac/forms.py
index 63d27e6..61a7ce2 100644
--- a/airflow/www_rbac/forms.py
+++ b/airflow/www_rbac/forms.py
@@ -7,9 +7,9 @@
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
-#
+#
# http://www.apache.org/licenses/LICENSE-2.0
-#
+#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -30,20 +30,20 @@ from flask_appbuilder.fieldwidgets import (BS3TextFieldWidget, BS3TextAreaFieldW
BS3PasswordFieldWidget, Select2Widget,
DateTimePickerWidget)
from flask_babel import lazy_gettext
-from flask_wtf import Form
+from flask_wtf import FlaskForm
from wtforms import validators
from wtforms.fields import (IntegerField, SelectField, TextAreaField, PasswordField,
StringField, DateTimeField, BooleanField)
-class DateTimeForm(Form):
+class DateTimeForm(FlaskForm):
# Date filter form needed for gantt and graph view
execution_date = DateTimeField(
"Execution date", widget=DateTimePickerWidget())
-class DateTimeWithNumRunsForm(Form):
+class DateTimeWithNumRunsForm(FlaskForm):
# Date time and number of runs form for tree view, task duration
# and landing times
base_date = DateTimeField(
[36/50] incubator-airflow git commit: closes
apache/incubator-airflow#2337 *No longer a bug*
Posted by fo...@apache.org.
closes apache/incubator-airflow#2337 *No longer a bug*
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/8c49e8d9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/8c49e8d9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/8c49e8d9
Branch: refs/heads/v1-10-test
Commit: 8c49e8d9bcd905dcd7a40d467bac94a687d85d75
Parents: fef7d30
Author: r39132 <si...@yahoo.com>
Authored: Tue May 1 22:41:32 2018 -0700
Committer: r39132 <si...@yahoo.com>
Committed: Tue May 1 22:41:32 2018 -0700
----------------------------------------------------------------------
----------------------------------------------------------------------
[48/50] incubator-airflow git commit: [AIRFLOW-1812] Update logging
example
Posted by fo...@apache.org.
[AIRFLOW-1812] Update logging example
The logging configuration has changed, so we should also
update the UPDATING.md guide.
Closes #2784 from Fokko/AIRFLOW-1812-update-logging-example
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/97ab9e76
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/97ab9e76
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/97ab9e76
Branch: refs/heads/v1-10-test
Commit: 97ab9e762c1f325317fd8003e2846e931137a1e6
Parents: 3a28ceb
Author: Fokko Driesprong <fo...@godatadriven.com>
Authored: Thu May 3 22:58:44 2018 -0700
Committer: r39132 <si...@yahoo.com>
Committed: Thu May 3 22:58:44 2018 -0700
----------------------------------------------------------------------
UPDATING.md | 115 +++++++++++++++++++++++++++++++++++++++++++++++++++++--
1 file changed, 112 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/97ab9e76/UPDATING.md
----------------------------------------------------------------------
diff --git a/UPDATING.md b/UPDATING.md
index 8006876..defd95b 100644
--- a/UPDATING.md
+++ b/UPDATING.md
@@ -61,9 +61,9 @@ Dataflow job labeling is now supported in Dataflow{Java,Python}Operator with a d
to 2.2.0 or greater.
### Redshift to S3 Operator
-With Airflow 1.9 or lower, Unload operation always included header row. In order to include header row,
+With Airflow 1.9 or lower, Unload operation always included header row. In order to include header row,
we need to turn off parallel unload. It is preferred to perform unload operation using all nodes so that it is
-faster for larger tables. So, parameter called `include_header` is added and default is set to False.
+faster for larger tables. So, parameter called `include_header` is added and default is set to False.
Header row will be added only if this parameter is set True and also in that case parallel will be automatically turned off (`PARALLEL OFF`)
### Google cloud connection string
@@ -119,7 +119,116 @@ logging_config_class = my.path.default_local_settings.LOGGING_CONFIG
The logging configuration file needs to be on the `PYTHONPATH`, for example `$AIRFLOW_HOME/config`. This directory is loaded by default. Any directory may be added to the `PYTHONPATH`, this might be handy when the config is in another directory or a volume is mounted in case of Docker.
-The config can be taken from `airflow/config_templates/airflow_local_settings.py` as a starting point. Copy the contents to `${AIRFLOW_HOME}/config/airflow_local_settings.py`, and alter the config as is preferred.
+The config can be taken from `airflow/config_templates/airflow_local_settings.py` as a starting point. Copy the contents to `${AIRFLOW_HOME}/config/airflow_local_settings.py`, and alter the config as is preferred.
+
+```
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import os
+
+from airflow import configuration as conf
+
+# TODO: Logging format and level should be configured
+# in this file instead of from airflow.cfg. Currently
+# there are other log format and level configurations in
+# settings.py and cli.py. Please see AIRFLOW-1455.
+
+LOG_LEVEL = conf.get('core', 'LOGGING_LEVEL').upper()
+LOG_FORMAT = conf.get('core', 'log_format')
+
+BASE_LOG_FOLDER = conf.get('core', 'BASE_LOG_FOLDER')
+PROCESSOR_LOG_FOLDER = conf.get('scheduler', 'child_process_log_directory')
+
+FILENAME_TEMPLATE = '{{ ti.dag_id }}/{{ ti.task_id }}/{{ ts }}/{{ try_number }}.log'
+PROCESSOR_FILENAME_TEMPLATE = '{{ filename }}.log'
+
+DEFAULT_LOGGING_CONFIG = {
+ 'version': 1,
+ 'disable_existing_loggers': False,
+ 'formatters': {
+ 'airflow.task': {
+ 'format': LOG_FORMAT,
+ },
+ 'airflow.processor': {
+ 'format': LOG_FORMAT,
+ },
+ },
+ 'handlers': {
+ 'console': {
+ 'class': 'logging.StreamHandler',
+ 'formatter': 'airflow.task',
+ 'stream': 'ext://sys.stdout'
+ },
+ 'file.task': {
+ 'class': 'airflow.utils.log.file_task_handler.FileTaskHandler',
+ 'formatter': 'airflow.task',
+ 'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
+ 'filename_template': FILENAME_TEMPLATE,
+ },
+ 'file.processor': {
+ 'class': 'airflow.utils.log.file_processor_handler.FileProcessorHandler',
+ 'formatter': 'airflow.processor',
+ 'base_log_folder': os.path.expanduser(PROCESSOR_LOG_FOLDER),
+ 'filename_template': PROCESSOR_FILENAME_TEMPLATE,
+ }
+ # When using s3 or gcs, provide a customized LOGGING_CONFIG
+ # in airflow_local_settings within your PYTHONPATH, see UPDATING.md
+ # for details
+ # 's3.task': {
+ # 'class': 'airflow.utils.log.s3_task_handler.S3TaskHandler',
+ # 'formatter': 'airflow.task',
+ # 'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
+ # 's3_log_folder': S3_LOG_FOLDER,
+ # 'filename_template': FILENAME_TEMPLATE,
+ # },
+ # 'gcs.task': {
+ # 'class': 'airflow.utils.log.gcs_task_handler.GCSTaskHandler',
+ # 'formatter': 'airflow.task',
+ # 'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
+ # 'gcs_log_folder': GCS_LOG_FOLDER,
+ # 'filename_template': FILENAME_TEMPLATE,
+ # },
+ },
+ 'loggers': {
+ '': {
+ 'handlers': ['console'],
+ 'level': LOG_LEVEL
+ },
+ 'airflow': {
+ 'handlers': ['console'],
+ 'level': LOG_LEVEL,
+ 'propagate': False,
+ },
+ 'airflow.processor': {
+ 'handlers': ['file.processor'],
+ 'level': LOG_LEVEL,
+ 'propagate': True,
+ },
+ 'airflow.task': {
+ 'handlers': ['file.task'],
+ 'level': LOG_LEVEL,
+ 'propagate': False,
+ },
+ 'airflow.task_runner': {
+ 'handlers': ['file.task'],
+ 'level': LOG_LEVEL,
+ 'propagate': True,
+ },
+ }
+}
+```
To customize the logging (for example, use logging rotate), define one or more of the logging handles that [Python has to offer](https://docs.python.org/3/library/logging.handlers.html). For more details about the Python logging, please refer to the [official logging documentation](https://docs.python.org/3/library/logging.html).
[13/50] incubator-airflow git commit: closes
apache/incubator-airflow#2744 *Closed for inactivity.*
Posted by fo...@apache.org.
closes apache/incubator-airflow#2744 *Closed for inactivity.*
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/f67e9673
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/f67e9673
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/f67e9673
Branch: refs/heads/v1-10-test
Commit: f67e9673f5ced64aa3681cf9343a4b8b73f7903b
Parents: 2d588e9
Author: r39132 <si...@yahoo.com>
Authored: Sat Apr 28 23:30:32 2018 -0700
Committer: r39132 <si...@yahoo.com>
Committed: Sat Apr 28 23:30:32 2018 -0700
----------------------------------------------------------------------
----------------------------------------------------------------------
[29/50] incubator-airflow git commit: [AIRFLOW-1313] Fix license
header
Posted by fo...@apache.org.
[AIRFLOW-1313] Fix license header
Closes #3281 from juise/master
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/a7e8f489
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/a7e8f489
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/a7e8f489
Branch: refs/heads/v1-10-test
Commit: a7e8f48963e38ed233a84b1439715fdbcb9ec619
Parents: 9a8e4f7
Author: Alexander Petrovsky <as...@gmail.com>
Authored: Mon Apr 30 09:11:22 2018 -0700
Committer: r39132 <si...@yahoo.com>
Committed: Mon Apr 30 09:11:22 2018 -0700
----------------------------------------------------------------------
airflow/contrib/operators/vertica_to_mysql.py | 23 +++++++++++--------
.../contrib/operators/test_vertica_to_mysql.py | 24 ++++++++++++--------
2 files changed, 28 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7e8f489/airflow/contrib/operators/vertica_to_mysql.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/vertica_to_mysql.py b/airflow/contrib/operators/vertica_to_mysql.py
index 9b14e6c..2e9325d 100644
--- a/airflow/contrib/operators/vertica_to_mysql.py
+++ b/airflow/contrib/operators/vertica_to_mysql.py
@@ -1,16 +1,21 @@
# -*- coding: utf-8 -*-
#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
#
-# http://www.apache.org/licenses/LICENSE-2.0
+# http://www.apache.org/licenses/LICENSE-2.0
#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
import logging
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7e8f489/tests/contrib/operators/test_vertica_to_mysql.py
----------------------------------------------------------------------
diff --git a/tests/contrib/operators/test_vertica_to_mysql.py b/tests/contrib/operators/test_vertica_to_mysql.py
index 5c05e3d..615f111 100644
--- a/tests/contrib/operators/test_vertica_to_mysql.py
+++ b/tests/contrib/operators/test_vertica_to_mysql.py
@@ -1,17 +1,21 @@
# -*- coding: utf-8 -*-
#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
+# http://www.apache.org/licenses/LICENSE-2.0
#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
import datetime
[38/50] incubator-airflow git commit: [AIRFLOW-2406] Add Apache2
License Shield to Readme
Posted by fo...@apache.org.
[AIRFLOW-2406] Add Apache2 License Shield to Readme
Closes #3290 from r39132/add_apache2_license_shield_to_readme
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/96d00da5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/96d00da5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/96d00da5
Branch: refs/heads/v1-10-test
Commit: 96d00da5b038d12b01aa538f0f49722435bedbac
Parents: 9a0c4e6
Author: r39132 <si...@yahoo.com>
Authored: Tue May 1 23:36:59 2018 -0700
Committer: r39132 <si...@yahoo.com>
Committed: Tue May 1 23:36:59 2018 -0700
----------------------------------------------------------------------
README.md | 1 +
1 file changed, 1 insertion(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/96d00da5/README.md
----------------------------------------------------------------------
diff --git a/README.md b/README.md
index 86b4a8c..bdbfbf2 100644
--- a/README.md
+++ b/README.md
@@ -4,6 +4,7 @@
[![Build Status](https://travis-ci.org/apache/incubator-airflow.svg?branch=master)](https://travis-ci.org/apache/incubator-airflow)
[![Coverage Status](https://img.shields.io/codecov/c/github/apache/incubator-airflow/master.svg)](https://codecov.io/github/apache/incubator-airflow?branch=master)
[![Documentation Status](https://readthedocs.org/projects/airflow/badge/?version=latest)](https://airflow.readthedocs.io/en/latest/?badge=latest)
+[![License](http://img.shields.io/:license-Apache%202-blue.svg)](http://www.apache.org/licenses/LICENSE-2.0.txt)
[![Join the chat at https://gitter.im/apache/incubator-airflow](https://badges.gitter.im/apache/incubator-airflow.svg)](https://gitter.im/apache/incubator-airflow?utm_source=badge&utm_medium=badge&utm_campaign=pr-badge&utm_content=badge)
_NOTE: The transition from 1.8.0 (or before) to 1.8.1 (or after) requires uninstalling Airflow before installing the new version. The package name was changed from `airflow` to `apache-airflow` as of version 1.8.1._
[03/50] incubator-airflow git commit: [AIRFLOW-2381] Fix the flaky
ApiPasswordTests test
Posted by fo...@apache.org.
[AIRFLOW-2381] Fix the flaky ApiPasswordTests test
This test conflicted with other tests running in parallel.
By calling a simple overview page (the pools endpoint), the
password check is still exercised, but the test no longer
depends on a specific DAG being present in the database.
Closes #3269 from Fokko/AIRFLOW-2381
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/414a08e5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/414a08e5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/414a08e5
Branch: refs/heads/v1-10-test
Commit: 414a08e505e2ae86f4272bfc8b0d28d66ef6ab6a
Parents: 801fe7d
Author: Fokko Driesprong <fo...@godatadriven.com>
Authored: Fri Apr 27 16:43:40 2018 +0200
Committer: Bolke de Bruin <bo...@xs4all.nl>
Committed: Fri Apr 27 16:43:40 2018 +0200
----------------------------------------------------------------------
.../api/experimental/test_password_endpoints.py | 19 +++++--------------
1 file changed, 5 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/414a08e5/tests/www/api/experimental/test_password_endpoints.py
----------------------------------------------------------------------
diff --git a/tests/www/api/experimental/test_password_endpoints.py b/tests/www/api/experimental/test_password_endpoints.py
index ecddff1..8131ff5 100644
--- a/tests/www/api/experimental/test_password_endpoints.py
+++ b/tests/www/api/experimental/test_password_endpoints.py
@@ -7,9 +7,9 @@
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
-#
+#
# http://www.apache.org/licenses/LICENSE-2.0
-#
+#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -56,24 +56,15 @@ class ApiPasswordTests(unittest.TestCase):
def test_authorized(self):
with self.app.test_client() as c:
- url_template = '/api/experimental/dags/{}/dag_runs'
- response = c.post(
- url_template.format('example_bash_operator'),
- data=json.dumps(dict(run_id='my_run' + datetime.now().isoformat())),
- content_type="application/json",
+ response = c.get(
+ '/api/experimental/pools',
headers={'Authorization': 'Basic aGVsbG86d29ybGQ='} # hello:world
)
self.assertEqual(200, response.status_code)
def test_unauthorized(self):
with self.app.test_client() as c:
- url_template = '/api/experimental/dags/{}/dag_runs'
- response = c.post(
- url_template.format('example_bash_operator'),
- data=json.dumps(dict(run_id='my_run' + datetime.now().isoformat())),
- content_type="application/json"
- )
-
+ response = c.get('/api/experimental/pools')
self.assertEqual(401, response.status_code)
def tearDown(self):
[15/50] incubator-airflow git commit: closes
apache/incubator-airflow#2555 *Fixed by another PR.*
Posted by fo...@apache.org.
closes apache/incubator-airflow#2555 *Fixed by another PR.*
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/b34c2963
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/b34c2963
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/b34c2963
Branch: refs/heads/v1-10-test
Commit: b34c29635065a6acee14851758aecd4b4f4e9dfd
Parents: 99cf26d
Author: r39132 <si...@yahoo.com>
Authored: Sat Apr 28 23:35:19 2018 -0700
Committer: r39132 <si...@yahoo.com>
Committed: Sat Apr 28 23:35:19 2018 -0700
----------------------------------------------------------------------
----------------------------------------------------------------------
[16/50] incubator-airflow git commit: closes
apache/incubator-airflow#3209 *PR in heavy need of squashing and cleanup.*
Posted by fo...@apache.org.
closes apache/incubator-airflow#3209 *PR in heavy need of squashing and cleanup.*
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/c5d35761
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/c5d35761
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/c5d35761
Branch: refs/heads/v1-10-test
Commit: c5d35761f4768f434527b3a26b101b8d87f7965f
Parents: b34c296
Author: r39132 <si...@yahoo.com>
Authored: Sat Apr 28 23:54:19 2018 -0700
Committer: r39132 <si...@yahoo.com>
Committed: Sat Apr 28 23:54:19 2018 -0700
----------------------------------------------------------------------
----------------------------------------------------------------------
[11/50] incubator-airflow git commit: closes
apache/incubator-airflow#3276 *Messed up PR - hundreds of old commits.*
Posted by fo...@apache.org.
closes apache/incubator-airflow#3276 *Messed up PR - hundreds of old commits.*
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/e691acc1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/e691acc1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/e691acc1
Branch: refs/heads/v1-10-test
Commit: e691acc115683f1fc6881e6dcdbc9fdedffde667
Parents: e9b74b6
Author: r39132 <si...@yahoo.com>
Authored: Sat Apr 28 14:57:27 2018 -0700
Committer: r39132 <si...@yahoo.com>
Committed: Sat Apr 28 14:57:27 2018 -0700
----------------------------------------------------------------------
----------------------------------------------------------------------
[09/50] incubator-airflow git commit: [AIRFLOW-2370] Implement
--use_random_password in create_user
Posted by fo...@apache.org.
[AIRFLOW-2370] Implement --use_random_password in create_user
Closes #3262 from wrp/passwords
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/5aa15868
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/5aa15868
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/5aa15868
Branch: refs/heads/v1-10-test
Commit: 5aa15868fd9276d82df33f8fe2e4e3cfc7f4d60a
Parents: 840930b
Author: William Pursell <wi...@wepay.com>
Authored: Sat Apr 28 20:59:59 2018 +0200
Committer: Fokko Driesprong <fo...@godatadriven.com>
Committed: Sat Apr 28 20:59:59 2018 +0200
----------------------------------------------------------------------
airflow/bin/cli.py | 32 ++++++++++++++++++++------------
tests/core.py | 7 +++++++
2 files changed, 27 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/5aa15868/airflow/bin/cli.py
----------------------------------------------------------------------
diff --git a/airflow/bin/cli.py b/airflow/bin/cli.py
index 975f481..8a92cfa 100644
--- a/airflow/bin/cli.py
+++ b/airflow/bin/cli.py
@@ -22,9 +22,10 @@ from __future__ import print_function
import logging
import os
-import socket
import subprocess
import textwrap
+import random
+import string
from importlib import import_module
import daemon
@@ -1209,27 +1210,28 @@ def create_user(args):
}
empty_fields = [k for k, v in fields.items() if not v]
if empty_fields:
- print('Missing arguments: {}.'.format(', '.join(empty_fields)))
- sys.exit(0)
+ raise SystemExit('Required arguments are missing: {}.'.format(
+ ', '.join(empty_fields)))
appbuilder = cached_appbuilder()
role = appbuilder.sm.find_role(args.role)
if not role:
- print('{} is not a valid role.'.format(args.role))
- sys.exit(0)
+ raise SystemExit('{} is not a valid role.'.format(args.role))
- password = getpass.getpass('Password:')
- password_confirmation = getpass.getpass('Repeat for confirmation:')
- if password != password_confirmation:
- print('Passwords did not match!')
- sys.exit(0)
+ if args.use_random_password:
+ password = ''.join(random.choice(string.printable) for _ in range(16))
+ else:
+ password = getpass.getpass('Password:')
+ password_confirmation = getpass.getpass('Repeat for confirmation:')
+ if password != password_confirmation:
+ raise SystemExit('Passwords did not match!')
user = appbuilder.sm.add_user(args.username, args.firstname, args.lastname,
args.email, role, password)
if user:
print('{} user {} created.'.format(args.role, args.username))
else:
- print('Failed to create user.')
+ raise SystemExit('Failed to create user.')
Arg = namedtuple(
@@ -1619,6 +1621,11 @@ class CLIFactory(object):
('-u', '--username',),
help='Username of the user',
type=str),
+ 'use_random_password': Arg(
+ ('--use_random_password',),
+ help='Do not prompt for password. Use random string instead',
+ default=False,
+ action='store_true'),
}
subparsers = (
{
@@ -1759,7 +1766,8 @@ class CLIFactory(object):
}, {
'func': create_user,
'help': "Create an admin account",
- 'args': ('role', 'username', 'email', 'firstname', 'lastname'),
+ 'args': ('role', 'username', 'email', 'firstname', 'lastname',
+ 'use_random_password'),
},
)
subparsers_dict = {sp['func'].__name__: sp for sp in subparsers}
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/5aa15868/tests/core.py
----------------------------------------------------------------------
diff --git a/tests/core.py b/tests/core.py
index 9cbae9d..6d18ffe 100644
--- a/tests/core.py
+++ b/tests/core.py
@@ -983,6 +983,13 @@ class CliTests(unittest.TestCase):
args = self.parser.parse_args(['list_dags', '--report'])
cli.list_dags(args)
+ def test_cli_create_user(self):
+ args = self.parser.parse_args([
+ 'create_user', '-u', 'test', '-l', 'doe', '-f', 'jon',
+ '-e', 'jdoe@foo.com', '-r', 'Viewer', '--use_random_password'
+ ])
+ cli.create_user(args)
+
def test_cli_list_tasks(self):
for dag_id in self.dagbag.dags.keys():
args = self.parser.parse_args(['list_tasks', dag_id])
[14/50] incubator-airflow git commit: closes
apache/incubator-airflow#2555 *Fixed by another PR.*
Posted by fo...@apache.org.
closes apache/incubator-airflow#2555 *Fixed by another PR.*
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/99cf26df
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/99cf26df
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/99cf26df
Branch: refs/heads/v1-10-test
Commit: 99cf26dfea575f5bd2d0e2e764416259d34c8a41
Parents: f67e967
Author: r39132 <si...@yahoo.com>
Authored: Sat Apr 28 23:35:10 2018 -0700
Committer: r39132 <si...@yahoo.com>
Committed: Sat Apr 28 23:35:10 2018 -0700
----------------------------------------------------------------------
----------------------------------------------------------------------
[33/50] incubator-airflow git commit: [AIRFLOW-1853] Show only the
desired number of runs in tree view
Posted by fo...@apache.org.
[AIRFLOW-1853] Show only the desired number of runs in tree view
Previously, the "Number of runs" option was not
being respected for DAGs that were externally
triggered. Now, only the set number of runs is
shown regardless of DAG trigger type.
Apply the same change to www_rbac.
Closes #3288 from
AetherUnbound/feature/AIRFLOW-1853
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/2d1b2aee
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/2d1b2aee
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/2d1b2aee
Branch: refs/heads/v1-10-test
Commit: 2d1b2aee91bc47cfe48a470e1618f026ece86a4d
Parents: a67c13e
Author: Matthew Bowden <bo...@spu.edu>
Authored: Tue May 1 22:03:24 2018 +0200
Committer: Fokko Driesprong <fo...@godatadriven.com>
Committed: Tue May 1 22:03:24 2018 +0200
----------------------------------------------------------------------
airflow/www/views.py | 4 ++++
airflow/www_rbac/views.py | 4 ++++
2 files changed, 8 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/2d1b2aee/airflow/www/views.py
----------------------------------------------------------------------
diff --git a/airflow/www/views.py b/airflow/www/views.py
index 5dda036..ff24031 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -1254,6 +1254,10 @@ class Airflow(BaseView):
dr.execution_date: alchemy_to_dict(dr) for dr in dag_runs}
dates = sorted(list(dag_runs.keys()))
+ # Only show the desired number of runs regardless of the trigger method
+ if len(dates) > num_runs:
+ dates = dates[-num_runs:]
+
max_date = max(dates) if dates else None
tis = dag.get_task_instances(
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/2d1b2aee/airflow/www_rbac/views.py
----------------------------------------------------------------------
diff --git a/airflow/www_rbac/views.py b/airflow/www_rbac/views.py
index f064c14..9636a58 100644
--- a/airflow/www_rbac/views.py
+++ b/airflow/www_rbac/views.py
@@ -894,6 +894,10 @@ class Airflow(AirflowBaseView):
dr.execution_date: alchemy_to_dict(dr) for dr in dag_runs}
dates = sorted(list(dag_runs.keys()))
+ # Only show the desired number of runs regardless of the trigger method
+ if len(dates) > num_runs:
+ dates = dates[-num_runs:]
+
max_date = max(dates) if dates else None
tis = dag.get_task_instances(
[49/50] incubator-airflow git commit: closes
apache/incubator-airflow#2478 *Closed for inactivity.*
Posted by fo...@apache.org.
closes apache/incubator-airflow#2478 *Closed for inactivity.*
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/d1f7af39
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/d1f7af39
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/d1f7af39
Branch: refs/heads/v1-10-test
Commit: d1f7af393f2d0e27807d0fc31d8023544f5c2aac
Parents: 97ab9e7
Author: r39132 <si...@yahoo.com>
Authored: Thu May 3 23:02:59 2018 -0700
Committer: r39132 <si...@yahoo.com>
Committed: Thu May 3 23:02:59 2018 -0700
----------------------------------------------------------------------
----------------------------------------------------------------------
[41/50] incubator-airflow git commit: [AIRFLOW-2409] Supply password
as a parameter
Posted by fo...@apache.org.
[AIRFLOW-2409] Supply password as a parameter
Allow the password to be supplied as a parameter on the CLI
Closes #3304 from Fokko/supply-password
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/2a079b95
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/2a079b95
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/2a079b95
Branch: refs/heads/v1-10-test
Commit: 2a079b953fe35049805079684fe43a1499694e4b
Parents: 71954a5
Author: Fokko Driesprong <fo...@godatadriven.com>
Authored: Thu May 3 08:32:51 2018 +0200
Committer: Fokko Driesprong <fo...@godatadriven.com>
Committed: Thu May 3 08:32:51 2018 +0200
----------------------------------------------------------------------
airflow/bin/cli.py | 8 +++++++-
tests/core.py | 15 +++++++++++----
2 files changed, 18 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/2a079b95/airflow/bin/cli.py
----------------------------------------------------------------------
diff --git a/airflow/bin/cli.py b/airflow/bin/cli.py
index f26cbe4..3097dce 100644
--- a/airflow/bin/cli.py
+++ b/airflow/bin/cli.py
@@ -1221,6 +1221,8 @@ def create_user(args):
if args.use_random_password:
password = ''.join(random.choice(string.printable) for _ in range(16))
+ elif args.password:
+ password = args.password
else:
password = getpass.getpass('Password:')
password_confirmation = getpass.getpass('Repeat for confirmation:')
@@ -1622,6 +1624,10 @@ class CLIFactory(object):
('-u', '--username',),
help='Username of the user',
type=str),
+ 'password': Arg(
+ ('-p', '--password',),
+ help='Password of the user',
+ type=str),
'use_random_password': Arg(
('--use_random_password',),
help='Do not prompt for password. Use random string instead',
@@ -1768,7 +1774,7 @@ class CLIFactory(object):
'func': create_user,
'help': "Create an admin account",
'args': ('role', 'username', 'email', 'firstname', 'lastname',
- 'use_random_password'),
+ 'password', 'use_random_password'),
},
)
subparsers_dict = {sp['func'].__name__: sp for sp in subparsers}
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/2a079b95/tests/core.py
----------------------------------------------------------------------
diff --git a/tests/core.py b/tests/core.py
index 6d18ffe..4cece16 100644
--- a/tests/core.py
+++ b/tests/core.py
@@ -7,9 +7,9 @@
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
-#
+#
# http://www.apache.org/licenses/LICENSE-2.0
-#
+#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -983,13 +983,20 @@ class CliTests(unittest.TestCase):
args = self.parser.parse_args(['list_dags', '--report'])
cli.list_dags(args)
- def test_cli_create_user(self):
+ def test_cli_create_user_random_password(self):
args = self.parser.parse_args([
- 'create_user', '-u', 'test', '-l', 'doe', '-f', 'jon',
+ 'create_user', '-u', 'test1', '-l', 'doe', '-f', 'jon',
'-e', 'jdoe@foo.com', '-r', 'Viewer', '--use_random_password'
])
cli.create_user(args)
+ def test_cli_create_user_supplied_password(self):
+ args = self.parser.parse_args([
+ 'create_user', '-u', 'test2', '-l', 'doe', '-f', 'jon',
+ '-e', 'jdoe@apache.org', '-r', 'Viewer', '-p', 'test'
+ ])
+ cli.create_user(args)
+
def test_cli_list_tasks(self):
for dag_id in self.dagbag.dags.keys():
args = self.parser.parse_args(['list_tasks', dag_id])
[50/50] incubator-airflow git commit: [AIRFLOW-1899] Fix Kubernetes
tests
Posted by fo...@apache.org.
[AIRFLOW-1899] Fix Kubernetes tests
[AIRFLOW-1899] Add full deployment
- Make the home directory configurable
- Fix documentation
- Add licenses
[AIRFLOW-1899] Tests for the Kubernetes Executor
Add an integration test for the Kubernetes
executor. This is done by spinning up different
versions of Kubernetes and running a DAG
by invoking the REST API.
Closes #3301 from Fokko/fix-kubernetes-executor
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/16bae563
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/16bae563
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/16bae563
Branch: refs/heads/v1-10-test
Commit: 16bae5634df24132b37eb752fe816f51bf7e83ca
Parents: d1f7af3
Author: Fokko Driesprong <fo...@godatadriven.com>
Authored: Fri May 4 08:58:12 2018 +0200
Committer: Fokko Driesprong <fo...@godatadriven.com>
Committed: Fri May 4 08:58:12 2018 +0200
----------------------------------------------------------------------
.gitignore | 1 -
.travis.yml | 3 -
airflow/config_templates/default_airflow.cfg | 8 +
airflow/configuration.py | 9 +-
.../contrib/executors/kubernetes_executor.py | 80 ++---
airflow/contrib/kubernetes/kube_client.py | 4 +-
.../contrib/kubernetes/worker_configuration.py | 19 +-
airflow/jobs.py | 4 +-
airflow/www_rbac/api/experimental/endpoints.py | 27 +-
scripts/ci/kubernetes/docker/Dockerfile | 5 +
scripts/ci/kubernetes/docker/airflow-init.sh | 24 ++
scripts/ci/kubernetes/docker/build.sh | 11 +-
scripts/ci/kubernetes/kube/airflow.yaml | 102 +-----
scripts/ci/kubernetes/kube/configmaps.yaml | 359 +++++++++++++++++++
scripts/ci/kubernetes/kube/deploy.sh | 6 +
scripts/ci/kubernetes/kube/postgres.yaml | 1 +
scripts/ci/kubernetes/kube/secrets.yaml | 25 ++
scripts/ci/kubernetes/kube/volumes.yaml | 1 +
scripts/ci/travis_script.sh | 2 +-
setup.cfg | 13 +-
tests/cli/test_cli.py | 101 +++++-
tests/contrib/kubernetes/__init__.py | 14 -
tests/contrib/minikube/__init__.py | 18 +
.../minikube/test_kubernetes_executor.py | 97 +++++
.../minikube/test_kubernetes_pod_operator.py | 98 +++++
tests/contrib/minikube_tests/__init__.py | 18 -
.../minikube_tests/integration/__init__.py | 13 -
.../integration/airflow_controller.py | 166 ---------
.../test_kubernetes_executor_integration.py | 67 ----
.../test_kubernetes_pod_operator.py | 93 -----
.../www_rbac/api/experimental/test_endpoints.py | 21 +-
31 files changed, 866 insertions(+), 544 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/16bae563/.gitignore
----------------------------------------------------------------------
diff --git a/.gitignore b/.gitignore
index 04c408f..a42962f 100644
--- a/.gitignore
+++ b/.gitignore
@@ -137,4 +137,3 @@ rat-results.txt
*.generated
*.tar.gz
scripts/ci/kubernetes/kube/.generated/airflow.yaml
-
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/16bae563/.travis.yml
----------------------------------------------------------------------
diff --git a/.travis.yml b/.travis.yml
index 6d29a7a..77033df 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -80,9 +80,6 @@ matrix:
env: TOX_ENV=py27-backend_postgres KUBERNETES_VERSION=v1.8.0
- python: "2.7"
env: TOX_ENV=py35-backend_postgres KUBERNETES_VERSION=v1.9.0
- allow_failures:
- - env: TOX_ENV=py27-backend_postgres KUBERNETES_VERSION=v1.8.0
- - env: TOX_ENV=py35-backend_postgres KUBERNETES_VERSION=v1.9.0
cache:
directories:
- $HOME/.wheelhouse/
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/16bae563/airflow/config_templates/default_airflow.cfg
----------------------------------------------------------------------
diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg
index fa5eea0..6da4287 100644
--- a/airflow/config_templates/default_airflow.cfg
+++ b/airflow/config_templates/default_airflow.cfg
@@ -1,3 +1,5 @@
+# -*- coding: utf-8 -*-
+#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
@@ -502,6 +504,7 @@ hide_sensitive_variable_fields = True
[elasticsearch]
elasticsearch_host =
+
[kubernetes]
# The repository and tag of the Kubernetes Image for the Worker to Run
worker_container_repository =
@@ -550,6 +553,11 @@ image_pull_secrets =
# Should be supplied in the format: key-name-1:key-path-1,key-name-2:key-path-2
gcp_service_account_keys =
+# Use the service account kubernetes gives to pods to connect to kubernetes cluster.
+# It’s intended for clients that expect to be running inside a pod running on kubernetes.
+# It will raise an exception if called from a process not running in a kubernetes environment.
+in_cluster = True
+
[kubernetes_secrets]
# The scheduler mounts the following secrets into your workers as they are launched by the
# scheduler. You may define as many secrets as needed and the kubernetes launcher will parse the
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/16bae563/airflow/configuration.py
----------------------------------------------------------------------
diff --git a/airflow/configuration.py b/airflow/configuration.py
index 130356c..20ef067 100644
--- a/airflow/configuration.py
+++ b/airflow/configuration.py
@@ -7,9 +7,9 @@
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
-#
+#
# http://www.apache.org/licenses/LICENSE-2.0
-#
+#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -442,7 +442,10 @@ if not os.path.isfile(AIRFLOW_CONFIG):
)
with open(AIRFLOW_CONFIG, 'w') as f:
cfg = parameterized_config(DEFAULT_CONFIG)
- f.write(cfg.split(TEMPLATE_START)[-1].strip())
+ cfg = cfg.split(TEMPLATE_START)[-1].strip()
+ if six.PY2:
+ cfg = cfg.encode('utf8')
+ f.write(cfg)
log.info("Reading the config from %s", AIRFLOW_CONFIG)
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/16bae563/airflow/contrib/executors/kubernetes_executor.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/executors/kubernetes_executor.py b/airflow/contrib/executors/kubernetes_executor.py
index cdce95f..17b2908 100644
--- a/airflow/contrib/executors/kubernetes_executor.py
+++ b/airflow/contrib/executors/kubernetes_executor.py
@@ -23,6 +23,7 @@ from uuid import uuid4
import kubernetes
from kubernetes import watch, client
from kubernetes.client.rest import ApiException
+from airflow.configuration import conf
from airflow.contrib.kubernetes.pod_launcher import PodLauncher
from airflow.contrib.kubernetes.kube_client import get_kube_client
from airflow.contrib.kubernetes.worker_configuration import WorkerConfiguration
@@ -87,20 +88,6 @@ class KubeConfig:
core_section = 'core'
kubernetes_section = 'kubernetes'
- @staticmethod
- def safe_get(section, option, default):
- try:
- return configuration.get(section, option)
- except AirflowConfigException:
- return default
-
- @staticmethod
- def safe_getboolean(section, option, default):
- try:
- return configuration.getboolean(section, option)
- except AirflowConfigException:
- return default
-
def __init__(self):
configuration_dict = configuration.as_dict(display_sensitive=True)
self.core_configuration = configuration_dict['core']
@@ -114,40 +101,37 @@ class KubeConfig:
self.kubernetes_section, 'worker_container_tag')
self.kube_image = '{}:{}'.format(
self.worker_container_repository, self.worker_container_tag)
- self.delete_worker_pods = self.safe_getboolean(
- self.kubernetes_section, 'delete_worker_pods', True)
+ self.delete_worker_pods = conf.getboolean(
+ self.kubernetes_section, 'delete_worker_pods')
- self.worker_service_account_name = self.safe_get(
- self.kubernetes_section, 'worker_service_account_name', 'default')
- self.image_pull_secrets = self.safe_get(
- self.kubernetes_section, 'image_pull_secrets', '')
+ self.worker_service_account_name = conf.get(
+ self.kubernetes_section, 'worker_service_account_name')
+ self.image_pull_secrets = conf.get(self.kubernetes_section, 'image_pull_secrets')
# NOTE: `git_repo` and `git_branch` must be specified together as a pair
# The http URL of the git repository to clone from
- self.git_repo = self.safe_get(self.kubernetes_section, 'git_repo', None)
+ self.git_repo = conf.get(self.kubernetes_section, 'git_repo')
# The branch of the repository to be checked out
- self.git_branch = self.safe_get(self.kubernetes_section, 'git_branch', None)
+ self.git_branch = conf.get(self.kubernetes_section, 'git_branch')
# Optionally, the directory in the git repository containing the dags
- self.git_subpath = self.safe_get(self.kubernetes_section, 'git_subpath', '')
+ self.git_subpath = conf.get(self.kubernetes_section, 'git_subpath')
# Optionally a user may supply a `git_user` and `git_password` for private
# repositories
- self.git_user = self.safe_get(self.kubernetes_section, 'git_user', None)
- self.git_password = self.safe_get(self.kubernetes_section, 'git_password', None)
+ self.git_user = conf.get(self.kubernetes_section, 'git_user')
+ self.git_password = conf.get(self.kubernetes_section, 'git_password')
# NOTE: The user may optionally use a volume claim to mount a PV containing
# DAGs directly
- self.dags_volume_claim = self.safe_get(self.kubernetes_section,
- 'dags_volume_claim', None)
+ self.dags_volume_claim = conf.get(self.kubernetes_section, 'dags_volume_claim')
# This prop may optionally be set for PV Claims and is used to write logs
- self.logs_volume_claim = self.safe_get(
- self.kubernetes_section, 'logs_volume_claim', None)
+ self.logs_volume_claim = conf.get(self.kubernetes_section, 'logs_volume_claim')
# This prop may optionally be set for PV Claims and is used to locate DAGs
# on a SubPath
- self.dags_volume_subpath = self.safe_get(
- self.kubernetes_section, 'dags_volume_subpath', None)
+ self.dags_volume_subpath = conf.get(
+ self.kubernetes_section, 'dags_volume_subpath')
# This prop may optionally be set for PV Claims and is used to write logs
self.base_log_folder = configuration.get(self.core_section, 'base_log_folder')
@@ -156,36 +140,32 @@ class KubeConfig:
# that if your
# cluster has RBAC enabled, your scheduler may need service account permissions to
# create, watch, get, and delete pods in this namespace.
- self.kube_namespace = self.safe_get(self.kubernetes_section, 'namespace',
- 'default')
+ self.kube_namespace = conf.get(self.kubernetes_section, 'namespace')
# The Kubernetes Namespace in which pods will be created by the executor. Note
# that if your
# cluster has RBAC enabled, your workers may need service account permissions to
# interact with cluster components.
- self.executor_namespace = self.safe_get(self.kubernetes_section, 'namespace',
- 'default')
+ self.executor_namespace = conf.get(self.kubernetes_section, 'namespace')
# Task secrets managed by KubernetesExecutor.
- self.gcp_service_account_keys = self.safe_get(
- self.kubernetes_section, 'gcp_service_account_keys', None)
+ self.gcp_service_account_keys = conf.get(self.kubernetes_section,
+ 'gcp_service_account_keys')
# If the user is using the git-sync container to clone their repository via git,
# allow them to specify repository, tag, and pod name for the init container.
- self.git_sync_container_repository = self.safe_get(
- self.kubernetes_section, 'git_sync_container_repository',
- 'gcr.io/google-containers/git-sync-amd64')
+ self.git_sync_container_repository = conf.get(
+ self.kubernetes_section, 'git_sync_container_repository')
- self.git_sync_container_tag = self.safe_get(
- self.kubernetes_section, 'git_sync_container_tag', 'v2.0.5')
+ self.git_sync_container_tag = conf.get(
+ self.kubernetes_section, 'git_sync_container_tag')
self.git_sync_container = '{}:{}'.format(
self.git_sync_container_repository, self.git_sync_container_tag)
- self.git_sync_init_container_name = self.safe_get(
- self.kubernetes_section, 'git_sync_init_container_name', 'git-sync-clone')
+ self.git_sync_init_container_name = conf.get(
+ self.kubernetes_section, 'git_sync_init_container_name')
# The worker pod may optionally have a valid Airflow config loaded via a
# configmap
- self.airflow_configmap = self.safe_get(self.kubernetes_section,
- 'airflow_configmap', None)
+ self.airflow_configmap = conf.get(self.kubernetes_section, 'airflow_configmap')
self._validate()
@@ -272,7 +252,7 @@ class KubernetesJobWatcher(multiprocessing.Process, LoggingMixin, object):
self.watcher_queue.put((pod_id, State.FAILED, labels, resource_version))
elif status == 'Succeeded':
self.log.info('Event: %s Succeeded', pod_id)
- self.watcher_queue.put((pod_id, State.SUCCESS, labels, resource_version))
+ self.watcher_queue.put((pod_id, None, labels, resource_version))
elif status == 'Running':
self.log.info('Event: %s is Running', pod_id)
else:
@@ -552,7 +532,8 @@ class KubernetesExecutor(BaseExecutor, LoggingMixin):
self.log.debug('Start with worker_uuid: %s', self.worker_uuid)
# always need to reset resource version since we don't know
# when we last started, note for behavior below
- # https://github.com/kubernetes-client/python/blob/master/kubernetes/docs/CoreV1Api.md#list_namespaced_pod
+ # https://github.com/kubernetes-client/python/blob/master/kubernetes/docs
+ # /CoreV1Api.md#list_namespaced_pod
KubeResourceVersion.reset_resource_version(self._session)
self.task_queue = Queue()
self.result_queue = Queue()
@@ -610,8 +591,7 @@ class KubernetesExecutor(BaseExecutor, LoggingMixin):
task_id=task_id,
execution_date=ex_time
).one()
-
- if item.state == State.RUNNING or item.state == State.QUEUED:
+ if state:
item.state = state
self._session.add(item)
self._session.commit()
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/16bae563/airflow/contrib/kubernetes/kube_client.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/kubernetes/kube_client.py b/airflow/contrib/kubernetes/kube_client.py
index d1a63a2..1d3cc9d 100644
--- a/airflow/contrib/kubernetes/kube_client.py
+++ b/airflow/contrib/kubernetes/kube_client.py
@@ -14,7 +14,7 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
-
+from airflow.configuration import conf
def _load_kube_config(in_cluster):
from kubernetes import config, client
@@ -26,6 +26,6 @@ def _load_kube_config(in_cluster):
return client.CoreV1Api()
-def get_kube_client(in_cluster=True):
+def get_kube_client(in_cluster=conf.getboolean('kubernetes', 'in_cluster')):
# TODO: This should also allow people to point to a cluster.
return _load_kube_config(in_cluster)
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/16bae563/airflow/contrib/kubernetes/worker_configuration.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/kubernetes/worker_configuration.py b/airflow/contrib/kubernetes/worker_configuration.py
index cd2cb9f..ac4dacf 100644
--- a/airflow/contrib/kubernetes/worker_configuration.py
+++ b/airflow/contrib/kubernetes/worker_configuration.py
@@ -21,13 +21,18 @@ import six
from airflow.contrib.kubernetes.pod import Pod, Resources
from airflow.contrib.kubernetes.secret import Secret
+from airflow.utils.log.logging_mixin import LoggingMixin
-class WorkerConfiguration:
+class WorkerConfiguration(LoggingMixin):
"""Contains Kubernetes Airflow Worker configuration logic"""
def __init__(self, kube_config):
self.kube_config = kube_config
+ self.worker_airflow_home = self.kube_config.airflow_home
+ self.worker_airflow_dags = self.kube_config.dags_folder
+ self.worker_airflow_logs = self.kube_config.base_log_folder
+ super(WorkerConfiguration, self).__init__()
def _get_init_containers(self, volume_mounts):
"""When using git to retrieve the DAGs, use the GitSync Init Container"""
@@ -79,7 +84,7 @@ class WorkerConfiguration:
'AIRFLOW__CORE__EXECUTOR': 'LocalExecutor'
}
if self.kube_config.airflow_configmap:
- env['AIRFLOW__CORE__AIRFLOW_HOME'] = self.kube_config.airflow_home
+ env['AIRFLOW__CORE__AIRFLOW_HOME'] = self.worker_airflow_home
return env
def _get_secrets(self):
@@ -129,19 +134,19 @@ class WorkerConfiguration:
volume_mounts = [{
'name': dags_volume_name,
'mountPath': os.path.join(
- self.kube_config.dags_folder,
+ self.worker_airflow_dags,
self.kube_config.git_subpath
),
'readOnly': True
}, {
'name': logs_volume_name,
- 'mountPath': self.kube_config.base_log_folder
+ 'mountPath': self.worker_airflow_logs
}]
# Mount the airflow.cfg file via a configmap the user has specified
if self.kube_config.airflow_configmap:
config_volume_name = 'airflow-config'
- config_path = '{}/airflow.cfg'.format(self.kube_config.airflow_home)
+ config_path = '{}/airflow.cfg'.format(self.worker_airflow_home)
volumes.append({
'name': config_volume_name,
'configMap': {
@@ -172,6 +177,10 @@ class WorkerConfiguration:
annotations = {
'iam.cloud.google.com/service-account': gcp_sa_key
} if gcp_sa_key else {}
+ airflow_command = airflow_command.replace("-sd", "-i -sd")
+ airflow_path = airflow_command.split('-sd')[-1]
+ airflow_path = self.worker_airflow_home + airflow_path.split('/')[-1]
+ airflow_command = airflow_command.split('-sd')[0] + '-sd ' + airflow_path
return Pod(
namespace=namespace,
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/16bae563/airflow/jobs.py
----------------------------------------------------------------------
diff --git a/airflow/jobs.py b/airflow/jobs.py
index ecbfef8..e5a07b7 100644
--- a/airflow/jobs.py
+++ b/airflow/jobs.py
@@ -7,9 +7,9 @@
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
-#
+#
# http://www.apache.org/licenses/LICENSE-2.0
-#
+#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/16bae563/airflow/www_rbac/api/experimental/endpoints.py
----------------------------------------------------------------------
diff --git a/airflow/www_rbac/api/experimental/endpoints.py b/airflow/www_rbac/api/experimental/endpoints.py
index 5bc0529..bddf0c1 100644
--- a/airflow/www_rbac/api/experimental/endpoints.py
+++ b/airflow/www_rbac/api/experimental/endpoints.py
@@ -7,9 +7,9 @@
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
-#
+#
# http://www.apache.org/licenses/LICENSE-2.0
-#
+#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -26,6 +26,8 @@ from airflow.exceptions import AirflowException
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils import timezone
from airflow.www_rbac.app import csrf
+from airflow import models
+from airflow.utils.db import create_session
from flask import g, Blueprint, jsonify, request, url_for
@@ -112,6 +114,27 @@ def task_info(dag_id, task_id):
return jsonify(fields)
+@api_experimental.route('/dags/<string:dag_id>/paused/<string:paused>', methods=['GET'])
+@requires_authentication
+def dag_paused(dag_id, paused):
+ """(Un)pauses a dag"""
+
+ DagModel = models.DagModel
+ with create_session() as session:
+ orm_dag = (
+ session.query(DagModel)
+ .filter(DagModel.dag_id == dag_id).first()
+ )
+ if paused == 'true':
+ orm_dag.is_paused = True
+ else:
+ orm_dag.is_paused = False
+ session.merge(orm_dag)
+ session.commit()
+
+ return jsonify({'response': 'ok'})
+
+
@api_experimental.route(
'/dags/<string:dag_id>/dag_runs/<string:execution_date>/tasks/<string:task_id>',
methods=['GET'])
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/16bae563/scripts/ci/kubernetes/docker/Dockerfile
----------------------------------------------------------------------
diff --git a/scripts/ci/kubernetes/docker/Dockerfile b/scripts/ci/kubernetes/docker/Dockerfile
index 967b698..6d2c62d 100644
--- a/scripts/ci/kubernetes/docker/Dockerfile
+++ b/scripts/ci/kubernetes/docker/Dockerfile
@@ -33,6 +33,9 @@ RUN apt-get update -y && apt-get install -y \
unzip \
&& apt-get clean
+
+RUN pip install --upgrade pip
+
# Since we install vanilla Airflow, we also want to have support for Postgres and Kubernetes
RUN pip install -U setuptools && \
pip install kubernetes && \
@@ -43,6 +46,8 @@ RUN pip install -U setuptools && \
COPY airflow.tar.gz /tmp/airflow.tar.gz
RUN pip install /tmp/airflow.tar.gz
+COPY airflow-init.sh /tmp/airflow-init.sh
+
COPY bootstrap.sh /bootstrap.sh
RUN chmod +x /bootstrap.sh
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/16bae563/scripts/ci/kubernetes/docker/airflow-init.sh
----------------------------------------------------------------------
diff --git a/scripts/ci/kubernetes/docker/airflow-init.sh b/scripts/ci/kubernetes/docker/airflow-init.sh
new file mode 100755
index 0000000..dc33625
--- /dev/null
+++ b/scripts/ci/kubernetes/docker/airflow-init.sh
@@ -0,0 +1,24 @@
+#!/usr/bin/env bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one *
+# or more contributor license agreements. See the NOTICE file *
+# distributed with this work for additional information *
+# regarding copyright ownership. The ASF licenses this file *
+# to you under the Apache License, Version 2.0 (the *
+# "License"); you may not use this file except in compliance *
+# with the License. You may obtain a copy of the License at *
+# *
+# http://www.apache.org/licenses/LICENSE-2.0 *
+# *
+# Unless required by applicable law or agreed to in writing, *
+# software distributed under the License is distributed on an *
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+# KIND, either express or implied. See the License for the *
+# specific language governing permissions and limitations *
+# under the License.
+
+cd /usr/local/lib/python2.7/dist-packages/airflow && \
+cp -R example_dags/* /root/airflow/dags/ && \
+airflow initdb && \
+alembic upgrade heads && \
+airflow create_user -u airflow -l airflow -f jon -e airflow@apache.org -r Admin -p airflow
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/16bae563/scripts/ci/kubernetes/docker/build.sh
----------------------------------------------------------------------
diff --git a/scripts/ci/kubernetes/docker/build.sh b/scripts/ci/kubernetes/docker/build.sh
index 6f14c4d..b93c6b1 100755
--- a/scripts/ci/kubernetes/docker/build.sh
+++ b/scripts/ci/kubernetes/docker/build.sh
@@ -27,7 +27,12 @@ if [ $? -eq 0 ]; then
eval $ENVCONFIG
fi
-cd $AIRFLOW_ROOT && python setup.py sdist && cp $AIRFLOW_ROOT/dist/*.tar.gz $DIRNAME/airflow.tar.gz && \
-cd $DIRNAME && \
-docker build --pull $DIRNAME --tag=${IMAGE}:${TAG} && \
+echo "Airflow directory $AIRFLOW_ROOT"
+echo "Airflow Docker directory $DIRNAME"
+
+cd $AIRFLOW_ROOT
+python setup.py sdist -q
+echo "Copy distro $AIRFLOW_ROOT/dist/*.tar.gz ${DIRNAME}/airflow.tar.gz"
+cp $AIRFLOW_ROOT/dist/*.tar.gz ${DIRNAME}/airflow.tar.gz
+cd $DIRNAME && docker build --pull $DIRNAME --tag=${IMAGE}:${TAG}
rm $DIRNAME/airflow.tar.gz
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/16bae563/scripts/ci/kubernetes/kube/airflow.yaml
----------------------------------------------------------------------
diff --git a/scripts/ci/kubernetes/kube/airflow.yaml b/scripts/ci/kubernetes/kube/airflow.yaml
index 77566ae..09bbcd8 100644
--- a/scripts/ci/kubernetes/kube/airflow.yaml
+++ b/scripts/ci/kubernetes/kube/airflow.yaml
@@ -61,7 +61,7 @@ spec:
- "bash"
args:
- "-cx"
- - "cd /usr/local/lib/python2.7/dist-packages/airflow && cp -R example_dags/* /root/airflow/dags/ && airflow initdb && alembic upgrade heads"
+ - "./tmp/airflow-init.sh"
containers:
- name: webserver
image: airflow
@@ -88,20 +88,20 @@ spec:
mountPath: /root/airflow/dags
- name: airflow-logs
mountPath: /root/airflow/logs
- readinessProbe:
- initialDelaySeconds: 5
- timeoutSeconds: 5
- periodSeconds: 5
- httpGet:
- path: /admin
- port: 8080
- livenessProbe:
- initialDelaySeconds: 5
- timeoutSeconds: 5
- failureThreshold: 5
- httpGet:
- path: /admin
- port: 8080
+# readinessProbe:
+# initialDelaySeconds: 5
+# timeoutSeconds: 5
+# periodSeconds: 5
+# httpGet:
+# path: /login
+# port: 8080
+# livenessProbe:
+# initialDelaySeconds: 5
+# timeoutSeconds: 5
+# failureThreshold: 5
+# httpGet:
+# path: /login
+# port: 8080
- name: scheduler
image: airflow
imagePullPolicy: IfNotPresent
@@ -146,76 +146,4 @@ spec:
nodePort: 30809
selector:
name: airflow
----
-apiVersion: v1
-kind: Secret
-metadata:
- name: airflow-secrets
-type: Opaque
-data:
- # The sql_alchemy_conn value is a base64 encoded represenation of this connection string:
- # postgresql+psycopg2://root:root@postgres-airflow:5432/airflow
- sql_alchemy_conn: cG9zdGdyZXNxbCtwc3ljb3BnMjovL3Jvb3Q6cm9vdEBwb3N0Z3Jlcy1haXJmbG93OjU0MzIvYWlyZmxvdwo=
----
-apiVersion: v1
-kind: ConfigMap
-metadata:
- name: airflow-configmap
-data:
- airflow.cfg: |
- [core]
- airflow_home = /root/airflow
- dags_folder = /root/airflow/dags
- base_log_folder = /root/airflow/logs
- logging_level = INFO
- executor = KubernetesExecutor
- parallelism = 32
- plugins_folder = /root/airflow/plugins
- sql_alchemy_conn = $SQL_ALCHEMY_CONN
-
- [scheduler]
- dag_dir_list_interval = 300
- child_process_log_directory = /root/airflow/logs/scheduler
- # Task instances listen for external kill signal (when you clear tasks
- # from the CLI or the UI), this defines the frequency at which they should
- # listen (in seconds).
- job_heartbeat_sec = 5
- max_threads = 16
-
- # The scheduler constantly tries to trigger new tasks (look at the
- # scheduler section in the docs for more information). This defines
- # how often the scheduler should run (in seconds).
- scheduler_heartbeat_sec = 5
-
- # after how much time should the scheduler terminate in seconds
- # -1 indicates to run continuously (see also num_runs)
- run_duration = -1
-
- # after how much time a new DAGs should be picked up from the filesystem
- min_file_process_interval = 0
-
- statsd_on = False
- statsd_host = localhost
- statsd_port = 8125
- statsd_prefix = airflow
-
- # How many seconds to wait between file-parsing loops to prevent the logs from being spammed.
- min_file_parsing_loop_time = 1
-
- print_stats_interval = 30
- scheduler_zombie_task_threshold = 300
- max_tis_per_query = 0
- authenticate = False
-
- [kubernetes]
- airflow_configmap = airflow-configmap
- worker_container_repository = airflow
- worker_container_tag = latest
- delete_worker_pods = True
- git_repo = https://github.com/grantnicholas/testdags.git
- git_branch = master
- dags_volume_claim = airflow-dags
- logs_volume_claim = airflow-logs
- [kubernetes_secrets]
- SQL_ALCHEMY_CONN = airflow-secrets=sql_alchemy_conn
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/16bae563/scripts/ci/kubernetes/kube/configmaps.yaml
----------------------------------------------------------------------
diff --git a/scripts/ci/kubernetes/kube/configmaps.yaml b/scripts/ci/kubernetes/kube/configmaps.yaml
new file mode 100644
index 0000000..ddba098
--- /dev/null
+++ b/scripts/ci/kubernetes/kube/configmaps.yaml
@@ -0,0 +1,359 @@
+# Licensed to the Apache Software Foundation (ASF) under one *
+# or more contributor license agreements. See the NOTICE file *
+# distributed with this work for additional information *
+# regarding copyright ownership. The ASF licenses this file *
+# to you under the Apache License, Version 2.0 (the *
+# "License"); you may not use this file except in compliance *
+# with the License. You may obtain a copy of the License at *
+# *
+# http://www.apache.org/licenses/LICENSE-2.0 *
+# *
+# Unless required by applicable law or agreed to in writing, *
+# software distributed under the License is distributed on an *
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+# KIND, either express or implied. See the License for the *
+# specific language governing permissions and limitations *
+# under the License. *
+apiVersion: v1
+kind: ConfigMap
+metadata:
+ name: airflow-configmap
+data:
+ airflow.cfg: |
+ [core]
+ airflow_home = /root/airflow
+ dags_folder = /root/airflow/dags
+ base_log_folder = /root/airflow/logs
+ logging_level = INFO
+ executor = KubernetesExecutor
+ parallelism = 32
+ load_examples = True
+ plugins_folder = /root/airflow/plugins
+ sql_alchemy_conn = $SQL_ALCHEMY_CONN
+
+ [scheduler]
+ dag_dir_list_interval = 300
+ child_process_log_directory = /root/airflow/logs/scheduler
+ # Task instances listen for external kill signal (when you clear tasks
+ # from the CLI or the UI), this defines the frequency at which they should
+ # listen (in seconds).
+ job_heartbeat_sec = 5
+ max_threads = 2
+
+ # The scheduler constantly tries to trigger new tasks (look at the
+ # scheduler section in the docs for more information). This defines
+ # how often the scheduler should run (in seconds).
+ scheduler_heartbeat_sec = 5
+
+ # after how much time should the scheduler terminate in seconds
+ # -1 indicates to run continuously (see also num_runs)
+ run_duration = -1
+
+ # after how much time new DAGs should be picked up from the filesystem
+ min_file_process_interval = 0
+
+ statsd_on = False
+ statsd_host = localhost
+ statsd_port = 8125
+ statsd_prefix = airflow
+
+ # How many seconds to wait between file-parsing loops to prevent the logs from being spammed.
+ min_file_parsing_loop_time = 1
+
+ print_stats_interval = 30
+ scheduler_zombie_task_threshold = 300
+ max_tis_per_query = 0
+ authenticate = False
+
+ # Turn off scheduler catchup by setting this to False.
+ # Default behavior is unchanged and
+ # Command Line Backfills still work, but the scheduler
+ # will not do scheduler catchup if this is False,
+ # however it can be set on a per DAG basis in the
+ # DAG definition (catchup)
+ catchup_by_default = True
+
+ [webserver]
+ # The base url of your website as airflow cannot guess what domain or
+ # cname you are using. This is used in automated emails that
+ # airflow sends to point links to the right web server
+ base_url = http://localhost:8080
+
+ # The ip specified when starting the web server
+ web_server_host = 0.0.0.0
+
+ # The port on which to run the web server
+ web_server_port = 8080
+
+ # Paths to the SSL certificate and key for the web server. When both are
+ # provided SSL will be enabled. This does not change the web server port.
+ web_server_ssl_cert =
+ web_server_ssl_key =
+
+ # Number of seconds the webserver waits before killing gunicorn master that doesn't respond
+ web_server_master_timeout = 120
+
+ # Number of seconds the gunicorn webserver waits before timing out on a worker
+ web_server_worker_timeout = 120
+
+ # Number of workers to refresh at a time. When set to 0, worker refresh is
+ # disabled. When nonzero, airflow periodically refreshes webserver workers by
+ # bringing up new ones and killing old ones.
+ worker_refresh_batch_size = 1
+
+ # Number of seconds to wait before refreshing a batch of workers.
+ worker_refresh_interval = 30
+
+ # Secret key used to run your flask app
+ secret_key = temporary_key
+
+ # Number of workers to run the Gunicorn web server
+ workers = 4
+
+ # The worker class gunicorn should use. Choices include
+ # sync (default), eventlet, gevent
+ worker_class = sync
+
+ # Log files for the gunicorn webserver. '-' means log to stderr.
+ access_logfile = -
+ error_logfile = -
+
+ # Expose the configuration file in the web server
+ expose_config = False
+
+ # Set to true to turn on authentication:
+ # https://airflow.incubator.apache.org/security.html#web-authentication
+ authenticate = False
+
+ # Filter the list of dags by owner name (requires authentication to be enabled)
+ filter_by_owner = False
+
+ # Filtering mode. Choices include user (default) and ldapgroup.
+ # Ldap group filtering requires using the ldap backend
+ #
+ # Note that the ldap server needs the "memberOf" overlay to be set up
+ # in order to use the ldapgroup mode.
+ owner_mode = user
+
+ # Default DAG view. Valid values are:
+ # tree, graph, duration, gantt, landing_times
+ dag_default_view = tree
+
+ # Default DAG orientation. Valid values are:
+ # LR (Left->Right), TB (Top->Bottom), RL (Right->Left), BT (Bottom->Top)
+ dag_orientation = LR
+
+ # Puts the webserver in demonstration mode; blurs the names of Operators for
+ # privacy.
+ demo_mode = False
+
+ # The amount of time (in secs) webserver will wait for initial handshake
+ # while fetching logs from other worker machine
+ log_fetch_timeout_sec = 5
+
+ # By default, the webserver shows paused DAGs. Flip this to hide paused
+ # DAGs by default
+ hide_paused_dags_by_default = False
+
+ # Consistent page size across all listing views in the UI
+ page_size = 100
+
+ # Use FAB-based webserver with RBAC feature
+ rbac = True
+
+ [smtp]
+ # If you want airflow to send emails on retries, failure, and you want to use
+ # the airflow.utils.email.send_email_smtp function, you have to configure an
+ # smtp server here
+ smtp_host = localhost
+ smtp_starttls = True
+ smtp_ssl = False
+ # Uncomment and set the user/pass settings if you want to use SMTP AUTH
+ # smtp_user = airflow
+ # smtp_password = airflow
+ smtp_port = 25
+ smtp_mail_from = airflow@example.com
+
+ [kubernetes]
+ airflow_configmap = airflow-configmap
+ worker_container_repository = airflow
+ worker_container_tag = latest
+ delete_worker_pods = True
+ git_repo = https://github.com/apache/incubator-airflow.git
+ git_branch = master
+ git_subpath = airflow/example_dags/
+ git_user =
+ git_password =
+ dags_volume_claim = airflow-dags
+ logs_volume_claim = airflow-logs
+ in_cluster = True
+ namespace = default
+ gcp_service_account_keys =
+
+ # For cloning DAGs from git repositories into volumes: https://github.com/kubernetes/git-sync
+ git_sync_container_repository = gcr.io/google-containers/git-sync-amd64
+ git_sync_container_tag = v2.0.5
+ git_sync_init_container_name = git-sync-clone
+
+ [kubernetes_secrets]
+ SQL_ALCHEMY_CONN = airflow-secrets=sql_alchemy_conn
+
+ [hive]
+ # Default mapreduce queue for HiveOperator tasks
+ default_hive_mapred_queue =
+
+ [celery]
+ # This section only applies if you are using the CeleryExecutor in
+ # [core] section above
+
+ # The app name that will be used by celery
+ celery_app_name = airflow.executors.celery_executor
+
+ # The concurrency that will be used when starting workers with the
+ # "airflow worker" command. This defines the number of task instances that
+ # a worker will take, so size up your workers based on the resources on
+ # your worker box and the nature of your tasks
+ worker_concurrency = 16
+
+ # When you start an airflow worker, airflow starts a tiny web server
+ # subprocess to serve the workers local log files to the airflow main
+ # web server, who then builds pages and sends them to users. This defines
+ # the port on which the logs are served. It needs to be unused, and open
+ # visible from the main web server to connect into the workers.
+ worker_log_server_port = 8793
+
+ # The Celery broker URL. Celery supports RabbitMQ, Redis and experimentally
+ # a sqlalchemy database. Refer to the Celery documentation for more
+ # information.
+ # http://docs.celeryproject.org/en/latest/userguide/configuration.html#broker-settings
+ broker_url = sqla+mysql://airflow:airflow@localhost:3306/airflow
+
+ # The Celery result_backend. When a job finishes, it needs to update the
+ # metadata of the job. Therefore it will post a message on a message bus,
+ # or insert it into a database (depending of the backend)
+ # This status is used by the scheduler to update the state of the task
+ # The use of a database is highly recommended
+ # http://docs.celeryproject.org/en/latest/userguide/configuration.html#task-result-backend-settings
+ result_backend = db+mysql://airflow:airflow@localhost:3306/airflow
+
+ # Celery Flower is a sweet UI for Celery. Airflow has a shortcut to start
+ # it `airflow flower`. This defines the IP that Celery Flower runs on
+ flower_host = 0.0.0.0
+
+ # The root URL for Flower
+ # Ex: flower_url_prefix = /flower
+ flower_url_prefix =
+
+ # This defines the port that Celery Flower runs on
+ flower_port = 5555
+
+ # Default queue that tasks get assigned to and that worker listen on.
+ default_queue = default
+
+ # Import path for celery configuration options
+ celery_config_options = airflow.config_templates.default_celery.DEFAULT_CELERY_CONFIG
+
+ [celery_broker_transport_options]
+ # The visibility timeout defines the number of seconds to wait for the worker
+ # to acknowledge the task before the message is redelivered to another worker.
+ # Make sure to increase the visibility timeout to match the time of the longest
+ # ETA you're planning to use. Especially important in case of using Redis or SQS
+ visibility_timeout = 21600
+
+ # In case of using SSL
+ ssl_active = False
+ ssl_key =
+ ssl_cert =
+ ssl_cacert =
+
+ [dask]
+ # This section only applies if you are using the DaskExecutor in
+ # [core] section above
+
+ # The IP address and port of the Dask cluster's scheduler.
+ cluster_address = 127.0.0.1:8786
+ # TLS/ SSL settings to access a secured Dask scheduler.
+ tls_ca =
+ tls_cert =
+ tls_key =
+
+ [ldap]
+ # set this to ldaps://<your.ldap.server>:<port>
+ uri =
+ user_filter = objectClass=*
+ user_name_attr = uid
+ group_member_attr = memberOf
+ superuser_filter =
+ data_profiler_filter =
+ bind_user = cn=Manager,dc=example,dc=com
+ bind_password = insecure
+ basedn = dc=example,dc=com
+ cacert = /etc/ca/ldap_ca.crt
+ search_scope = LEVEL
+
+ [mesos]
+ # Mesos master address which MesosExecutor will connect to.
+ master = localhost:5050
+
+ # The framework name which Airflow scheduler will register itself as on mesos
+ framework_name = Airflow
+
+ # Number of cpu cores required for running one task instance using
+ # 'airflow run <dag_id> <task_id> <execution_date> --local -p <pickle_id>'
+ # command on a mesos slave
+ task_cpu = 1
+
+ # Memory in MB required for running one task instance using
+ # 'airflow run <dag_id> <task_id> <execution_date> --local -p <pickle_id>'
+ # command on a mesos slave
+ task_memory = 256
+
+ # Enable framework checkpointing for mesos
+ # See http://mesos.apache.org/documentation/latest/slave-recovery/
+ checkpoint = False
+
+ # Failover timeout in milliseconds.
+ # When checkpointing is enabled and this option is set, Mesos waits
+ # until the configured timeout for
+ # the MesosExecutor framework to re-register after a failover. Mesos
+ # shuts down running tasks if the
+ # MesosExecutor framework fails to re-register within this timeframe.
+ # failover_timeout = 604800
+
+ # Enable framework authentication for mesos
+ # See http://mesos.apache.org/documentation/latest/configuration/
+ authenticate = False
+
+ # Mesos credentials, if authentication is enabled
+ # default_principal = admin
+ # default_secret = admin
+
+ # Optional Docker Image to run on slave before running the command
+ # This image should be accessible from the mesos slave, i.e. the mesos slave
+ # should be able to pull this docker image before executing the command.
+ # docker_image_slave = puckel/docker-airflow
+
+ [kerberos]
+ ccache = /tmp/airflow_krb5_ccache
+ # gets augmented with fqdn
+ principal = airflow
+ reinit_frequency = 3600
+ kinit_path = kinit
+ keytab = airflow.keytab
+
+ [cli]
+ api_client = airflow.api.client.json_client
+ endpoint_url = http://localhost:8080
+
+ [api]
+ auth_backend = airflow.api.auth.backend.default
+
+ [github_enterprise]
+ api_rev = v3
+
+ [admin]
+ # UI to hide sensitive variable fields when set to True
+ hide_sensitive_variable_fields = True
+
+ [elasticsearch]
+ elasticsearch_host =
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/16bae563/scripts/ci/kubernetes/kube/deploy.sh
----------------------------------------------------------------------
diff --git a/scripts/ci/kubernetes/kube/deploy.sh b/scripts/ci/kubernetes/kube/deploy.sh
index a5adcf8..e585d87 100755
--- a/scripts/ci/kubernetes/kube/deploy.sh
+++ b/scripts/ci/kubernetes/kube/deploy.sh
@@ -21,8 +21,14 @@ IMAGE=${1:-airflow/ci}
TAG=${2:-latest}
DIRNAME=$(cd "$(dirname "$0")"; pwd)
+kubectl delete -f $DIRNAME/postgres.yaml
+kubectl delete -f $DIRNAME/airflow.yaml
+kubectl delete -f $DIRNAME/secrets.yaml
+
kubectl apply -f $DIRNAME/postgres.yaml
kubectl apply -f $DIRNAME/volumes.yaml
+kubectl apply -f $DIRNAME/secrets.yaml
+kubectl apply -f $DIRNAME/configmaps.yaml
kubectl apply -f $DIRNAME/airflow.yaml
# wait for up to 10 minutes for everything to be deployed
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/16bae563/scripts/ci/kubernetes/kube/postgres.yaml
----------------------------------------------------------------------
diff --git a/scripts/ci/kubernetes/kube/postgres.yaml b/scripts/ci/kubernetes/kube/postgres.yaml
index 67a0635..1130921 100644
--- a/scripts/ci/kubernetes/kube/postgres.yaml
+++ b/scripts/ci/kubernetes/kube/postgres.yaml
@@ -30,6 +30,7 @@ spec:
containers:
- name: postgres
image: postgres
+ imagePullPolicy: IfNotPresent
ports:
- containerPort: 5432
protocol: TCP
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/16bae563/scripts/ci/kubernetes/kube/secrets.yaml
----------------------------------------------------------------------
diff --git a/scripts/ci/kubernetes/kube/secrets.yaml b/scripts/ci/kubernetes/kube/secrets.yaml
new file mode 100644
index 0000000..4d533b3
--- /dev/null
+++ b/scripts/ci/kubernetes/kube/secrets.yaml
@@ -0,0 +1,25 @@
+# Licensed to the Apache Software Foundation (ASF) under one *
+# or more contributor license agreements. See the NOTICE file *
+# distributed with this work for additional information *
+# regarding copyright ownership. The ASF licenses this file *
+# to you under the Apache License, Version 2.0 (the *
+# "License"); you may not use this file except in compliance *
+# with the License. You may obtain a copy of the License at *
+# *
+# http://www.apache.org/licenses/LICENSE-2.0 *
+# *
+# Unless required by applicable law or agreed to in writing, *
+# software distributed under the License is distributed on an *
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+# KIND, either express or implied. See the License for the *
+# specific language governing permissions and limitations *
+# under the License. *
+apiVersion: v1
+kind: Secret
+metadata:
+ name: airflow-secrets
+type: Opaque
+data:
+ # The sql_alchemy_conn value is a base64 encoded representation of this connection string:
+ # postgresql+psycopg2://root:root@postgres-airflow:5432/airflow
+ sql_alchemy_conn: cG9zdGdyZXNxbCtwc3ljb3BnMjovL3Jvb3Q6cm9vdEBwb3N0Z3Jlcy1haXJmbG93OjU0MzIvYWlyZmxvdwo=
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/16bae563/scripts/ci/kubernetes/kube/volumes.yaml
----------------------------------------------------------------------
diff --git a/scripts/ci/kubernetes/kube/volumes.yaml b/scripts/ci/kubernetes/kube/volumes.yaml
index 073e98c..58ad368 100644
--- a/scripts/ci/kubernetes/kube/volumes.yaml
+++ b/scripts/ci/kubernetes/kube/volumes.yaml
@@ -62,3 +62,4 @@ spec:
resources:
requests:
storage: 2Gi
+
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/16bae563/scripts/ci/travis_script.sh
----------------------------------------------------------------------
diff --git a/scripts/ci/travis_script.sh b/scripts/ci/travis_script.sh
index 8766e94..52571cc 100755
--- a/scripts/ci/travis_script.sh
+++ b/scripts/ci/travis_script.sh
@@ -26,7 +26,7 @@ then
tox -e $TOX_ENV
else
KUBERNETES_VERSION=${KUBERNETES_VERSION} $DIRNAME/kubernetes/setup_kubernetes.sh && \
- tox -e $TOX_ENV -- tests.contrib.minikube_tests \
+ tox -e $TOX_ENV -- tests.contrib.minikube \
--with-coverage \
--cover-erase \
--cover-html \
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/16bae563/setup.cfg
----------------------------------------------------------------------
diff --git a/setup.cfg b/setup.cfg
index 7b3479c..622cc13 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -1,4 +1,3 @@
-#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
@@ -6,16 +5,15 @@
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
-#
+#
# http://www.apache.org/licenses/LICENSE-2.0
-#
+#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
-
[metadata]
name = Airflow
summary = Airflow is a system to programmatically author, schedule and monitor data pipelines.
@@ -29,8 +27,11 @@ packages = airflow
[build_sphinx]
source-dir = docs/
-build-dir = docs/_build
-all_files = 1
+build-dir = docs/_build
+all_files = 1
[upload_sphinx]
upload-dir = docs/_build/html
+
+[easy_install]
+
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/16bae563/tests/cli/test_cli.py
----------------------------------------------------------------------
diff --git a/tests/cli/test_cli.py b/tests/cli/test_cli.py
index 9c79567..34c82bc 100644
--- a/tests/cli/test_cli.py
+++ b/tests/cli/test_cli.py
@@ -7,9 +7,9 @@
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
-#
+#
# http://www.apache.org/licenses/LICENSE-2.0
-#
+#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -22,15 +22,84 @@ import unittest
from mock import patch, Mock, MagicMock
from time import sleep
-
import psutil
-
+from argparse import Namespace
from airflow import settings
-from airflow.bin.cli import get_num_ready_workers_running
+from airflow.bin.cli import get_num_ready_workers_running, run, get_dag
+from airflow.models import TaskInstance
+from airflow.utils import timezone
+from airflow.utils.state import State
+from airflow.settings import Session
+from airflow import models
+
+import os
+
+dag_folder_path = '/'.join(os.path.realpath(__file__).split('/')[:-1])
+
+TEST_DAG_FOLDER = os.path.join(
+ os.path.dirname(dag_folder_path), 'dags')
+TEST_DAG_ID = 'unit_tests'
+
+
+def reset(dag_id):
+ session = Session()
+ tis = session.query(models.TaskInstance).filter_by(dag_id=dag_id)
+ tis.delete()
+ session.commit()
+ session.close()
+
+
+def create_mock_args(
+ task_id,
+ dag_id,
+ subdir,
+ execution_date,
+ task_params=None,
+ dry_run=False,
+ queue=None,
+ pool=None,
+ priority_weight_total=None,
+ retries=0,
+ local=True,
+ mark_success=False,
+ ignore_all_dependencies=False,
+ ignore_depends_on_past=False,
+ ignore_dependencies=False,
+ force=False,
+ run_as_user=None,
+ executor_config={},
+ cfg_path=None,
+ pickle=None,
+ raw=None,
+ interactive=None,
+):
+ args = MagicMock(spec=Namespace)
+ args.task_id = task_id
+ args.dag_id = dag_id
+ args.subdir = subdir
+ args.task_params = task_params
+ args.execution_date = execution_date
+ args.dry_run = dry_run
+ args.queue = queue
+ args.pool = pool
+ args.priority_weight_total = priority_weight_total
+ args.retries = retries
+ args.local = local
+ args.run_as_user = run_as_user
+ args.executor_config = executor_config
+ args.cfg_path = cfg_path
+ args.pickle = pickle
+ args.raw = raw
+ args.mark_success = mark_success
+ args.ignore_all_dependencies = ignore_all_dependencies
+ args.ignore_depends_on_past = ignore_depends_on_past
+ args.ignore_dependencies = ignore_dependencies
+ args.force = force
+ args.interactive = interactive
+ return args
class TestCLI(unittest.TestCase):
-
def setUp(self):
self.gunicorn_master_proc = Mock(pid=None)
self.children = MagicMock()
@@ -74,3 +143,23 @@ class TestCLI(unittest.TestCase):
"webserver terminated with return code {} in debug mode".format(return_code))
p.terminate()
p.wait()
+
+ def test_local_run(self):
+ args = create_mock_args(
+ task_id='print_the_context',
+ dag_id='example_python_operator',
+ subdir='/root/dags/example_python_operator.py',
+ interactive=True,
+ execution_date=timezone.parse('2018-04-27T08:39:51.298439+00:00')
+ )
+
+ reset(args.dag_id)
+
+ with patch('argparse.Namespace', args) as mock_args:
+ run(mock_args)
+ dag = get_dag(mock_args)
+ task = dag.get_task(task_id=args.task_id)
+ ti = TaskInstance(task, args.execution_date)
+ ti.refresh_from_db()
+ state = ti.current_state()
+ self.assertEqual(state, State.SUCCESS)
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/16bae563/tests/contrib/kubernetes/__init__.py
----------------------------------------------------------------------
diff --git a/tests/contrib/kubernetes/__init__.py b/tests/contrib/kubernetes/__init__.py
deleted file mode 100644
index 759b563..0000000
--- a/tests/contrib/kubernetes/__init__.py
+++ /dev/null
@@ -1,14 +0,0 @@
-# -*- coding: utf-8 -*-
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/16bae563/tests/contrib/minikube/__init__.py
----------------------------------------------------------------------
diff --git a/tests/contrib/minikube/__init__.py b/tests/contrib/minikube/__init__.py
new file mode 100644
index 0000000..114d189
--- /dev/null
+++ b/tests/contrib/minikube/__init__.py
@@ -0,0 +1,18 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/16bae563/tests/contrib/minikube/test_kubernetes_executor.py
----------------------------------------------------------------------
diff --git a/tests/contrib/minikube/test_kubernetes_executor.py b/tests/contrib/minikube/test_kubernetes_executor.py
new file mode 100644
index 0000000..9827bc8
--- /dev/null
+++ b/tests/contrib/minikube/test_kubernetes_executor.py
@@ -0,0 +1,97 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+
+import unittest
+from subprocess import check_call, check_output
+
+import requests
+import time
+import six
+
+try:
+ check_call(["kubectl", "get", "pods"])
+except Exception as e:
+ raise unittest.SkipTest(
+ "Kubernetes integration tests require a minikube cluster;"
+ "Skipping tests {}".format(e)
+ )
+
+
+class KubernetesExecutorTest(unittest.TestCase):
+
+ def test_integration_run_dag(self):
+ host_ip = check_output(['minikube', 'ip'])
+ if six.PY3:
+ host_ip = host_ip.decode('UTF-8')
+ host = '{}:30809'.format(host_ip.strip())
+
+ # Enable the dag
+ result = requests.get(
+ 'http://{}/api/experimental/'
+ 'dags/example_python_operator/paused/false'.format(host)
+ )
+ self.assertEqual(result.status_code, 200, "Could not enable DAG")
+
+ # Trigger a new dagrun
+ result = requests.post(
+ 'http://{}/api/experimental/'
+ 'dags/example_python_operator/dag_runs'.format(host),
+ json={}
+ )
+ self.assertEqual(result.status_code, 200, "Could not trigger a DAG-run")
+
+ time.sleep(1)
+
+ result = requests.get(
+ 'http://{}/api/experimental/latest_runs'.format(host)
+ )
+ self.assertEqual(result.status_code, 200, "Could not get the latest DAG-run")
+ result_json = result.json()
+
+ self.assertGreater(len(result_json['items']), 0)
+
+ execution_date = result_json['items'][0]['execution_date']
+ print("Found the job with execution date {}".format(execution_date))
+
+ tries = 0
+ state = ''
+ # Wait 100 seconds for the operator to complete
+ while tries < 20:
+ time.sleep(5)
+
+ # Poll the current state of the print_the_context task instance
+ result = requests.get(
+ 'http://{}/api/experimental/dags/example_python_operator/'
+ 'dag_runs/{}/tasks/print_the_context'.format(host, execution_date)
+ )
+ self.assertEqual(result.status_code, 200, "Could not get the status")
+ result_json = result.json()
+ state = result_json['state']
+ print("Attempt {}: Current state of operator is {}".format(tries, state))
+
+ if state == 'success':
+ break
+ tries += 1
+
+ self.assertEqual(state, 'success')
+
+ # Maybe check if we can retrieve the logs, but then we need to extend the API
+
+
+if __name__ == '__main__':
+ unittest.main()
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/16bae563/tests/contrib/minikube/test_kubernetes_pod_operator.py
----------------------------------------------------------------------
diff --git a/tests/contrib/minikube/test_kubernetes_pod_operator.py b/tests/contrib/minikube/test_kubernetes_pod_operator.py
new file mode 100644
index 0000000..081fc04
--- /dev/null
+++ b/tests/contrib/minikube/test_kubernetes_pod_operator.py
@@ -0,0 +1,98 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import unittest
+from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator
+from airflow import AirflowException
+from subprocess import check_call
+import mock
+from airflow.contrib.kubernetes.pod_launcher import PodLauncher
+
+try:
+ check_call(["kubectl", "get", "pods"])
+except Exception as e:
+ raise unittest.SkipTest(
+ "Kubernetes integration tests require a minikube cluster;"
+ "Skipping tests {}".format(e)
+ )
+
+
+class KubernetesPodOperatorTest(unittest.TestCase):
+ def test_working_pod(self):
+ k = KubernetesPodOperator(
+ namespace='default',
+ image="ubuntu:16.04",
+ cmds=["bash", "-cx"],
+ arguments=["echo", "10"],
+ labels={"foo": "bar"},
+ name="test",
+ task_id="task"
+ )
+ k.execute(None)
+
+ def test_logging(self):
+ with mock.patch.object(PodLauncher, 'log') as mock_logger:
+ k = KubernetesPodOperator(
+ namespace='default',
+ image="ubuntu:16.04",
+ cmds=["bash", "-cx"],
+ arguments=["echo", "10"],
+ labels={"foo": "bar"},
+ name="test",
+ task_id="task",
+ get_logs=True
+ )
+ k.execute(None)
+ mock_logger.info.assert_any_call(b"+ echo\n")
+
+ def test_faulty_image(self):
+ bad_image_name = "foobar"
+ k = KubernetesPodOperator(
+ namespace='default',
+ image=bad_image_name,
+ cmds=["bash", "-cx"],
+ arguments=["echo", "10"],
+ labels={"foo": "bar"},
+ name="test",
+ task_id="task",
+ startup_timeout_seconds=5
+ )
+ with self.assertRaises(AirflowException) as cm:
+ k.execute(None),
+
+ print("exception: {}".format(cm))
+
+ def test_pod_failure(self):
+ """
+ Tests that the task fails when a pod reports a failure
+ """
+ bad_internal_command = "foobar"
+ k = KubernetesPodOperator(
+ namespace='default',
+ image="ubuntu:16.04",
+ cmds=["bash", "-cx"],
+ arguments=[bad_internal_command, "10"],
+ labels={"foo": "bar"},
+ name="test",
+ task_id="task"
+ )
+ with self.assertRaises(AirflowException):
+ k.execute(None)
+
+
+if __name__ == '__main__':
+ unittest.main()
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/16bae563/tests/contrib/minikube_tests/__init__.py
----------------------------------------------------------------------
diff --git a/tests/contrib/minikube_tests/__init__.py b/tests/contrib/minikube_tests/__init__.py
deleted file mode 100644
index 4067cc7..0000000
--- a/tests/contrib/minikube_tests/__init__.py
+++ /dev/null
@@ -1,18 +0,0 @@
-# -*- coding: utf-8 -*-
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/16bae563/tests/contrib/minikube_tests/integration/__init__.py
----------------------------------------------------------------------
diff --git a/tests/contrib/minikube_tests/integration/__init__.py b/tests/contrib/minikube_tests/integration/__init__.py
deleted file mode 100644
index 9d7677a..0000000
--- a/tests/contrib/minikube_tests/integration/__init__.py
+++ /dev/null
@@ -1,13 +0,0 @@
-# -*- coding: utf-8 -*-
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/16bae563/tests/contrib/minikube_tests/integration/airflow_controller.py
----------------------------------------------------------------------
diff --git a/tests/contrib/minikube_tests/integration/airflow_controller.py b/tests/contrib/minikube_tests/integration/airflow_controller.py
deleted file mode 100644
index 5604652..0000000
--- a/tests/contrib/minikube_tests/integration/airflow_controller.py
+++ /dev/null
@@ -1,166 +0,0 @@
-# -*- coding: utf-8 -*-
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import subprocess
-import time
-
-
-class RunCommandError(Exception):
- pass
-
-
-class TimeoutError(Exception):
- pass
-
-
-class DagRunState:
- SUCCESS = "success"
- FAILED = "failed"
- RUNNING = "running"
-
-
-def run_command(command):
- process = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE,
- stderr=subprocess.PIPE)
- stdout, stderr = process.communicate()
- if process.returncode != 0:
- raise RunCommandError(
- "Error while running command: {}; Stdout: {}; Stderr: {}".format(
- command, stdout, stderr
- ))
- return stdout, stderr
-
-
-def run_command_in_pod(pod_name, container_name, command):
- return run_command("kubectl exec {pod_name} -c {container_name} -- {command}".format(
- pod_name=pod_name, container_name=container_name, command=command
- ))
-
-
-def get_scheduler_logs(airflow_pod=None):
- airflow_pod = airflow_pod or _get_airflow_pod()
-
- return run_command("kubectl logs {pod_name} scheduler"
- .format(pod_name=airflow_pod))
-
-
-def _unpause_dag(dag_id, airflow_pod=None):
- airflow_pod = airflow_pod or _get_airflow_pod()
- return run_command_in_pod(airflow_pod, "scheduler",
- "airflow unpause {dag_id}".format(dag_id=dag_id))
-
-
-def run_dag(dag_id, run_id, airflow_pod=None):
- airflow_pod = airflow_pod or _get_airflow_pod()
- _unpause_dag(dag_id, airflow_pod)
- return run_command_in_pod(airflow_pod, "scheduler",
- "airflow trigger_dag {dag_id} -r {run_id}".format(
- dag_id=dag_id, run_id=run_id
- ))
-
-
-def _get_pod_by_grep(grep_phrase):
- stdout, stderr = run_command(
- "kubectl get pods | grep {grep_phrase} | awk '{{print $1}}'".format(
- grep_phrase=grep_phrase
- ))
- pod_name = stdout.strip()
- return pod_name
-
-
-def _get_airflow_pod():
- return _get_pod_by_grep("^airflow")
-
-
-def _get_postgres_pod():
- return _get_pod_by_grep("^postgres")
-
-
-def _parse_state(stdout):
- end_line = "(1 row)"
- prev_line = None
- for line in stdout.split("\n"):
- if end_line in line:
- return prev_line.strip()
- prev_line = line
-
- raise Exception("Unknown psql output: {}".format(stdout))
-
-
-def get_dag_run_table(postgres_pod=None):
- postgres_pod = postgres_pod or _get_postgres_pod()
- stdout, stderr = run_command_in_pod(
- postgres_pod, "postgres",
- """psql airflow -c "select * from dag_run" """
- )
- return stdout
-
-
-def get_task_instance_table(postgres_pod=None):
- postgres_pod = postgres_pod or _get_postgres_pod()
- stdout, stderr = run_command_in_pod(
- postgres_pod, "postgres",
- """psql airflow -c "select * from task_instance" """
- )
- return stdout
-
-
-def get_dag_run_state(dag_id, run_id, postgres_pod=None):
- postgres_pod = postgres_pod or _get_postgres_pod()
- stdout, stderr = run_command_in_pod(
- postgres_pod, "postgres",
- """psql airflow -c "select state from dag_run where dag_id='{dag_id}' and
- run_id='{run_id}'" """.format(
- dag_id=dag_id, run_id=run_id
- )
- )
- return _parse_state(stdout)
-
-
-def dag_final_state(dag_id, run_id, postgres_pod=None, poll_interval=1, timeout=120):
- postgres_pod = postgres_pod or _get_postgres_pod()
- for _ in range(0, timeout / poll_interval):
- dag_state = get_dag_run_state(dag_id, run_id, postgres_pod)
- if dag_state != DagRunState.RUNNING:
- capture_logs_for_failure(dag_state)
- return dag_state
- time.sleep(poll_interval)
-
- raise TimeoutError(
- "Timed out while waiting for DagRun with dag_id: {} run_id: {}".format(dag_id,
- run_id))
-
-
-def _kill_pod(pod_name):
- return run_command("kubectl delete pod {pod_name}".format(pod_name=pod_name))
-
-
-def kill_scheduler():
- airflow_pod = _get_pod_by_grep("^airflow")
- return _kill_pod(airflow_pod)
-
-
-def capture_logs_for_failure(state):
- if state != DagRunState.SUCCESS:
- stdout, stderr = get_scheduler_logs()
- print("stdout:")
- for line in stdout.split('\n'):
- print(line)
- print("stderr:")
- for line in stderr.split('\n'):
- print(line)
- print("dag_run:")
- print(get_dag_run_table())
- print("task_instance")
- print(get_task_instance_table())
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/16bae563/tests/contrib/minikube_tests/integration/test_kubernetes_executor_integration.py
----------------------------------------------------------------------
diff --git a/tests/contrib/minikube_tests/integration/test_kubernetes_executor_integration.py b/tests/contrib/minikube_tests/integration/test_kubernetes_executor_integration.py
deleted file mode 100644
index 602a717..0000000
--- a/tests/contrib/minikube_tests/integration/test_kubernetes_executor_integration.py
+++ /dev/null
@@ -1,67 +0,0 @@
-# -*- coding: utf-8 -*-
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import time
-import unittest
-from uuid import uuid4
-
-from tests.contrib.minikube_tests.integration.airflow_controller\
- import DagRunState, RunCommandError, \
- dag_final_state, get_dag_run_state, kill_scheduler, run_command, run_dag
-
-try:
- run_command("kubectl get pods")
-except RunCommandError:
- SKIP_KUBE = True
-else:
- SKIP_KUBE = False
-
-
-class KubernetesExecutorTest(unittest.TestCase):
- @unittest.skipIf(SKIP_KUBE,
- 'Kubernetes integration tests are unsupported by this configuration')
- def test_kubernetes_executor_dag_runs_successfully(self):
- dag_id, run_id = "example_python_operator", uuid4().hex
- run_dag(dag_id, run_id)
- state = dag_final_state(dag_id, run_id, timeout=120)
- self.assertEquals(state, DagRunState.SUCCESS)
-
- @unittest.skipIf(SKIP_KUBE,
- 'Kubernetes integration tests are unsupported by this configuration')
- def test_start_dag_then_kill_scheduler_then_ensure_dag_succeeds(self):
- dag_id, run_id = "example_python_operator", uuid4().hex
- run_dag(dag_id, run_id)
-
- self.assertEquals(get_dag_run_state(dag_id, run_id), DagRunState.RUNNING)
-
- time.sleep(10)
-
- kill_scheduler()
-
- self.assertEquals(dag_final_state(dag_id, run_id, timeout=180),
- DagRunState.SUCCESS)
-
- @unittest.skipIf(SKIP_KUBE,
- 'Kubernetes integration tests are unsupported by this configuration')
- def test_kubernetes_executor_config_works(self):
- dag_id, run_id = "example_kubernetes_executor", uuid4().hex
- run_dag(dag_id, run_id)
-
- self.assertEquals(get_dag_run_state(dag_id, run_id), DagRunState.RUNNING)
- self.assertEquals(dag_final_state(dag_id, run_id, timeout=300),
- DagRunState.SUCCESS)
-
-
-if __name__ == "__main__":
- unittest.main()
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/16bae563/tests/contrib/minikube_tests/test_kubernetes_pod_operator.py
----------------------------------------------------------------------
diff --git a/tests/contrib/minikube_tests/test_kubernetes_pod_operator.py b/tests/contrib/minikube_tests/test_kubernetes_pod_operator.py
deleted file mode 100644
index 321f01f..0000000
--- a/tests/contrib/minikube_tests/test_kubernetes_pod_operator.py
+++ /dev/null
@@ -1,93 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-import unittest
-from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator
-from airflow import AirflowException
-from subprocess import check_call
-import mock
-from airflow.contrib.kubernetes.pod_launcher import PodLauncher
-
-try:
- check_call(["kubectl", "get", "pods"])
-except Exception as e:
- raise unittest.SkipTest(
- "Kubernetes integration tests require a minikube cluster;"
- "Skipping tests {}".format(e)
- )
-
-
-class KubernetesPodOperatorTest(unittest.TestCase):
- def test_working_pod(self):
- k = KubernetesPodOperator(namespace='default',
- image="ubuntu:16.04",
- cmds=["bash", "-cx"],
- arguments=["echo", "10"],
- labels={"foo": "bar"},
- name="test",
- task_id="task"
- )
-
- k.execute(None)
-
- def test_logging(self):
- with mock.patch.object(PodLauncher, 'log') as mock_logger:
- k = KubernetesPodOperator(namespace='default',
- image="ubuntu:16.04",
- cmds=["bash", "-cx"],
- arguments=["echo", "10"],
- labels={"foo": "bar"},
- name="test",
- task_id="task",
- get_logs=True
- )
- k.execute(None)
- mock_logger.info.assert_any_call("+ echo\n")
-
- def test_faulty_image(self):
- bad_image_name = "foobar"
- k = KubernetesPodOperator(namespace='default',
- image=bad_image_name,
- cmds=["bash", "-cx"],
- arguments=["echo", "10"],
- labels={"foo": "bar"},
- name="test",
- task_id="task",
- startup_timeout_seconds=5
- )
- with self.assertRaises(AirflowException) as cm:
- k.execute(None),
-
- print("exception: {}".format(cm))
-
- def test_pod_failure(self):
- """
- Tests that the task fails when a pod reports a failure
- """
-
- bad_internal_command = "foobar"
- k = KubernetesPodOperator(namespace='default',
- image="ubuntu:16.04",
- cmds=["bash", "-cx"],
- arguments=[bad_internal_command, "10"],
- labels={"foo": "bar"},
- name="test",
- task_id="task"
- )
-
- with self.assertRaises(AirflowException):
- k.execute(None)
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/16bae563/tests/www_rbac/api/experimental/test_endpoints.py
----------------------------------------------------------------------
diff --git a/tests/www_rbac/api/experimental/test_endpoints.py b/tests/www_rbac/api/experimental/test_endpoints.py
index 7bcbb8e..a19492e 100644
--- a/tests/www_rbac/api/experimental/test_endpoints.py
+++ b/tests/www_rbac/api/experimental/test_endpoints.py
@@ -7,9 +7,9 @@
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
-#
+#
# http://www.apache.org/licenses/LICENSE-2.0
-#
+#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -77,6 +77,23 @@ class TestApiExperimental(unittest.TestCase):
self.assertIn('error', response.data.decode('utf-8'))
self.assertEqual(404, response.status_code)
+ def test_task_paused(self):
+ url_template = '/api/experimental/dags/{}/paused/{}'
+
+ response = self.app.get(
+ url_template.format('example_bash_operator', 'true')
+ )
+ self.assertIn('ok', response.data.decode('utf-8'))
+ self.assertEqual(200, response.status_code)
+
+ url_template = '/api/experimental/dags/{}/paused/{}'
+
+ response = self.app.get(
+ url_template.format('example_bash_operator', 'false')
+ )
+ self.assertIn('ok', response.data.decode('utf-8'))
+ self.assertEqual(200, response.status_code)
+
def test_trigger_dag(self):
url_template = '/api/experimental/dags/{}/dag_runs'
response = self.app.post(
[21/50] incubator-airflow git commit: closes
apache/incubator-airflow#2539 *Closed for inactivity*
Posted by fo...@apache.org.
closes apache/incubator-airflow#2539 *Closed for inactivity*
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/8c8d1407
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/8c8d1407
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/8c8d1407
Branch: refs/heads/v1-10-test
Commit: 8c8d1407f8fb374f1f6d8f7788f880461519a430
Parents: ad28dec
Author: r39132 <si...@yahoo.com>
Authored: Sun Apr 29 20:16:51 2018 -0700
Committer: r39132 <si...@yahoo.com>
Committed: Sun Apr 29 20:16:51 2018 -0700
----------------------------------------------------------------------
----------------------------------------------------------------------
[31/50] incubator-airflow git commit: closes
apache/incubator-airflow#2814 *Messed up PR - hundreds of old commits.*
Posted by fo...@apache.org.
closes apache/incubator-airflow#2814 *Messed up PR - hundreds of old commits.*
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/8921e7d7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/8921e7d7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/8921e7d7
Branch: refs/heads/v1-10-test
Commit: 8921e7d70c46027c5a3f4b686f70d893ec91eb89
Parents: f63a2b1
Author: r39132 <si...@yahoo.com>
Authored: Mon Apr 30 13:58:40 2018 -0700
Committer: r39132 <si...@yahoo.com>
Committed: Mon Apr 30 13:58:40 2018 -0700
----------------------------------------------------------------------
----------------------------------------------------------------------
[42/50] incubator-airflow git commit: [AIRFLOW-XXX] Fix wrong table
header in scheduler.rst
Posted by fo...@apache.org.
[AIRFLOW-XXX] Fix wrong table header in scheduler.rst
Closes #3306 from sekikn/table_header
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/6f688464
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/6f688464
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/6f688464
Branch: refs/heads/v1-10-test
Commit: 6f6884641f6eca799f35190ef133329a00a1daaf
Parents: 2a079b9
Author: Kengo Seki <se...@apache.org>
Authored: Wed May 2 23:47:33 2018 -0700
Committer: r39132 <si...@yahoo.com>
Committed: Wed May 2 23:47:33 2018 -0700
----------------------------------------------------------------------
docs/scheduler.rst | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/6f688464/docs/scheduler.rst
----------------------------------------------------------------------
diff --git a/docs/scheduler.rst b/docs/scheduler.rst
index 7e4e544..dfa0a42 100644
--- a/docs/scheduler.rst
+++ b/docs/scheduler.rst
@@ -45,7 +45,7 @@ a ``str``, or a ``datetime.timedelta`` object. Alternatively, you can also
use one of these cron "preset":
+--------------+----------------------------------------------------------------+---------------+
-| preset | Run once a year at midnight of January 1 | cron |
+| preset | meaning | cron |
+==============+================================================================+===============+
| ``None`` | Don't schedule, use for exclusively "externally triggered" | |
| | DAGs | |
[05/50] incubator-airflow git commit: closes
apache/incubator-airflow#2586 *Closed for inactivity*
Posted by fo...@apache.org.
closes apache/incubator-airflow#2586 *Closed for inactivity*
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/172b73c1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/172b73c1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/172b73c1
Branch: refs/heads/v1-10-test
Commit: 172b73c155f555c49c340275793c0c0a0b09bc24
Parents: b922521
Author: r39132 <si...@yahoo.com>
Authored: Fri Apr 27 09:42:19 2018 -0700
Committer: r39132 <si...@yahoo.com>
Committed: Fri Apr 27 09:42:19 2018 -0700
----------------------------------------------------------------------
----------------------------------------------------------------------
[43/50] incubator-airflow git commit: [AIRFLOW-XXX] Add Reddit to
Airflow users
Posted by fo...@apache.org.
[AIRFLOW-XXX] Add Reddit to Airflow users
Closes #3309 from seato/reddit_airflow
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/6b3f6cee
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/6b3f6cee
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/6b3f6cee
Branch: refs/heads/v1-10-test
Commit: 6b3f6cee118affee60b4b9319d6cf0d91c732bc4
Parents: 6f68846
Author: Richard Thai <ri...@gmail.com>
Authored: Thu May 3 21:23:09 2018 +0200
Committer: Fokko Driesprong <fo...@godatadriven.com>
Committed: Thu May 3 21:23:09 2018 +0200
----------------------------------------------------------------------
README.md | 1 +
1 file changed, 1 insertion(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/6b3f6cee/README.md
----------------------------------------------------------------------
diff --git a/README.md b/README.md
index bdbfbf2..07371ab 100644
--- a/README.md
+++ b/README.md
@@ -203,6 +203,7 @@ Currently **officially** using Airflow:
1. [Qubole](https://qubole.com) [[@msumit](https://github.com/msumit)]
1. [Quizlet](https://quizlet.com) [[@quizlet](https://github.com/quizlet)]
1. [Quora](https://www.quora.com/)
+1. [Reddit](https://www.reddit.com/) [[@reddit](https://github.com/reddit/)]
1. [Robinhood](https://robinhood.com) [[@vineet-rh](https://github.com/vineet-rh)]
1. [Scaleway](https://scaleway.com) [[@kdeldycke](https://github.com/kdeldycke)]
1. [Sense360](https://github.com/Sense360) [[@kamilmroczek](https://github.com/KamilMroczek)]
[10/50] incubator-airflow git commit: [AIRFLOW-2266][AIRFLOW-2343]
Remove google-cloud-dataflow dependency
Posted by fo...@apache.org.
[AIRFLOW-2266][AIRFLOW-2343] Remove google-cloud-dataflow dependency
This is needed because the latest
release (2.4) of apache-beam[gcp] is not
available for Python 3.x. Also, as we use
Google's discovery-based API for all Google Cloud
related commands, we don't need to import the
google-cloud-dataflow package.
Closes #3273 from kaxil/patch-3
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/e9b74b68
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/e9b74b68
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/e9b74b68
Branch: refs/heads/v1-10-test
Commit: e9b74b68aa848b6ac5602f3e435cba38efde32d5
Parents: 5aa1586
Author: Kaxil Naik <ka...@gmail.com>
Authored: Sat Apr 28 21:24:15 2018 +0200
Committer: Fokko Driesprong <fo...@godatadriven.com>
Committed: Sat Apr 28 21:24:15 2018 +0200
----------------------------------------------------------------------
setup.py | 4 +---
1 file changed, 1 insertion(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/e9b74b68/setup.py
----------------------------------------------------------------------
diff --git a/setup.py b/setup.py
index b5461eb..fef8d8a 100644
--- a/setup.py
+++ b/setup.py
@@ -133,7 +133,6 @@ gcp_api = [
'google-api-python-client>=1.5.0, <1.6.0',
'oauth2client>=2.0.2, <2.1.0',
'PyOpenSSL',
- 'google-cloud-dataflow>=2.2.0',
'pandas-gbq'
]
hdfs = ['snakebite>=2.7.8']
@@ -206,8 +205,7 @@ devel_all = (sendgrid + devel + all_dbs + doc + samba + s3 + slack + crypto + or
# Snakebite & Google Cloud Dataflow are not Python 3 compatible :'(
if PY3:
devel_ci = [package for package in devel_all if package not in
- ['snakebite>=2.7.8', 'snakebite[kerberos]>=2.7.8',
- 'google-cloud-dataflow>=2.2.0']]
+ ['snakebite>=2.7.8', 'snakebite[kerberos]>=2.7.8']]
else:
devel_ci = devel_all
[28/50] incubator-airflow git commit: [AIRFLOW-2398] Add BounceX to
list of current airflow users
Posted by fo...@apache.org.
[AIRFLOW-2398] Add BounceX to list of current airflow users
Closes #3282 from JoshFerge/master
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/9a8e4f77
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/9a8e4f77
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/9a8e4f77
Branch: refs/heads/v1-10-test
Commit: 9a8e4f77568783b95efb35b483e1eac10f021ea9
Parents: 19b3901
Author: Josh Ferge <jo...@bounceexchange.com>
Authored: Mon Apr 30 07:03:33 2018 -0700
Committer: r39132 <si...@yahoo.com>
Committed: Mon Apr 30 07:03:37 2018 -0700
----------------------------------------------------------------------
README.md | 1 +
1 file changed, 1 insertion(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/9a8e4f77/README.md
----------------------------------------------------------------------
diff --git a/README.md b/README.md
index b504cbd..bd09e9c 100644
--- a/README.md
+++ b/README.md
@@ -105,6 +105,7 @@ Currently **officially** using Airflow:
1. [Boda Telecom Suite - CE](https://github.com/bodastage/bts-ce) [[@erssebaggala](https://github.com/erssebaggala), [@bodastage](https://github.com/bodastage)]
1. [Bodastage Solutions](http://bodastage.com) [[@erssebaggala](https://github.com/erssebaggala), [@bodastage](https://github.com/bodastage)]
1. [Bonnier Broadcasting](http://www.bonnierbroadcasting.com) [[@wileeam](https://github.com/wileeam)]
+1. [BounceX](http://www.bouncex.com) [[@JoshFerge](https://github.com/JoshFerge), [@hudsonrio](https://github.com/hudsonrio), [@ronniekritou](https://github.com/ronniekritou)]
1. [California Data Collaborative](https://github.com/California-Data-Collaborative) powered by [ARGO Labs](http://www.argolabs.org)
1. [Carbonite](https://www.carbonite.com) [[@ajbosco](https://github.com/ajbosco)]
1. [Celect](http://www.celect.com) [[@superdosh](https://github.com/superdosh) & [@chadcelect](https://github.com/chadcelect)]
[19/50] incubator-airflow git commit: closes
apache/incubator-airflow#2930 *Fix belongs in SQLAlchemy.*
Posted by fo...@apache.org.
closes apache/incubator-airflow#2930 *Fix belongs in SQLAlchemy.*
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/3952e050
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/3952e050
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/3952e050
Branch: refs/heads/v1-10-test
Commit: 3952e050a1fe6191f031c2af92785b1823e8f3e3
Parents: 72f15a1
Author: r39132 <si...@yahoo.com>
Authored: Sun Apr 29 10:16:30 2018 -0700
Committer: r39132 <si...@yahoo.com>
Committed: Sun Apr 29 10:16:30 2018 -0700
----------------------------------------------------------------------
----------------------------------------------------------------------
[20/50] incubator-airflow git commit: [AIRFLOW-1933] Fix some typos
Posted by fo...@apache.org.
[AIRFLOW-1933] Fix some typos
Closes #2474 from Philippus/patch-1
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/ad28decc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/ad28decc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/ad28decc
Branch: refs/heads/v1-10-test
Commit: ad28decc74a87f6f9fbff57f200199860a2bfa81
Parents: 3952e05
Author: Philippus Baalman <ph...@gmail.com>
Authored: Sun Apr 29 20:08:48 2018 -0700
Committer: r39132 <si...@yahoo.com>
Committed: Sun Apr 29 20:08:48 2018 -0700
----------------------------------------------------------------------
UPDATING.md | 22 +++++++++++-----------
1 file changed, 11 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/ad28decc/UPDATING.md
----------------------------------------------------------------------
diff --git a/UPDATING.md b/UPDATING.md
index 609c8db..8006876 100644
--- a/UPDATING.md
+++ b/UPDATING.md
@@ -74,17 +74,17 @@ With Airflow 1.9 or lower, there were two connection strings for the Google Clou
### SSH Hook updates, along with new SSH Operator & SFTP Operator
-SSH Hook now uses Paramiko library to create ssh client connection, instead of sub-process based ssh command execution previously (<1.9.0), so this is backward incompatible.
+SSH Hook now uses the Paramiko library to create an ssh client connection, instead of the sub-process based ssh command execution previously (<1.9.0), so this is backward incompatible.
- update SSHHook constructor
- - use SSHOperator class in place of SSHExecuteOperator which is removed now. Refer test_ssh_operator.py for usage info.
- - SFTPOperator is added to perform secure file transfer from serverA to serverB. Refer test_sftp_operator.py.py for usage info.
+ - use SSHOperator class in place of SSHExecuteOperator which is removed now. Refer to test_ssh_operator.py for usage info.
+ - SFTPOperator is added to perform secure file transfer from serverA to serverB. Refer to test_sftp_operator.py for usage info.
- No updates are required if you are using ftpHook, it will continue to work as is.
### S3Hook switched to use Boto3
The airflow.hooks.S3_hook.S3Hook has been switched to use boto3 instead of the older boto (a.k.a. boto2). This results in a few backwards incompatible changes to the following classes: S3Hook:
- the constructors no longer accepts `s3_conn_id`. It is now called `aws_conn_id`.
- - the default conneciton is now "aws_default" instead of "s3_default"
+ - the default connection is now "aws_default" instead of "s3_default"
- the return type of objects returned by `get_bucket` is now boto3.s3.Bucket
- the return type of `get_key`, and `get_wildcard_key` is now an boto3.S3.Object.
@@ -106,7 +106,7 @@ Once a logger has determined that a message needs to be processed, it is passed
#### Changes in Airflow Logging
-Airflow's logging mechanism has been refactored to uses Python’s builtin `logging` module to perform logging of the application. By extending classes with the existing `LoggingMixin`, all the logging will go through a central logger. Also the `BaseHook` and `BaseOperator` already extends this class, so it is easily available to do logging.
+Airflow's logging mechanism has been refactored to use Python’s builtin `logging` module to perform logging of the application. By extending classes with the existing `LoggingMixin`, all the logging will go through a central logger. Also the `BaseHook` and `BaseOperator` already extend this class, so it is easily available to do logging.
The main benefit is easier configuration of the logging by setting a single centralized python file. Disclaimer; there is still some inline configuration, but this will be removed eventually. The new logging class is defined by setting the dotted classpath in your `~/airflow/airflow.cfg` file:
@@ -153,7 +153,7 @@ The `file_task_handler` logger has been made more flexible. The default format c
If you are logging to Google cloud storage, please see the [Google cloud platform documentation](https://airflow.incubator.apache.org/integration.html#gcp-google-cloud-platform) for logging instructions.
If you are using S3, the instructions should be largely the same as the Google cloud platform instructions above. You will need a custom logging config. The `REMOTE_BASE_LOG_FOLDER` configuration key in your airflow config has been removed, therefore you will need to take the following steps:
- - Copy the logging configuration from [`airflow/config_templates/airflow_logging_settings.py`](https://github.com/apache/incubator-airflow/blob/master/airflow/config_templates/airflow_local_settings.py) and copy it.
+ - Copy the logging configuration from [`airflow/config_templates/airflow_logging_settings.py`](https://github.com/apache/incubator-airflow/blob/master/airflow/config_templates/airflow_local_settings.py).
- Place it in a directory inside the Python import path `PYTHONPATH`. If you are using Python 2.7, ensuring that any `__init__.py` files exist so that it is importable.
- Update the config by setting the path of `REMOTE_BASE_LOG_FOLDER` explicitly in the config. The `REMOTE_BASE_LOG_FOLDER` key is not used anymore.
- Set the `logging_config_class` to the filename and dict. For example, if you place `custom_logging_config.py` on the base of your pythonpath, you will need to set `logging_config_class = custom_logging_config.LOGGING_CONFIG` in your config as Airflow 1.8.
@@ -180,12 +180,12 @@ supported and will be removed entirely in Airflow 2.0
Note that JSON serialization is stricter than pickling, so if you want to e.g. pass
raw bytes through XCom you must encode them using an encoding like base64.
By default pickling is still enabled until Airflow 2.0. To disable it
- Set enable_xcom_pickling = False in your Airflow config.
+ set enable_xcom_pickling = False in your Airflow config.
## Airflow 1.8.1
The Airflow package name was changed from `airflow` to `apache-airflow` during this release. You must uninstall
-previously installed version of Airflow before installing 1.8.1.
+a previously installed version of Airflow before installing 1.8.1.
## Airflow 1.8
@@ -202,12 +202,12 @@ Systemd unit files have been updated. If you use systemd please make sure to upd
Airflow 1.7.1 has issues with being able to over subscribe to a pool, ie. more slots could be used than were
available. This is fixed in Airflow 1.8.0, but due to past issue jobs may fail to start although their
dependencies are met after an upgrade. To workaround either temporarily increase the amount of slots above
-the the amount of queued tasks or use a new pool.
+the amount of queued tasks or use a new pool.
### Less forgiving scheduler on dynamic start_date
Using a dynamic start_date (e.g. `start_date = datetime.now()`) is not considered a best practice. The 1.8.0 scheduler
is less forgiving in this area. If you encounter DAGs not being scheduled you can try using a fixed start_date and
-renaming your dag. The last step is required to make sure you start with a clean slate, otherwise the old schedule can
+renaming your DAG. The last step is required to make sure you start with a clean slate, otherwise the old schedule can
interfere.
### New and updated scheduler options
@@ -243,7 +243,7 @@ By default the scheduler will fill any missing interval DAG Runs between the las
This setting changes that behavior to only execute the latest interval. This can also be specified per DAG as
`catchup = False / True`. Command line backfills will still work.
-### Faulty Dags do not show an error in the Web UI
+### Faulty DAGs do not show an error in the Web UI
Due to changes in the way Airflow processes DAGs the Web UI does not show an error when processing a faulty DAG. To
find processing errors go the `child_process_log_directory` which defaults to `<AIRFLOW_HOME>/scheduler/latest`.
[23/50] incubator-airflow git commit: closes
apache/incubator-airflow#2970 *Closed for inactivity*
Posted by fo...@apache.org.
closes apache/incubator-airflow#2970 *Closed for inactivity*
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/e29562e3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/e29562e3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/e29562e3
Branch: refs/heads/v1-10-test
Commit: e29562e33a05c66bc640c56143f3a2ca4e0a0b98
Parents: 06e90f4
Author: r39132 <si...@yahoo.com>
Authored: Sun Apr 29 20:20:30 2018 -0700
Committer: r39132 <si...@yahoo.com>
Committed: Sun Apr 29 20:20:30 2018 -0700
----------------------------------------------------------------------
----------------------------------------------------------------------
[04/50] incubator-airflow git commit: closes
apache/incubator-airflow#1933 *Closed for inactivity*
Posted by fo...@apache.org.
closes apache/incubator-airflow#1933 *Closed for inactivity*
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/b9225215
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/b9225215
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/b9225215
Branch: refs/heads/v1-10-test
Commit: b9225215873251c061accd89349b82b99ad5d905
Parents: 414a08e
Author: r39132 <si...@yahoo.com>
Authored: Fri Apr 27 09:39:13 2018 -0700
Committer: r39132 <si...@yahoo.com>
Committed: Fri Apr 27 09:39:13 2018 -0700
----------------------------------------------------------------------
----------------------------------------------------------------------
[06/50] incubator-airflow git commit: closes
apache/incubator-airflow#2827 *Closed for inactivity*
Posted by fo...@apache.org.
closes apache/incubator-airflow#2827 *Closed for inactivity*
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/67c0099e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/67c0099e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/67c0099e
Branch: refs/heads/v1-10-test
Commit: 67c0099e97ae5d679f838d404bbbc52cefdf5201
Parents: 172b73c
Author: r39132 <si...@yahoo.com>
Authored: Fri Apr 27 09:46:51 2018 -0700
Committer: r39132 <si...@yahoo.com>
Committed: Fri Apr 27 09:46:51 2018 -0700
----------------------------------------------------------------------
----------------------------------------------------------------------
[02/50] incubator-airflow git commit: [AIRFLOW-2378] Add Groupon to
list of current users
Posted by fo...@apache.org.
[AIRFLOW-2378] Add Groupon to list of current users
Closes #3267 from stevencasey/add_groupon
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/801fe7db
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/801fe7db
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/801fe7db
Branch: refs/heads/v1-10-test
Commit: 801fe7dbdcc74988fe937e013d7f019df87c44e5
Parents: ae63246
Author: steven casey <26...@users.noreply.github.com>
Authored: Thu Apr 26 18:47:28 2018 -0700
Committer: r39132 <si...@yahoo.com>
Committed: Thu Apr 26 18:47:28 2018 -0700
----------------------------------------------------------------------
README.md | 1 +
1 file changed, 1 insertion(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/801fe7db/README.md
----------------------------------------------------------------------
diff --git a/README.md b/README.md
index 5e5193c..b504cbd 100644
--- a/README.md
+++ b/README.md
@@ -140,6 +140,7 @@ Currently **officially** using Airflow:
1. [GovTech GDS](https://gds-gov.tech) [[@chrissng](https://github.com/chrissng) & [@datagovsg](https://github.com/datagovsg)]
1. [Grand Rounds](https://www.grandrounds.com/) [[@richddr](https://github.com/richddr), [@timz1290](https://github.com/timz1290), [@wenever](https://github.com/@wenever), & [@runongirlrunon](https://github.com/runongirlrunon)]
1. [Groupalia](http://es.groupalia.com) [[@jesusfcr](https://github.com/jesusfcr)]
+1. [Groupon](https://groupon.com) [[@stevencasey](https://github.com/stevencasey)]
1. [Gusto](https://gusto.com) [[@frankhsu](https://github.com/frankhsu)]
1. [Handshake](https://joinhandshake.com/) [[@mhickman](https://github.com/mhickman)]
1. [Handy](http://www.handy.com/careers/73115?gh_jid=73115&gh_src=o5qcxn) [[@marcintustin](https://github.com/marcintustin) / [@mtustin-handy](https://github.com/mtustin-handy)]
[39/50] incubator-airflow git commit: [AIRFLOW-2394] default cmds and
arguments in kubernetes operator
Posted by fo...@apache.org.
[AIRFLOW-2394] default cmds and arguments in kubernetes operator
Commands and arguments to docker image in kubernetes operator
Closes #3289 from ese/k8soperator
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/12ab796b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/12ab796b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/12ab796b
Branch: refs/heads/v1-10-test
Commit: 12ab796b11c001f5cc7c5bd294616200b4159dea
Parents: 96d00da
Author: Sergio Ballesteros <sn...@locolandia.net>
Authored: Wed May 2 15:43:40 2018 +0200
Committer: Fokko Driesprong <fo...@godatadriven.com>
Committed: Wed May 2 15:43:51 2018 +0200
----------------------------------------------------------------------
airflow/contrib/kubernetes/pod_generator.py | 6 ++---
.../operators/kubernetes_pod_operator.py | 23 ++++++++++-------
docs/kubernetes.rst | 26 +++++++++-----------
3 files changed, 28 insertions(+), 27 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/12ab796b/airflow/contrib/kubernetes/pod_generator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/kubernetes/pod_generator.py b/airflow/contrib/kubernetes/pod_generator.py
index d75ba22..78d3926 100644
--- a/airflow/contrib/kubernetes/pod_generator.py
+++ b/airflow/contrib/kubernetes/pod_generator.py
@@ -26,11 +26,9 @@ class PodGenerator:
def __init__(self, kube_config=None):
self.kube_config = kube_config
- self.env_vars = {}
self.volumes = []
self.volume_mounts = []
self.init_containers = []
- self.secrets = []
def add_init_container(self,
name,
@@ -129,8 +127,8 @@ class PodGenerator:
cmds=cmds,
args=arguments,
labels=labels,
- envs=self.env_vars,
- secrets={},
+ envs={},
+ secrets=[],
# service_account_name=self.kube_config.worker_service_account_name,
# image_pull_secrets=self.kube_config.image_pull_secrets,
init_containers=worker_init_container_spec,
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/12ab796b/airflow/contrib/operators/kubernetes_pod_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/kubernetes_pod_operator.py b/airflow/contrib/operators/kubernetes_pod_operator.py
index 06c0c5a..32ad582 100644
--- a/airflow/contrib/operators/kubernetes_pod_operator.py
+++ b/airflow/contrib/operators/kubernetes_pod_operator.py
@@ -30,11 +30,13 @@ class KubernetesPodOperator(BaseOperator):
"""
Execute a task in a Kubernetes Pod
- :param image: Docker image name
+ :param image: Docker image you wish to launch. Defaults to dockerhub.io,
+ but fully qualified URLS will point to custom repositories
:type image: str
- :param: namespace: namespace name where run the Pod
+ :param: namespace: the namespace to run within kubernetes
:type: namespace: str
- :param cmds: entrypoint of the container
+ :param cmds: entrypoint of the container.
+ The docker images's entrypoint is used if this is not provide.
:type cmds: list
:param arguments: arguments of to the entrypoint.
The docker image's CMD is used if this is not provided.
@@ -43,15 +45,18 @@ class KubernetesPodOperator(BaseOperator):
:type labels: dict
:param startup_timeout_seconds: timeout in seconds to startup the pod
:type startup_timeout_seconds: int
- :param name: name for the pod
+ :param name: name of the task you want to run,
+ will be used to generate a pod id
:type name: str
:param env_vars: Environment variables initialized in the container
:type env_vars: dict
- :param secrets: Secrets to attach to the container
+ :param secrets: Kubernetes secrets to inject in the container,
+ They can be exposed as environment vars or files in a volume.
:type secrets: list
:param in_cluster: run kubernetes client with in_cluster configuration
:type in_cluster: bool
:param get_logs: get the stdout of the container as logs of the tasks
+ :type get_logs: bool
"""
def execute(self, context):
@@ -85,9 +90,9 @@ class KubernetesPodOperator(BaseOperator):
def __init__(self,
namespace,
image,
- cmds,
- arguments,
name,
+ cmds=None,
+ arguments=None,
env_vars=None,
secrets=None,
in_cluster=False,
@@ -99,8 +104,8 @@ class KubernetesPodOperator(BaseOperator):
super(KubernetesPodOperator, self).__init__(*args, **kwargs)
self.image = image
self.namespace = namespace
- self.cmds = cmds
- self.arguments = arguments
+ self.cmds = cmds or []
+ self.arguments = arguments or []
self.labels = labels or {}
self.startup_timeout_seconds = startup_timeout_seconds
self.name = name
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/12ab796b/docs/kubernetes.rst
----------------------------------------------------------------------
diff --git a/docs/kubernetes.rst b/docs/kubernetes.rst
index 1dd77cb..4b83fc0 100644
--- a/docs/kubernetes.rst
+++ b/docs/kubernetes.rst
@@ -1,5 +1,5 @@
Kubernetes Executor
-===================
+^^^^^^^^^^^^^^^^^^^
The kubernetes executor is introduced in Apache Airflow 1.10.0. The Kubernetes executor will create a new pod for every task instance.
@@ -9,31 +9,29 @@ Example helm charts are available at `scripts/ci/kubernetes/kube/{airflow,volume
Kubernetes Operator
-===================
+^^^^^^^^^^^^^^^^^^^
+.. code:: python
+ from airflow.contrib.operators import KubernetesOperator
+ from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator
+ from airflow.contrib.kubernetes.secret import Secret
-.. code:: python
+ secret_file = Secret('volume', '/etc/sql_conn', 'airflow-secrets', 'sql_alchemy_conn')
+ secret_env = Secret('env', 'SQL_CONN', 'airflow-secrets', 'sql_alchemy_conn')
- from airflow.comtrib.operators import KubernetesOperator
k = KubernetesPodOperator(namespace='default',
image="ubuntu:16.04",
cmds=["bash", "-cx"],
arguments=["echo", "10"],
labels={"foo": "bar"},
+ secrets=[secret_file,secret_env]
name="test",
task_id="task"
)
+.. autoclass:: airflow.contrib.operators.kubernetes_pod_operator.KubernetesPodOperator
+
+.. autoclass:: airflow.contrib.operators.secret.Secret
-================================= ====================================
-Variable Description
-================================= ====================================
-``@namespace`` The namespace is your isolated work environment within kubernetes
-``@image`` docker image you wish to launch. Defaults to dockerhub.io, but fully qualified URLS will point to custom repositories
-``@cmds`` To start a task in a docker image, we need to tell it what to do. the cmds array is the space seperated bash command that will define the task completed by the container
-``arguments`` arguments for your bash command
-``@labels`` Labels are an important element of launching kubernetes pods, as it tells kubernetes what pods a service can route to. For example, if you launch 5 postgres pods with the label {'postgres':'foo'} and create a postgres service with the same label, kubernetes will know that any time that service is queried, it can pick any of those 5 postgres instances as the endpoint for that service.
-``@name`` name of the task you want to run, will be used to generate a pod id
-================================= ====================================
[35/50] incubator-airflow git commit: [AIRFLOW-2400] Add Ability to
set Environment Variables for K8s
Posted by fo...@apache.org.
[AIRFLOW-2400] Add Ability to set Environment Variables for K8s
[AIRFLOW-2400] Env Variables for K8s
Allow environment variables to be set for the
KubernetesPodOperator.
Fix typo
Fix documentation variable type
Closes #3284 from jkao/add-env-vars-to-k8s-operator
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/fef7d304
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/fef7d304
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/fef7d304
Branch: refs/heads/v1-10-test
Commit: fef7d304d4aa2534e38da74921d482d4768991a6
Parents: 4428d1b
Author: Jeff Kao <je...@gmail.com>
Authored: Tue May 1 22:09:31 2018 +0200
Committer: Fokko Driesprong <fo...@godatadriven.com>
Committed: Tue May 1 22:09:31 2018 +0200
----------------------------------------------------------------------
airflow/contrib/kubernetes/pod.py | 2 +-
airflow/contrib/operators/kubernetes_pod_operator.py | 6 +++++-
2 files changed, 6 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/fef7d304/airflow/contrib/kubernetes/pod.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/kubernetes/pod.py b/airflow/contrib/kubernetes/pod.py
index df60c8a..e5e9a4d 100644
--- a/airflow/contrib/kubernetes/pod.py
+++ b/airflow/contrib/kubernetes/pod.py
@@ -45,7 +45,7 @@ class Pod:
:param image: The docker image
:type image: str
:param envs: A dict containing the environment variables
- :type envs:s dict
+ :type envs: dict
:param cmds: The command to be run on the pod
:type cmds: list str
:param secrets: Secrets to be launched to the pod
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/fef7d304/airflow/contrib/operators/kubernetes_pod_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/kubernetes_pod_operator.py b/airflow/contrib/operators/kubernetes_pod_operator.py
index 9e95d8b..06c0c5a 100644
--- a/airflow/contrib/operators/kubernetes_pod_operator.py
+++ b/airflow/contrib/operators/kubernetes_pod_operator.py
@@ -45,6 +45,8 @@ class KubernetesPodOperator(BaseOperator):
:type startup_timeout_seconds: int
:param name: name for the pod
:type name: str
+ :param env_vars: Environment variables initialized in the container
+ :type env_vars: dict
:param secrets: Secrets to attach to the container
:type secrets: list
:param in_cluster: run kubernetes client with in_cluster configuration
@@ -54,7 +56,6 @@ class KubernetesPodOperator(BaseOperator):
def execute(self, context):
try:
-
client = kube_client.get_kube_client(in_cluster=self.in_cluster)
gen = pod_generator.PodGenerator()
@@ -68,6 +69,7 @@ class KubernetesPodOperator(BaseOperator):
)
pod.secrets = self.secrets
+ pod.envs = self.env_vars
launcher = pod_launcher.PodLauncher(client)
final_state = launcher.run_pod(
@@ -86,6 +88,7 @@ class KubernetesPodOperator(BaseOperator):
cmds,
arguments,
name,
+ env_vars=None,
secrets=None,
in_cluster=False,
labels=None,
@@ -101,6 +104,7 @@ class KubernetesPodOperator(BaseOperator):
self.labels = labels or {}
self.startup_timeout_seconds = startup_timeout_seconds
self.name = name
+ self.env_vars = env_vars or {}
self.secrets = secrets or []
self.in_cluster = in_cluster
self.get_logs = get_logs
[45/50] incubator-airflow git commit: [AIRFLOW-2313] Add TTL
parameters for Dataproc
Posted by fo...@apache.org.
[AIRFLOW-2313] Add TTL parameters for Dataproc
Three additional optional parameters to
DataprocClusterCreateOperator
which configure Cloud Dataproc Cluster Scheduled
Deletion features.
Closes #3217 from ebartkus/dataproc-ttl
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/b02820a7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/b02820a7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/b02820a7
Branch: refs/heads/v1-10-test
Commit: b02820a7afcb5205df39c4a639f1ceeb2c9c75ee
Parents: c5fa8cd
Author: ebartkus <eb...@trustpilot.com>
Authored: Thu May 3 23:10:49 2018 +0200
Committer: Fokko Driesprong <fo...@godatadriven.com>
Committed: Thu May 3 23:10:49 2018 +0200
----------------------------------------------------------------------
airflow/contrib/hooks/gcp_dataproc_hook.py | 2 +-
airflow/contrib/operators/dataproc_operator.py | 30 ++++++++-
.../contrib/operators/test_dataproc_operator.py | 67 ++++++++++++++++++--
3 files changed, 93 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b02820a7/airflow/contrib/hooks/gcp_dataproc_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/gcp_dataproc_hook.py b/airflow/contrib/hooks/gcp_dataproc_hook.py
index 7d95897..7849e17 100644
--- a/airflow/contrib/hooks/gcp_dataproc_hook.py
+++ b/airflow/contrib/hooks/gcp_dataproc_hook.py
@@ -197,7 +197,7 @@ class DataProcHook(GoogleCloudBaseHook):
def __init__(self,
gcp_conn_id='google_cloud_default',
delegate_to=None,
- api_version='v1'):
+ api_version='v1beta2'):
super(DataProcHook, self).__init__(gcp_conn_id, delegate_to)
self.api_version = api_version
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b02820a7/airflow/contrib/operators/dataproc_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/dataproc_operator.py b/airflow/contrib/operators/dataproc_operator.py
index ad0aa09..1614720 100644
--- a/airflow/contrib/operators/dataproc_operator.py
+++ b/airflow/contrib/operators/dataproc_operator.py
@@ -32,6 +32,7 @@ from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults
from airflow.version import version
from googleapiclient.errors import HttpError
+from airflow.utils import timezone
class DataprocClusterCreateOperator(BaseOperator):
@@ -105,6 +106,16 @@ class DataprocClusterCreateOperator(BaseOperator):
:type service_account: string
:param service_account_scopes: The URIs of service account scopes to be included.
:type service_account_scopes: list[string]
+ :param idle_delete_ttl: The longest duration that cluster would keep alive while
+ staying idle. Passing this threshold will cause cluster to be auto-deleted.
+ A duration in seconds.
+ :type idle_delete_ttl: int
+ :param auto_delete_time: The time when cluster will be auto-deleted.
+ :type auto_delete_time: datetime
+ :param auto_delete_ttl: The life duration of cluster, the cluster will be
+ auto-deleted at the end of this duration.
+ A duration in seconds. (If auto_delete_time is set this parameter will be ignored)
+ :type auto_delete_ttl: int
"""
template_fields = ['cluster_name', 'project_id', 'zone', 'region']
@@ -135,6 +146,9 @@ class DataprocClusterCreateOperator(BaseOperator):
delegate_to=None,
service_account=None,
service_account_scopes=None,
+ idle_delete_ttl=None,
+ auto_delete_time=None,
+ auto_delete_ttl=None,
*args,
**kwargs):
@@ -163,6 +177,9 @@ class DataprocClusterCreateOperator(BaseOperator):
self.region = region
self.service_account = service_account
self.service_account_scopes = service_account_scopes
+ self.idle_delete_ttl = idle_delete_ttl
+ self.auto_delete_time = auto_delete_time
+ self.auto_delete_ttl = auto_delete_ttl
def _get_cluster_list_for_project(self, service):
result = service.projects().regions().clusters().list(
@@ -261,7 +278,8 @@ class DataprocClusterCreateOperator(BaseOperator):
}
},
'secondaryWorkerConfig': {},
- 'softwareConfig': {}
+ 'softwareConfig': {},
+ 'lifecycleConfig': {}
}
}
if self.num_preemptible_workers > 0:
@@ -294,6 +312,16 @@ class DataprocClusterCreateOperator(BaseOperator):
cluster_data['config']['softwareConfig']['imageVersion'] = self.image_version
if self.properties:
cluster_data['config']['softwareConfig']['properties'] = self.properties
+ if self.idle_delete_ttl:
+ cluster_data['config']['lifecycleConfig']['idleDeleteTtl'] = \
+ "{}s".format(self.idle_delete_ttl)
+ if self.auto_delete_time:
+ utc_auto_delete_time = timezone.convert_to_utc(self.auto_delete_time)
+ cluster_data['config']['lifecycleConfig']['autoDeleteTime'] = \
+ utc_auto_delete_time.format('%Y-%m-%dT%H:%M:%S.%fZ', formatter='classic')
+ elif self.auto_delete_ttl:
+ cluster_data['config']['lifecycleConfig']['autoDeleteTtl'] = \
+ "{}s".format(self.auto_delete_ttl)
if self.init_actions_uris:
init_actions_dict = [
{
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b02820a7/tests/contrib/operators/test_dataproc_operator.py
----------------------------------------------------------------------
diff --git a/tests/contrib/operators/test_dataproc_operator.py b/tests/contrib/operators/test_dataproc_operator.py
index e8cd1e5..d039cf1 100644
--- a/tests/contrib/operators/test_dataproc_operator.py
+++ b/tests/contrib/operators/test_dataproc_operator.py
@@ -7,9 +7,9 @@
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
-#
+#
# http://www.apache.org/licenses/LICENSE-2.0
-#
+#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -69,6 +69,9 @@ SERVICE_ACCOUNT_SCOPES = [
'https://www.googleapis.com/auth/bigquery',
'https://www.googleapis.com/auth/bigtable.data'
]
+IDLE_DELETE_TTL = 321
+AUTO_DELETE_TIME = datetime.datetime(2017, 6, 7)
+AUTO_DELETE_TTL = 654
DEFAULT_DATE = datetime.datetime(2017, 6, 6)
REGION = 'test-region'
MAIN_URI = 'test-uri'
@@ -102,8 +105,11 @@ class DataprocClusterCreateOperatorTest(unittest.TestCase):
worker_machine_type=WORKER_MACHINE_TYPE,
worker_disk_size=WORKER_DISK_SIZE,
num_preemptible_workers=NUM_PREEMPTIBLE_WORKERS,
- labels = deepcopy(labels),
- service_account_scopes = SERVICE_ACCOUNT_SCOPES
+ labels=deepcopy(labels),
+ service_account_scopes=SERVICE_ACCOUNT_SCOPES,
+ idle_delete_ttl=IDLE_DELETE_TTL,
+ auto_delete_time=AUTO_DELETE_TIME,
+ auto_delete_ttl=AUTO_DELETE_TTL
)
)
self.dag = DAG(
@@ -136,6 +142,9 @@ class DataprocClusterCreateOperatorTest(unittest.TestCase):
self.assertEqual(dataproc_operator.labels, self.labels[suffix])
self.assertEqual(dataproc_operator.service_account_scopes,
SERVICE_ACCOUNT_SCOPES)
+ self.assertEqual(dataproc_operator.idle_delete_ttl, IDLE_DELETE_TTL)
+ self.assertEqual(dataproc_operator.auto_delete_time, AUTO_DELETE_TIME)
+ self.assertEqual(dataproc_operator.auto_delete_ttl, AUTO_DELETE_TTL)
def test_get_init_action_timeout(self):
for suffix, dataproc_operator in enumerate(self.dataproc_operators):
@@ -160,6 +169,10 @@ class DataprocClusterCreateOperatorTest(unittest.TestCase):
NETWORK_URI)
self.assertEqual(cluster_data['config']['gceClusterConfig']['tags'],
TAGS)
+ self.assertEqual(cluster_data['config']['lifecycleConfig']['idleDeleteTtl'],
+ "321s")
+ self.assertEqual(cluster_data['config']['lifecycleConfig']['autoDeleteTime'],
+ "2017-06-07T00:00:00.000000Z")
# test whether the default airflow-version label has been properly
# set to the dataproc operator.
merged_labels = {}
@@ -169,6 +182,52 @@ class DataprocClusterCreateOperatorTest(unittest.TestCase):
cluster_data['labels']['airflow-version']))
self.assertEqual(cluster_data['labels'], merged_labels)
+ def test_build_cluster_data_with_autoDeleteTime(self):
+ dataproc_operator = DataprocClusterCreateOperator(
+ task_id=TASK_ID,
+ cluster_name=CLUSTER_NAME,
+ project_id=PROJECT_ID,
+ num_workers=NUM_WORKERS,
+ zone=ZONE,
+ dag=self.dag,
+ auto_delete_time=AUTO_DELETE_TIME,
+ )
+ cluster_data = dataproc_operator._build_cluster_data()
+ self.assertEqual(cluster_data['config']['lifecycleConfig']['autoDeleteTime'],
+ "2017-06-07T00:00:00.000000Z")
+
+ def test_build_cluster_data_with_autoDeleteTtl(self):
+ dataproc_operator = DataprocClusterCreateOperator(
+ task_id=TASK_ID,
+ cluster_name=CLUSTER_NAME,
+ project_id=PROJECT_ID,
+ num_workers=NUM_WORKERS,
+ zone=ZONE,
+ dag=self.dag,
+ auto_delete_ttl=AUTO_DELETE_TTL,
+ )
+ cluster_data = dataproc_operator._build_cluster_data()
+ self.assertEqual(cluster_data['config']['lifecycleConfig']['autoDeleteTtl'],
+ "654s")
+
+ def test_build_cluster_data_with_autoDeleteTime_and_autoDeleteTtl(self):
+ dataproc_operator = DataprocClusterCreateOperator(
+ task_id=TASK_ID,
+ cluster_name=CLUSTER_NAME,
+ project_id=PROJECT_ID,
+ num_workers=NUM_WORKERS,
+ zone=ZONE,
+ dag=self.dag,
+ auto_delete_time=AUTO_DELETE_TIME,
+ auto_delete_ttl=AUTO_DELETE_TTL,
+ )
+ cluster_data = dataproc_operator._build_cluster_data()
+ if 'autoDeleteTtl' in cluster_data['config']['lifecycleConfig']:
+ self.fail("If 'auto_delete_time' and 'auto_delete_ttl' is set, " +
+ "only `auto_delete_time` is used")
+ self.assertEqual(cluster_data['config']['lifecycleConfig']['autoDeleteTime'],
+ "2017-06-07T00:00:00.000000Z")
+
def test_cluster_name_log_no_sub(self):
with patch('airflow.contrib.operators.dataproc_operator.DataProcHook') as mock_hook:
mock_hook.return_value.get_conn = self.mock_conn