You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ka...@apache.org on 2020/06/22 18:30:46 UTC

[airflow] branch v1-10-test updated (d65053c -> 92a1040)

This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a change to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git.


    from d65053c  [AIRFLOW-4357] Fix SVG tooltip positioning with custom scripting (#8269)
     new b4f7360  Use existing DagBag for 'dag_details' & `trigger` Endpoints (#8501)
     new 8405787  Make hive macros py3 compatible (#8598)
     new 3127b0a  Enhanced documentation around Cluster Policy (#8661)
     new 731ee51  Improve tutorial - Include all imports statements (#8670)
     new 92a1040  Fix docs on creating CustomOperator (#8678)

The 5 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 airflow/example_dags/tutorial.py |  2 +-
 airflow/hooks/hive_hooks.py      |  3 ++-
 airflow/www_rbac/views.py        |  4 +---
 docs/concepts.rst                |  7 +++++++
 docs/howto/custom-operator.rst   |  6 +++---
 tests/hooks/test_hive_hook.py    | 15 ++++++++++++---
 tests/www_rbac/test_views.py     | 25 +++++++++++++++++++++++++
 7 files changed, 51 insertions(+), 11 deletions(-)


[airflow] 05/05: Fix docs on creating CustomOperator (#8678)

Posted by ka...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 92a1040ce152118997fe75231acc4069e3668b48
Author: Jonny Fuller <mr...@gmail.com>
AuthorDate: Sat May 9 20:33:45 2020 -0400

    Fix docs on creating CustomOperator (#8678)
    
    (cherry picked from commit 5e1c33a1baf0725eeb695a96b29ddd9585df51e4)
---
 docs/howto/custom-operator.rst | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/docs/howto/custom-operator.rst b/docs/howto/custom-operator.rst
index 7713468..a9733d2 100644
--- a/docs/howto/custom-operator.rst
+++ b/docs/howto/custom-operator.rst
@@ -148,7 +148,7 @@ the operator.
                 self.name = name
 
             def execute(self, context):
-                message = "Hello from {}".format(name)
+                message = "Hello from {}".format(self.name)
                 print(message)
                 return message
 
@@ -157,9 +157,9 @@ You can use the template as follows:
 .. code:: python
 
         with dag:
-            hello_task = HelloOperator(task_id='task_id_1', dag=dag, name='{{ task_id }}')
+            hello_task = HelloOperator(task_id='task_id_1', dag=dag, name='{{ task_instance.task_id }}')
 
-In this example, Jinja looks for the ``name`` parameter and substitutes ``{{ task_id }}`` with
+In this example, Jinja looks for the ``name`` parameter and substitutes ``{{ task_instance.task_id }}`` with
 ``task_id_1``.
 
 


[airflow] 04/05: Improve tutorial - Include all imports statements (#8670)

Posted by ka...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 731ee51ae1ba6a8ab51f3ac861ab2b9b73bcb094
Author: Lyalpha <Ly...@users.noreply.github.com>
AuthorDate: Sun May 3 17:25:39 2020 +0100

    Improve tutorial - Include all imports statements (#8670)
    
    (cherry picked from commit 62796b9e0154daf38de72ebca36e3175001fbf03)
---
 airflow/example_dags/tutorial.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/airflow/example_dags/tutorial.py b/airflow/example_dags/tutorial.py
index 18685d1..8858452 100644
--- a/airflow/example_dags/tutorial.py
+++ b/airflow/example_dags/tutorial.py
@@ -23,9 +23,9 @@ Documentation that goes along with the Airflow tutorial located
 [here](https://airflow.apache.org/tutorial.html)
 """
 # [START tutorial]
+# [START import_module]
 from datetime import timedelta
 
-# [START import_module]
 # The DAG object; we'll need this to instantiate a DAG
 from airflow import DAG
 # Operators; we need this to operate!


[airflow] 02/05: Make hive macros py3 compatible (#8598)

Posted by ka...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 8405787b6915d67be09ed3a0b18bacb2d130d03e
Author: Ace Haidrey <ah...@pandora.com>
AuthorDate: Tue Jun 16 13:19:41 2020 -0700

    Make hive macros py3 compatible (#8598)
    
    Co-authored-by: Ace Haidrey <ah...@pinterest.com>
    
    (cherry picked from commit c78e2a5)
---
 airflow/hooks/hive_hooks.py   |  3 ++-
 tests/hooks/test_hive_hook.py | 15 ++++++++++++---
 2 files changed, 14 insertions(+), 4 deletions(-)

diff --git a/airflow/hooks/hive_hooks.py b/airflow/hooks/hive_hooks.py
index 4e41834..ec37dfb 100644
--- a/airflow/hooks/hive_hooks.py
+++ b/airflow/hooks/hive_hooks.py
@@ -691,6 +691,7 @@ class HiveMetastoreHook(BaseHook):
                            pairs will be considered as candidates of max partition.
         :type filter_map: map
         :return: Max partition or None if part_specs is empty.
+        :rtype: basestring
         """
         if not part_specs:
             return None
@@ -714,7 +715,7 @@ class HiveMetastoreHook(BaseHook):
         if not candidates:
             return None
         else:
-            return max(candidates).encode('utf-8')
+            return max(candidates)
 
     def max_partition(self, schema, table_name, field=None, filter_map=None):
         """
diff --git a/tests/hooks/test_hive_hook.py b/tests/hooks/test_hive_hook.py
index 98c024f..011038a 100644
--- a/tests/hooks/test_hive_hook.py
+++ b/tests/hooks/test_hive_hook.py
@@ -362,7 +362,7 @@ class TestHiveMetastoreHook(TestHiveEnvironment):
                 None)
 
         # No partition will be filtered out.
-        self.assertEqual(max_partition, b'value3')
+        self.assertEqual(max_partition, 'value3')
 
     def test_get_max_partition_from_valid_part_specs(self):
         max_partition = \
@@ -371,7 +371,16 @@ class TestHiveMetastoreHook(TestHiveEnvironment):
                  {'key1': 'value3', 'key2': 'value4'}],
                 'key1',
                 self.VALID_FILTER_MAP)
-        self.assertEqual(max_partition, b'value1')
+        self.assertEqual(max_partition, 'value1')
+
+    def test_get_max_partition_from_valid_part_specs_return_type(self):
+        max_partition = \
+            HiveMetastoreHook._get_max_partition_from_part_specs(
+                [{'key1': 'value1', 'key2': 'value2'},
+                 {'key1': 'value3', 'key2': 'value4'}],
+                'key1',
+                self.VALID_FILTER_MAP)
+        self.assertIsInstance(max_partition, str)
 
     @patch("airflow.hooks.hive_hooks.HiveMetastoreHook.get_connection",
            return_value=[Connection(host="localhost", port="9802")])
@@ -522,7 +531,7 @@ class TestHiveMetastoreHook(TestHiveEnvironment):
                                             table_name=self.table,
                                             field=self.partition_by,
                                             filter_map=filter_map)
-        self.assertEqual(partition, DEFAULT_DATE_DS.encode('utf-8'))
+        self.assertEqual(partition, DEFAULT_DATE_DS)
 
         metastore.get_table.assert_called_with(
             dbname=self.database, tbl_name=self.table)


[airflow] 03/05: Enhanced documentation around Cluster Policy (#8661)

Posted by ka...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 3127b0a071c66ec0d53adaef055c9cf4bdcd1fe4
Author: Vardan Gupta <va...@gmail.com>
AuthorDate: Sat May 2 01:49:53 2020 +0530

    Enhanced documentation around Cluster Policy (#8661)
    
    (cherry picked from commit 6560f29fa206fe1fcc99d0ee4093d678caf74511)
---
 docs/concepts.rst | 7 +++++++
 1 file changed, 7 insertions(+)

diff --git a/docs/concepts.rst b/docs/concepts.rst
index 068321e..89479d4 100644
--- a/docs/concepts.rst
+++ b/docs/concepts.rst
@@ -1017,6 +1017,13 @@ may look like inside your ``airflow_local_settings.py``:
         if task.timeout > timedelta(hours=48):
             task.timeout = timedelta(hours=48)
 
+To define a policy, add an ``airflow_local_settings`` module to your PYTHONPATH
+or to the AIRFLOW_HOME/config folder that defines this ``policy`` function. It receives a ``TaskInstance``
+object and can alter it where needed.
+
+Please note that cluster policy currently applies to tasks only, though you can access the DAG via the ``task.dag`` property.
+Also, cluster policy takes precedence over task attributes defined in the DAG,
+meaning that if ``task.sla`` is defined in the DAG and also mutated via cluster policy, the latter will take precedence.
 
 Documentation & Notes
 =====================


[airflow] 01/05: Use existing DagBag for 'dag_details' & `trigger` Endpoints (#8501)

Posted by ka...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit b4f7360eb4a0c10caa391f9921fc94570605e245
Author: Kaxil Naik <ka...@gmail.com>
AuthorDate: Tue Apr 21 22:24:58 2020 +0100

    Use existing DagBag for 'dag_details' & `trigger` Endpoints (#8501)
---
 airflow/www_rbac/views.py    |  4 +---
 tests/www_rbac/test_views.py | 25 +++++++++++++++++++++++++
 2 files changed, 26 insertions(+), 3 deletions(-)

diff --git a/airflow/www_rbac/views.py b/airflow/www_rbac/views.py
index bf33f2b..e78b61b 100644
--- a/airflow/www_rbac/views.py
+++ b/airflow/www_rbac/views.py
@@ -562,9 +562,7 @@ class Airflow(AirflowBaseView):
     @provide_session
     def dag_details(self, session=None):
         dag_id = request.args.get('dag_id')
-        dag_orm = DagModel.get_dagmodel(dag_id, session=session)
-        # FIXME: items needed for this view should move to the database
-        dag = dag_orm.get_dag(STORE_SERIALIZED_DAGS)
+        dag = dagbag.get_dag(dag_id)
         title = "DAG details"
         root = request.args.get('root', '')
 
diff --git a/tests/www_rbac/test_views.py b/tests/www_rbac/test_views.py
index b705cad..68a605a 100644
--- a/tests/www_rbac/test_views.py
+++ b/tests/www_rbac/test_views.py
@@ -565,6 +565,19 @@ class TestAirflowBaseViews(TestBase):
         resp = self.client.get(url, follow_redirects=True)
         self.check_content_in_response('DAG details', resp)
 
+    @parameterized.expand(["graph", "tree", "dag_details"])
+    @mock.patch('airflow.www_rbac.views.dagbag.get_dag')
+    def test_view_uses_existing_dagbag(self, endpoint, mock_get_dag):
+        """
+        Test that Graph, Tree & Dag Details View uses the DagBag already created in views.py
+        instead of creating a new one.
+        """
+        mock_get_dag.return_value = DAG(dag_id='example_bash_operator')
+        url = '{}?dag_id=example_bash_operator'.format(endpoint)
+        resp = self.client.get(url, follow_redirects=True)
+        mock_get_dag.assert_called_once_with('example_bash_operator')
+        self.check_content_in_response('example_bash_operator', resp)
+
     def test_dag_details_trigger_origin_tree_view(self):
         dag = self.dagbag.dags['test_tree_view']
         dag.create_dagrun(
@@ -2207,6 +2220,18 @@ class TestTriggerDag(TestBase):
         self.check_content_in_response(
             'Triggered example_bash_operator, it should start any moment now.', response)
 
+    @mock.patch('airflow.www_rbac.views.dagbag.get_dag')
+    def test_trigger_endpoint_uses_existing_dagbag(self, mock_get_dag):
+        """
+        Test that Trigger Endpoint uses the DagBag already created in views.py
+        instead of creating a new one.
+        """
+        mock_get_dag.return_value = DAG(dag_id='example_bash_operator')
+        url = 'trigger?dag_id=example_bash_operator'
+        resp = self.client.post(url, data={}, follow_redirects=True)
+        mock_get_dag.assert_called_once_with('example_bash_operator')
+        self.check_content_in_response('example_bash_operator', resp)
+
 
 class TestExtraLinks(TestBase):
     def setUp(self):