You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this copy.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2018/12/17 23:52:50 UTC

[GitHub] stale[bot] closed pull request #2657: [AIRFLOW-161] New redirect route and extra links

stale[bot] closed pull request #2657: [AIRFLOW-161] New redirect route and extra links
URL: https://github.com/apache/incubator-airflow/pull/2657
 
 
   

This is a PR from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg
index 1dfb079b49..4a80053502 100644
--- a/airflow/config_templates/default_airflow.cfg
+++ b/airflow/config_templates/default_airflow.cfg
@@ -251,6 +251,9 @@ hide_paused_dags_by_default = False
 # Consistent page size across all listing views in the UI
 page_size = 100
 
+# List of domains that operators' extra links are allowed to redirect to
+whitelisted_domains = []
+
 [email]
 email_backend = airflow.utils.email.send_email_smtp
 
diff --git a/airflow/config_templates/default_test.cfg b/airflow/config_templates/default_test.cfg
index b065313c17..c3b6d2e717 100644
--- a/airflow/config_templates/default_test.cfg
+++ b/airflow/config_templates/default_test.cfg
@@ -59,6 +59,7 @@ dag_default_view = tree
 log_fetch_timeout_sec = 5
 hide_paused_dags_by_default = False
 page_size = 100
+whitelisted_domains = []
 
 [email]
 email_backend = airflow.utils.email.send_email_smtp
diff --git a/airflow/contrib/hooks/qubole_hook.py b/airflow/contrib/hooks/qubole_hook.py
index f3bcc202ed..712d782eb9 100755
--- a/airflow/contrib/hooks/qubole_hook.py
+++ b/airflow/contrib/hooks/qubole_hook.py
@@ -17,12 +17,15 @@
 import time
 import datetime
 import six
+import re
 
+from airflow import settings
 from airflow.exceptions import AirflowException
 from airflow.hooks.base_hook import BaseHook
 from airflow import configuration
 from airflow.utils.log.logging_mixin import LoggingMixin
 from airflow.utils.state import State
+from airflow.models import TaskInstance
 
 from qds_sdk.qubole import Qubole
 from qds_sdk.commands import Command, HiveCommand, PrestoCommand, HadoopCommand, \
@@ -175,6 +178,42 @@ def get_jobs_id(self, ti):
             cmd_id = ti.xcom_pull(key="qbol_cmd_id", task_ids=self.task_id)
         Command.get_jobs_id(self.cls, cmd_id)
 
+    def get_redirect_url(self, task, dttm):
+        """
+        Return the QDS analyze-page URL for the command run by ``task``.
+
+        The command id is read from the ``qbol_cmd_id`` XCom of ``task`` at
+        ``dttm``. The host is taken from the task's Qubole connection, with
+        a trailing ``api`` (optionally followed by ``/``) replaced by
+        ``v2/analyze?command_id=``; without a connection host
+        https://api.qubole.com is used.
+
+        :param task: operator whose QDS command page is wanted
+        :param dttm: execution date of the task instance
+        :return: the URL, or ``''`` if it cannot be determined
+        """
+        import logging
+
+        url = ''
+        try:
+            conn = BaseHook.get_connection(task.kwargs['qubole_conn_id'])
+            if conn and conn.host:
+                # Accept both ".../api" and ".../api/" connection hosts.
+                host = re.sub(r'api/?$', 'v2/analyze?command_id=', conn.host)
+            else:
+                host = 'https://api.qubole.com/v2/analyze?command_id='
+
+            ti = TaskInstance(task=task, execution_date=dttm)
+            qds_command_id = ti.xcom_pull(task_ids=task.task_id, key='qbol_cmd_id')
+
+            url = host + str(qds_command_id) if qds_command_id else ''
+        except Exception as e:
+            # Best effort: the UI link is optional, so log and return ''.
+            logging.getLogger(__name__).warning(
+                'Could not find the url to redirect. Error: %s', e)
+
+        return url
+
     def create_cmd_args(self, context):
         args = []
         cmd_type = self.kwargs['command_type']
