You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by xu...@apache.org on 2016/11/07 07:16:31 UTC

incubator-airflow git commit: [AIRFLOW-422] Add JSON endpoint for task info

Repository: incubator-airflow
Updated Branches:
  refs/heads/master fa977b60a -> 286c0478a


[AIRFLOW-422] Add JSON endpoint for task info

Expose task information through a JSON API endpoint so that
other tools can consume it.
The endpoint is registered under the experimental API
(URL prefix /api/experimental).

Dear Airflow Maintainers,

Please accept this PR that addresses the following
issues:
https://issues.apache.org/jira/browse/AIRFLOW-422

Testing Done:
- Added a simple unit test covering a successful task lookup and the
  404 responses for an unknown task and an unknown DAG.

Expose task information in a web endpoint for
usage in other tools.

Closes #1740 from saguziel/master


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/286c0478
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/286c0478
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/286c0478

Branch: refs/heads/master
Commit: 286c0478a6930596e4c4ac15bd7bb57f50c3fef4
Parents: fa977b6
Author: Alex Guziel <al...@airbnb.com>
Authored: Mon Nov 7 02:15:57 2016 -0500
Committer: Li Xuanji <xu...@gmail.com>
Committed: Mon Nov 7 02:15:57 2016 -0500

----------------------------------------------------------------------
 airflow/www/api/__init__.py               | 14 +++++
 airflow/www/api/experimental/__init__.py  | 14 +++++
 airflow/www/api/experimental/endpoints.py | 40 +++++++++++++
 airflow/www/app.py                        |  7 ++-
 tests/www/__init__.py                     | 14 +++++
 tests/www/api/__init__.py                 | 14 +++++
 tests/www/api/experimental/__init__.py    | 14 +++++
 tests/www/api/experimental/endpoints.py   | 83 ++++++++++++++++++++++++++
 8 files changed, 198 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/286c0478/airflow/www/api/__init__.py
----------------------------------------------------------------------
diff --git a/airflow/www/api/__init__.py b/airflow/www/api/__init__.py
new file mode 100644
index 0000000..759b563
--- /dev/null
+++ b/airflow/www/api/__init__.py
@@ -0,0 +1,14 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/286c0478/airflow/www/api/experimental/__init__.py
----------------------------------------------------------------------
diff --git a/airflow/www/api/experimental/__init__.py b/airflow/www/api/experimental/__init__.py
new file mode 100644
index 0000000..759b563
--- /dev/null
+++ b/airflow/www/api/experimental/__init__.py
@@ -0,0 +1,14 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/286c0478/airflow/www/api/experimental/endpoints.py
----------------------------------------------------------------------
diff --git a/airflow/www/api/experimental/endpoints.py b/airflow/www/api/experimental/endpoints.py
new file mode 100644
index 0000000..d70e67b
--- /dev/null
+++ b/airflow/www/api/experimental/endpoints.py
@@ -0,0 +1,40 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from airflow.www.views import dagbag
+
+from flask import Blueprint, jsonify
+
+api_experimental = Blueprint('api_experimental', __name__)
+
+
+@api_experimental.route('/dags/<string:dag_id>/tasks/<string:task_id>', methods=['GET'])
+def task_info(dag_id, task_id):
+    """Returns a JSON with a task's public instance variables. """
+    if dag_id not in dagbag.dags:
+        response = jsonify({'error': 'Dag {} not found'.format(dag_id)})
+        response.status_code = 404
+        return response
+
+    dag = dagbag.dags[dag_id]
+    if not dag.has_task(task_id):
+        response = (jsonify({'error': 'Task {} not found in dag {}'
+                    .format(task_id, dag_id)}))
+        response.status_code = 404
+        return response
+
+    task = dag.get_task(task_id)
+    fields = {k: str(v) for k, v in vars(task).items() if not k.startswith('_')}
+    return jsonify(fields)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/286c0478/airflow/www/app.py
----------------------------------------------------------------------
diff --git a/airflow/www/app.py b/airflow/www/app.py
index 6ab4ffd..43c6314 100644
--- a/airflow/www/app.py
+++ b/airflow/www/app.py
@@ -43,11 +43,11 @@ def create_app(config=None):
     airflow.load_login()
     airflow.login.login_manager.init_app(app)
 
