Posted to commits@airflow.apache.org by fo...@apache.org on 2017/11/16 08:03:40 UTC

incubator-airflow git commit: [AIRFLOW-1811] Fix render Druid operator

Repository: incubator-airflow
Updated Branches:
  refs/heads/master 3c8f7747b -> 54c03f326


[AIRFLOW-1811] Fix render Druid operator

Set the correct template fields so that the rendered Druid
indexing spec can be visualised. Add some tests to make sure
that the templating is working :-)

Closes #2783 from Fokko/AIRFLOW-1811-fix-druid-operator
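For readers following along, the practical effect of the change is that a Jinja-templated .json index spec is now rendered before it is submitted to the Druid overlord. Below is a minimal sketch of how the patched operator might be wired into a DAG; the DAG id, task id, file path and schedule are made up for illustration and are not part of this commit:

    # Hypothetical usage of the patched DruidOperator (Airflow 1.x contrib).
    # The index spec file may contain Jinja expressions such as {{ ds }},
    # which are rendered into index_spec_str before the job is submitted.
    from datetime import datetime

    from airflow import DAG
    from airflow.contrib.operators.druid_operator import DruidOperator

    dag = DAG(
        dag_id='druid_indexing_example',             # assumed name
        start_date=datetime(2017, 1, 1),
        schedule_interval='@daily',
    )

    submit_index_job = DruidOperator(
        task_id='submit_druid_index_job',            # assumed name
        json_index_file='/path/to/index_spec.json',  # assumed path
        druid_ingest_conn_id='druid_ingest_default',
        dag=dag,
    )

Note that the operator opens json_index_file in __init__, so the spec file has to be readable wherever the DAG is parsed, not only on the worker.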


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/54c03f32
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/54c03f32
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/54c03f32

Branch: refs/heads/master
Commit: 54c03f3262babb0b5cbe335d429379ebb1440185
Parents: 3c8f774
Author: Fokko Driesprong <fo...@godatadriven.com>
Authored: Thu Nov 16 09:03:33 2017 +0100
Committer: Fokko Driesprong <fo...@godatadriven.com>
Committed: Thu Nov 16 09:03:33 2017 +0100

----------------------------------------------------------------------
 airflow/contrib/operators/druid_operator.py    | 23 ++++++----
 tests/contrib/operators/test_druid_operator.py | 49 ++++++++++++++++++++-
 2 files changed, 62 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/54c03f32/airflow/contrib/operators/druid_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/druid_operator.py b/airflow/contrib/operators/druid_operator.py
index 9eeb614..6978cc3 100644
--- a/airflow/contrib/operators/druid_operator.py
+++ b/airflow/contrib/operators/druid_operator.py
@@ -27,23 +27,28 @@ class DruidOperator(BaseOperator):
     :param druid_ingest_conn_id: The connection id of the Druid overlord which accepts index jobs
     :type druid_ingest_conn_id: str
     """
-    template_fields = ('intervals',)
+    template_fields = ('index_spec_str',)
     template_ext = ('.json',)
 
     def __init__(
-            self,
-            json_index_file,
-            druid_ingest_conn_id='druid_ingest_default',
-            intervals=None,
-            *args, **kwargs):
+        self,
+        json_index_file,
+        druid_ingest_conn_id='druid_ingest_default',
+        *args, **kwargs):
 
         super(DruidOperator, self).__init__(*args, **kwargs)
         self.conn_id = druid_ingest_conn_id
-        self.intervals=intervals
 
         with open(json_index_file) as data_file:
-            self.index_spec = json.load(data_file)
+            index_spec = json.load(data_file)
+        self.index_spec_str = json.dumps(
+            index_spec,
+            sort_keys=True,
+            indent=4,
+            separators=(',', ': ')
+        )
 
     def execute(self, context):
         hook = DruidHook(druid_ingest_conn_id=self.conn_id)
-        hook.submit_indexing_job(json.dumps(self.index_spec))
+        self.log.info("Submitting %s", self.index_spec_str)
+        hook.submit_indexing_job(self.index_spec_str)
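The essence of the fix: before this change, template_fields only covered the unused intervals argument, while the spec itself was kept as a dict and never rendered. The spec is now stored as a pretty-printed JSON string in index_spec_str and that field is listed in template_fields, so Airflow applies Jinja templating to the whole spec before execute() runs, and the rendered result shows up in the task's rendered-template view and in the log line above. A rough sketch of the rendering step, using jinja2 directly rather than Airflow's internals; the context dict below is a made-up stand-in for the real task context:

    # Not Airflow's actual code path, just an illustration of what Jinja
    # templating does to index_spec_str before execute() is called.
    from jinja2 import Template

    index_spec_str = (
        '{"type": "{{ params.index_type }}", '
        '"intervals": ["{{ ds }}/{{ tomorrow_ds }}"]}'
    )

    context = {  # stand-in values, normally supplied by Airflow
        'ds': '2017-01-01',
        'tomorrow_ds': '2017-01-02',
        'params': {'index_type': 'index_hadoop'},
    }

    print(Template(index_spec_str).render(**context))
    # {"type": "index_hadoop", "intervals": ["2017-01-01/2017-01-02"]}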

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/54c03f32/tests/contrib/operators/test_druid_operator.py
----------------------------------------------------------------------
diff --git a/tests/contrib/operators/test_druid_operator.py b/tests/contrib/operators/test_druid_operator.py
index aa5cb61..9df6c48 100644
--- a/tests/contrib/operators/test_druid_operator.py
+++ b/tests/contrib/operators/test_druid_operator.py
@@ -19,6 +19,9 @@ import unittest
 
 from airflow import DAG, configuration
 from airflow.contrib.operators.druid_operator import DruidOperator
+from airflow.models import TaskInstance
+
+DEFAULT_DATE = datetime.datetime(2017, 1, 1)
 
 
 class TestDruidOperator(unittest.TestCase):
@@ -40,7 +43,51 @@ class TestDruidOperator(unittest.TestCase):
             )
 
             m.assert_called_once_with('index_spec.json')
-            self.assertEqual(druid.index_spec, {'some': 'json'})
+            self.assertEqual(druid.index_spec_str, '{\n    "some": "json"\n}')
+
+    def test_render_template(self):
+        json_str = '''
+            {
+                "type": "{{ params.index_type }}",
+                "datasource": "{{ params.datasource }}",
+                "spec": {
+                    "dataSchema": {
+                        "granularitySpec": {
+                            "intervals": ["{{ ds }}/{{ macros.ds_add(ds, 1) }}"]
+                        }
+                    }
+                }
+            }
+        '''
+        m = mock.mock_open(read_data=json_str)
+        with mock.patch('airflow.contrib.operators.druid_operator.open', m, create=True) as m:
+            operator = DruidOperator(
+                task_id='spark_submit_job',
+                json_index_file='index_spec.json',
+                params={
+                    'index_type': 'index_hadoop',
+                    'datasource': 'datasource_prd'
+                },
+                dag=self.dag
+            )
+            ti = TaskInstance(operator, DEFAULT_DATE)
+            ti.render_templates()
+
+            m.assert_called_once_with('index_spec.json')
+            expected = '''{
+    "datasource": "datasource_prd",
+    "spec": {
+        "dataSchema": {
+            "granularitySpec": {
+                "intervals": [
+                    "2017-01-01/2017-01-02"
+                ]
+            }
+        }
+    },
+    "type": "index_hadoop"
+}'''
+            self.assertEqual(expected, getattr(operator, 'index_spec_str'))
 
 
 if __name__ == '__main__':
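For anyone who wants to check the rendering of their own spec outside the test suite, the new test suggests a straightforward recipe. Here is a standalone sketch along those lines; it assumes a configured local Airflow 1.x environment, the DAG id and task id are made up, and index_spec.json must exist on disk:

    # Render a templated Druid index spec for a chosen execution date without
    # submitting anything; mirrors what test_render_template does above.
    import datetime

    from airflow import DAG
    from airflow.contrib.operators.druid_operator import DruidOperator
    from airflow.models import TaskInstance

    dag = DAG('druid_render_check', start_date=datetime.datetime(2017, 1, 1))

    op = DruidOperator(
        task_id='druid_index_job',
        json_index_file='index_spec.json',  # templated .json spec, assumed to exist
        params={'index_type': 'index_hadoop', 'datasource': 'datasource_prd'},
        dag=dag,
    )

    ti = TaskInstance(op, datetime.datetime(2017, 1, 1))
    ti.render_templates()
    print(op.index_spec_str)  # the fully rendered indexing spec

The new test itself should also run on its own with the standard runner, e.g. python -m unittest tests.contrib.operators.test_druid_operator from the repository root.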