diff --git a/airflow/contrib/operators/qubole_operator.py b/airflow/contrib/operators/qubole_operator.py
index a5e9f5ed63..5d99911fcf 100755
--- a/airflow/contrib/operators/qubole_operator.py
+++ b/airflow/contrib/operators/qubole_operator.py
@@ -122,6 +122,7 @@ class QuboleOperator(BaseOperator):
     template_ext = ('.txt',)
     ui_color = '#3064A1'
     ui_fgcolor = '#fff'
+    extra_links = ['Go to QDS']
 
     @apply_defaults
     def __init__(self, qubole_conn_id="qubole_default", *args, **kwargs):
@@ -155,6 +156,16 @@ def get_hook(self):
         # Reinitiating the hook, as some template fields might have changed
         return QuboleHook(*self.args, **self.kwargs)
 
+    def get_redirect_url(self, dttm, redirect_to):
+        """
+        Return the URL behind this operator's "Go to QDS" extra link.
+
+        Delegates to ``QuboleHook.get_redirect_url``, which builds the QDS
+        command page URL for this task at ``dttm`` (``''`` if unknown).
+        ``redirect_to`` is ignored because only one link is offered.
+        """
+        return self.get_hook().get_redirect_url(self, dttm)
+
     def __getattribute__(self, name):
         if name in QuboleOperator.template_fields:
             if name in self.kwargs:
diff --git a/airflow/models.py b/airflow/models.py
index 5837363bd9..a419d12918 100755
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -2094,6 +2094,8 @@ class derived from this one results in the creation of a task object,
     template_fields = []
     # Defines which files extensions to look for in the templated fields
     template_ext = []
+    # Defines the extra buttons to display in the task instance model view
+    extra_links = []
     # Defines the color in the UI
     ui_color = '#fff'
     ui_fgcolor = '#000'
@@ -2747,6 +2749,22 @@ def xcom_pull(
             dag_id=dag_id,
             include_prior_dates=include_prior_dates)
 
+    def get_redirect_url(self, dttm, redirect_to):
+        """
+        Return the URL for the extra link ``redirect_to`` of this task's
+        instance at ``dttm``.
+
+        Operators that declare names in ``extra_links`` override this. The
+        base implementation offers no link and returns None, which the
+        ``/redirect`` web view reports as a link that is not available.
+
+        :param dttm: execution date of the task instance
+        :param redirect_to: requested link name (the UI sends one of
+            ``extra_links``)
+        :return: the target URL, or None when no link is available
+        """
+        return None
+

 class DagModel(Base):
 