+    app.register_blueprint(routes)
+
     cache = Cache(
         app=app, config={'CACHE_TYPE': 'filesystem', 'CACHE_DIR': '/tmp'})
 
-    app.register_blueprint(routes)
-
     log_format = airflow.settings.LOG_FORMAT_WITH_PID
     airflow.settings.configure_logging(log_format=log_format)
 
@@ -123,6 +123,9 @@ def create_app(config=None):
 
         integrate_plugins()
 
+        from airflow.www.api.experimental.endpoints import api_experimental
+        app.register_blueprint(api_experimental, url_prefix='/api/experimental')
+
         @app.context_processor
         def jinja_globals():
             return {

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/286c0478/tests/www/__init__.py
----------------------------------------------------------------------
diff --git a/tests/www/__init__.py b/tests/www/__init__.py
new file mode 100644
index 0000000..c82f579
--- /dev/null
+++ b/tests/www/__init__.py
@@ -0,0 +1,14 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/286c0478/tests/www/api/__init__.py
----------------------------------------------------------------------
diff --git a/tests/www/api/__init__.py b/tests/www/api/__init__.py
new file mode 100644
index 0000000..c82f579
--- /dev/null
+++ b/tests/www/api/__init__.py
@@ -0,0 +1,14 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/286c0478/tests/www/api/experimental/__init__.py
----------------------------------------------------------------------
diff --git a/tests/www/api/experimental/__init__.py b/tests/www/api/experimental/__init__.py
new file mode 100644
index 0000000..c82f579
--- /dev/null
+++ b/tests/www/api/experimental/__init__.py
@@ -0,0 +1,14 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/286c0478/tests/www/api/experimental/endpoints.py
----------------------------------------------------------------------
diff --git a/tests/www/api/experimental/endpoints.py b/tests/www/api/experimental/endpoints.py
new file mode 100644
index 0000000..32da137
--- /dev/null
+++ b/tests/www/api/experimental/endpoints.py
@@ -0,0 +1,83 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from __future__ import print_function
+
+import os
+import unittest
+from datetime import datetime, time, timedelta
+
+from airflow import configuration
+
+configuration.load_test_config()
+from airflow import models, settings
+from airflow.www import app as application
+from airflow.settings import Session
+
+NUM_EXAMPLE_DAGS = 16
+DEV_NULL = '/dev/null'
+TEST_DAG_FOLDER = os.path.join(
+    os.path.dirname(os.path.realpath(__file__)), 'dags')
+DEFAULT_DATE = datetime(2015, 1, 1)
+DEFAULT_DATE_ISO = DEFAULT_DATE.isoformat()
+DEFAULT_DATE_DS = DEFAULT_DATE_ISO[:10]
+TEST_DAG_ID = 'unit_tests'
+
+
+try:
+    import cPickle as pickle
+except ImportError:
+    # Python 3
+    import pickle
+
+
+def reset(dag_id=TEST_DAG_ID):
+    session = Session()
+    tis = session.query(models.TaskInstance).filter_by(dag_id=dag_id)
+    tis.delete()
+    session.commit()
+    session.close()
+
+
+class ApiExperimentalTests(unittest.TestCase):
+    def setUp(self):
+        reset()
+        configuration.load_test_config()
+        app = application.create_app()
+        app.config['TESTING'] = True
+        self.app = app.test_client()
+
+        self.dagbag = models.DagBag(
+            dag_folder=DEV_NULL, include_examples=True)
+        self.dag_bash = self.dagbag.dags['example_bash_operator']
+        self.runme_0 = self.dag_bash.get_task('runme_0')
+
+    def test_task_info(self):
+        url_template = '/api/experimental/dags/{}/tasks/{}'
+
+        response = self.app.get(url_template.format('example_bash_operator', 'runme_0'))
+        assert '"email"' in response.data.decode('utf-8')
+        assert 'error' not in response.data.decode('utf-8')
+        self.assertEqual(200, response.status_code)
+       
+        response = self.app.get(url_template.format('example_bash_operator', 'DNE'))
+        assert 'error' in response.data.decode('utf-8')
+        self.assertEqual(404, response.status_code)
+
+        response = self.app.get(url_template.format('DNE', 'DNE'))
+        assert 'error' in response.data.decode('utf-8')
+        self.assertEqual(404, response.status_code)
+
+    def tearDown(self):
+        self.dag_bash.clear(start_date=DEFAULT_DATE, end_date=datetime.now())