diff --git a/airflow/www/static/gantt-chart-d3v2.js b/airflow/www/static/gantt-chart-d3v2.js
index d21311a1c5..f3b400d621 100644
--- a/airflow/www/static/gantt-chart-d3v2.js
+++ b/airflow/www/static/gantt-chart-d3v2.js
@@ -126,7 +126,7 @@ d3.gantt = function() {
     .on('mouseover', tip.show)
     .on('mouseout', tip.hide)
     .on('click', function(d) {
-      call_modal(d.taskName, d.executionDate);
+      call_modal(d.taskName, d.executionDate, d.taskType, d.extraLinks);
     })
     .attr("class", function(d){
       if(taskStatus[d.status] == null){ return "bar";}
diff --git a/airflow/www/templates/airflow/dag.html b/airflow/www/templates/airflow/dag.html
index d5a145dfb0..c04e5790b2 100644
--- a/airflow/www/templates/airflow/dag.html
+++ b/airflow/www/templates/airflow/dag.html
@@ -207,6 +207,9 @@ <h4 class="modal-title" id="myModalLabel">
               Downstream
             </button>
           </span>
+          <hr/>
+          <span class="btn-group" id="extra_links">
+          </span>
         </div>
         <div class="modal-footer">
           <button type="button" class="btn btn-default" data-dismiss="modal">
@@ -274,24 +277,54 @@ <h4 class="modal-title" id="dagModalLabel">
     var task_id = '';
     var exection_date = '';
     var subdag_id = '';
-    function call_modal(t, d, sd) {
+
+    // Open the task-instance modal for task `t` at execution date `d`.
+    // The SubDAG button is only shown for SubDagOperator tasks, and each
+    // name in `extra_links` becomes a button pointing at the redirect view.
+    function call_modal(t, d, task_type, extra_links) {
       task_id = t;
       loc = String(window.location);
       $("#btn_filter").on("click", function(){
         window.location = updateQueryStringParameter(loc, "root", task_id);
       });
-      subdag_id = sd;
       execution_date = d;
       $('#task_id').html(t);
       $('#execution_date').html(d);
       $('#myModal').modal({});
-      $("#myModal").css("margin-top","0px")
-        if (subdag_id===undefined)
-            $("#div_btn_subdag").hide();
-        else {
-            $("#div_btn_subdag").show();
-            subdag_id = "{{ dag.dag_id }}."+t;
-        }
+      $("#myModal").css("margin-top","0px");
+      $('#extra_links').prev('hr').hide();
+      $('#extra_links').empty().hide();
+      $("#div_btn_subdag").hide();
+      subdag_id = '';
+
+      if (task_type === "SubDagOperator"){
+        $("#div_btn_subdag").show();
+        subdag_id = "{{ dag.dag_id }}."+t;
+      }
+
+      if (extra_links && extra_links.length > 0){
+        $.each(extra_links, function(i, link){
+          var url = "{{ url_for('airflow.redirect') }}" +
+                  "?task_id=" + encodeURIComponent(task_id) +
+                  "&dag_id=" + encodeURIComponent(dag_id) +
+                  "&execution_date=" + encodeURIComponent(execution_date) +
+                  "&redirect_to=" + encodeURIComponent(link);
+
+          // Create the button with jQuery so the link name is set as
+          // text and is never parsed as HTML.
+          $('<a/>', {
+            'href': url,
+            'class': 'btn btn-primary',
+            'target': '_blank',
+            'rel': 'noopener noreferrer',
+            'text': link
+          }).appendTo('#extra_links');
+        });
+
+        $('#extra_links').prev('hr').show();
+        $('#extra_links').show();
+      }
+
     }
 
     function call_modal_dag(dag) {
diff --git a/airflow/www/templates/airflow/graph.html b/airflow/www/templates/airflow/graph.html
index 24fc508027..42f6330aca 100644
--- a/airflow/www/templates/airflow/graph.html
+++ b/airflow/www/templates/airflow/graph.html
@@ -124,10 +124,7 @@
 
     d3.selectAll("g.node").on("click", function(d){
         task = tasks[d];
-        if (task.task_type == "SubDagOperator")
-            call_modal(d, execution_date, true);
-        else
-            call_modal(d, execution_date);
+        call_modal(d, execution_date, task.task_type, task.extra_links);
     });
 
 
diff --git a/airflow/www/templates/airflow/tree.html b/airflow/www/templates/airflow/tree.html
index f20127ceb3..cf6bcbbc83 100644
--- a/airflow/www/templates/airflow/tree.html
+++ b/airflow/www/templates/airflow/tree.html
@@ -232,10 +232,8 @@
       .on("click", function(d){
         if(d.task_id === undefined)
             call_modal_dag(d);
-        else if(nodeobj[d.task_id].operator=='SubDagOperator')
-            call_modal(d.task_id, d.execution_date, true);
         else
-            call_modal(d.task_id, d.execution_date);
+            call_modal(d.task_id, d.execution_date, nodeobj[d.task_id].operator, nodeobj[d.task_id].extra_links);
       })
       .attr("class", function(d) {return "state " + d.state})
       .attr("data-toggle", "tooltip")
diff --git a/airflow/www/views.py b/airflow/www/views.py
index 6bcb66d538..5250df4758 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -62,6 +62,11 @@
 from pygments import highlight, lexers
 from pygments.formatters import HtmlFormatter
 
+try:
+    from urllib.parse import urlparse
+except ImportError:
+    from urlparse import urlparse
+
 import airflow
 from airflow import configuration as conf
 from airflow import models
@@ -1250,6 +1255,7 @@ def set_duration(tid):
                 'end_date': task.end_date,
                 'depends_on_past': task.depends_on_past,
                 'ui_color': task.ui_color,
+                'extra_links': task.extra_links,
             }
 
         data = {
@@ -1360,6 +1366,7 @@ class GraphForm(Form):
             t.task_id: {
                 'dag_id': t.dag_id,
                 'task_type': t.task_type,
+                'extra_links': t.extra_links,
             }
             for t in dag.tasks}
         if not tasks:
@@ -1708,6 +1715,12 @@ def gantt(self, session=None):
             if ti.start_date]
         tis = sorted(tis, key=lambda ti: ti.start_date)
 
+        task_types = {}
+        extra_links = {}
+        for t in dag.tasks:
+            task_types[t.task_id] = t.task_type
+            extra_links[t.task_id] = t.extra_links
+
         tasks = []
         for ti in tis:
             end_date = ti.end_date if ti.end_date else timezone.utcnow()
@@ -1717,11 +1730,15 @@ def gantt(self, session=None):
                 'isoStart': ti.start_date.isoformat()[:-4],
                 'isoEnd': end_date.isoformat()[:-4],
                 'taskName': ti.task_id,
+                'taskType': task_types[ti.task_id],
                 'duration': "{}".format(end_date - ti.start_date)[:-4],
                 'status': ti.state,
                 'executionDate': ti.execution_date.isoformat(),
+                'extraLinks': extra_links[ti.task_id],
             })
         states = {ti.state: ti.state for ti in tis}
+
+
         data = {
             'taskNames': [ti.task_id for ti in tis],
             'tasks': tasks,
@@ -1742,6 +1759,77 @@ def gantt(self, session=None):
             root=root,
         )
 
+    @expose('/redirect')
+    @login_required
+    @wwwutils.action_logging
+    def redirect(self):
+        """
+        Send the user to an operator-provided external link for a task.
+
+        Query args: ``dag_id``, ``task_id``, ``execution_date`` and
+        ``redirect_to`` (one of the task's ``extra_links``). The URL comes
+        from ``task.get_redirect_url(dttm, redirect_to)`` and is followed
+        only if its host is listed in ``[webserver] whitelisted_domains``,
+        as ``scheme://host`` or bare ``host`` (comma separated, optionally
+        wrapped in ``[]``). Otherwise a flash message is shown and the user
+        is sent back to the referring page.
+        """
+        dag_id = request.args.get('dag_id')
+        task_id = request.args.get('task_id')
+        execution_date = request.args.get('execution_date')
+        redirect_to = request.args.get('redirect_to')
+        fallback = request.referrer or '/admin/'
+
+        dag = dagbag.get_dag(dag_id)
+        if not dag or task_id not in dag.task_ids:
+            flash(
+                "Task [{}.{}] doesn't seem to exist"
+                " at the moment".format(dag_id, task_id),
+                "error")
+            return redirect(fallback)
+
+        task = dag.get_task(task_id)
+        # Only the link names the operator declares may be requested.
+        if redirect_to not in task.extra_links:
+            flash("Unknown link {} for [{}.{}]".format(
+                redirect_to, dag_id, task_id), "error")
+            return redirect(fallback)
+
+        try:
+            dttm = airflow.utils.timezone.parse(execution_date)
+        except (TypeError, ValueError):
+            flash("Invalid execution_date {}".format(execution_date), "error")
+            return redirect(fallback)
+
+        url = task.get_redirect_url(dttm, redirect_to)
+        if not url:
+            flash(
+                "Couldn't redirect to {} for [{}.{}]"
+                " at the moment".format(redirect_to, dag_id, task_id),
+                "error")
+            return redirect(fallback)
+
+        # The setting is a plain string such as "[]" or
+        # "https://a.example.com, b.example.com". Compare whole entries;
+        # a substring test on the raw string would accept partial matches.
+        raw_domains = conf.get('webserver', 'whitelisted_domains') or ''
+        allowed_domains = set(
+            entry.strip().strip('\'"').rstrip('/').lower()
+            for entry in raw_domains.strip().strip('[]').split(','))
+        allowed_domains.discard('')
+
+        parsed_uri = urlparse(url)
+        domain = '{uri.scheme}://{uri.netloc}/'.format(uri=parsed_uri)
+        netloc = parsed_uri.netloc.lower()
+        if netloc and (domain.rstrip('/').lower() in allowed_domains or
+                       netloc in allowed_domains):
+            return redirect(url)
+
+        flash("Couldn't redirect to {}, as the domain {} is "
+              "not whitelisted".format(url, domain), "error")
+        return redirect(fallback)
+
+
     @expose('/object/task_instances')
     @login_required
     @wwwutils.action_logging
diff --git a/tests/contrib/operators/test_qubole_operator.py b/tests/contrib/operators/test_qubole_operator.py
index 9b5bf3a7f4..1f7bb6ff15 100644
--- a/tests/contrib/operators/test_qubole_operator.py
+++ b/tests/contrib/operators/test_qubole_operator.py
@@ -14,9 +14,10 @@
 #
 
 import unittest
-from datetime import datetime
+from airflow.utils.timezone import datetime
 
-from airflow.models import DAG, Connection
+from airflow import settings
+from airflow.models import DAG, Connection, TaskInstance
 from airflow.utils import db
 
 from airflow.contrib.hooks.qubole_hook import QuboleHook
@@ -30,10 +31,11 @@
     except ImportError:
         mock = None
 
-DAG_ID="qubole_test_dag"
-TASK_ID="test_task"
-DEFAULT_CONN="qubole_default"
+DAG_ID = "qubole_test_dag"
+TASK_ID = "test_task"
+DEFAULT_CONN = "qubole_default"
 TEMPLATE_CONN = "my_conn_id"
+TEST_CONN = "qubole_test_conn"
 DEFAULT_DATE = datetime(2017, 1, 1)
 
 
@@ -41,6 +43,19 @@ class QuboleOperatorTest(unittest.TestCase):
     def setUp(self):
         db.merge_conn(
             Connection(conn_id=DEFAULT_CONN, conn_type='HTTP'))
+        db.merge_conn(
+            Connection(conn_id=TEST_CONN, conn_type='HTTP',
+                       host='http://localhost/api'))
+
+    def tearDown(self):
+        # Remove the connection added in setUp; always release the session.
+        session = settings.Session()
+        try:
+            session.query(Connection).filter(
+                Connection.conn_id == TEST_CONN).delete()
+            session.commit()
+        finally:
+            session.close()
 
     def test_init_with_default_connection(self):
         op = QuboleOperator(task_id=TASK_ID)
@@ -104,4 +116,23 @@ def test_position_args_parameters(self):
         self.assertEqual(task.get_hook().create_cmd_args({'run_id': 'dummy'})[5],
                          "s3n://airflow/destination_hadoopcmd")
 
+    def test_get_redirect_url(self):
+        dag = DAG(DAG_ID, start_date=DEFAULT_DATE)
 
+        with dag:
+            task = QuboleOperator(task_id=TASK_ID,
+                                  qubole_conn_id=TEST_CONN,
+                                  command_type='shellcmd',
+                                  parameters="param1 param2",
+                                  dag=dag)
+
+        ti = TaskInstance(task=task, execution_date=DEFAULT_DATE)
+        ti.xcom_push('qbol_cmd_id', 12345)
+
+        # the pushed command id is appended to the connection's analyze URL
+        url = task.get_redirect_url(DEFAULT_DATE, 'Go to QDS')
+        self.assertEqual(url, 'http://localhost/v2/analyze?command_id=12345')
+
+        # no XCom was pushed for this date, so no URL is available
+        url2 = task.get_redirect_url(datetime(2017, 9, 1), 'Go to QDS')
+        self.assertEqual(url2, '')
diff --git a/tests/core.py b/tests/core.py
index a57f0ede60..6b1a96162d 100644
--- a/tests/core.py
+++ b/tests/core.py
@@ -2287,6 +2287,44 @@ def test_init_proxy_user(self):
         self.assertEqual('someone', c.proxy_user)
 
 
+class TestRedirect(unittest.TestCase):
+    """Tests for the ``/admin/airflow/redirect`` extra-link view."""
+    def setUp(self):
+        configuration.load_test_config()
+        self.ENDPOINT = '/admin/airflow/redirect'
+        self.DEFAULT_DATE = datetime(2017, 1, 1)
+        self.app = application.create_app().test_client()
+
+        class DummyTestOperator(BaseOperator):
+            extra_links = ['foo-bar']
+
+            def get_redirect_url(self, ddtm, redirect_to):
+                return 'http://www.example.com/{0}/{1}/{2}'.format(
+                    self.task_id, redirect_to, ddtm)
+
+        self.dag = DAG('dag', start_date=self.DEFAULT_DATE)
+        self.task = DummyTestOperator(task_id="some_dummy_task", dag=self.dag)
+
+    def test_redirect_method_not_whitelisted(self):
+        response = self.app.get(
+            "{0}?dag_id={1}&task_id={2}&execution_date={3}&redirect_to=foo-bar"
+            .format(self.ENDPOINT, self.dag.dag_id, self.task.task_id, self.DEFAULT_DATE),
+            follow_redirects=True)
+
+        self.assertEqual(response.status_code, 200)
+
+    def test_redirect_method_whitelisted(self):
+        configuration.set('webserver', 'whitelisted_domains', 'http://www.example.com/')
+
+        response = self.app.get(
+            "{0}?dag_id={1}&task_id={2}&execution_date={3}&redirect_to=foo-bar"
+            .format(self.ENDPOINT, self.dag.dag_id, self.task.task_id, self.DEFAULT_DATE))
+
+        # NOTE(review): the view looks the DAG up in the webserver's dagbag,
+        # which may not hold self.dag, so a 302 alone proves little.
+        self.assertEqual(response.status_code, 302)
+
+
 try:
     from airflow.hooks.hdfs_hook import HDFSHook
     import snakebite
diff --git a/tests/models.py b/tests/models.py
index cabcf3a4f8..5bc42d34d4 100644
--- a/tests/models.py
+++ b/tests/models.py
@@ -32,6 +32,7 @@
 from airflow.models import DagModel, DagStat
 from airflow.models import clear_task_instances
 from airflow.models import XCom
+from airflow.operators import BaseOperator
 from airflow.operators.dummy_operator import DummyOperator
 from airflow.operators.bash_operator import BashOperator
 from airflow.operators.python_operator import PythonOperator
@@ -294,6 +295,29 @@ def jinja_udf(name):
         result = task.render_template('', "{{ 'world' | hello}}", dict())
         self.assertEqual(result, 'Hello world')
 
+    def test_extra_links_no_affect(self):
+        """
+        Operators that declare no extra_links are unaffected by the feature.
+        """
+        task = DummyOperator(task_id="some_dummy_task")
+        self.assertEqual(task.extra_links, [])
+        self.assertIsNone(task.get_redirect_url(DEFAULT_DATE, 'foo-bar'))
+
+    def test_extra_links(self):
+        """
+        An operator subclass can declare extra_links and build their URLs.
+        """
+        class DummyTestOperator(BaseOperator):
+            extra_links = ['foo-bar']
+
+            def get_redirect_url(self, ddtm, redirect_to):
+                return 'www.foo-bar.com'
+
+        task = DummyTestOperator(task_id="some_dummy_task")
+        self.assertEqual(task.extra_links, ['foo-bar'])
+        self.assertEqual(task.get_redirect_url(DEFAULT_DATE, 'foo-bar'),
+                         'www.foo-bar.com')
+
 
 class DagStatTest(unittest.TestCase):
     def test_dagstats_crud(self):


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services