You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to commits@airflow.apache.org by ka...@apache.org on 2021/01/07 02:18:35 UTC

[airflow] branch master updated: Fix Azure Data Explorer Operator (#13520)

This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/master by this push:
     new b2cb6ee  Fix Azure Data Explorer Operator (#13520)
b2cb6ee is described below

commit b2cb6ee5ba895983e4e9d9327ff62a9262b765a2
Author: Kenten Danas <37...@users.noreply.github.com>
AuthorDate: Wed Jan 6 18:18:24 2021 -0800

    Fix Azure Data Explorer Operator (#13520)
    
    Co-authored-by: Kaxil Naik <ka...@gmail.com>
---
 airflow/providers/microsoft/azure/operators/adx.py | 10 +++++--
 .../microsoft/azure/operators/test_adx.py          | 33 +++++++++++++---------
 2 files changed, 26 insertions(+), 17 deletions(-)

diff --git a/airflow/providers/microsoft/azure/operators/adx.py b/airflow/providers/microsoft/azure/operators/adx.py
index e5a8c46..f5a92a6 100644
--- a/airflow/providers/microsoft/azure/operators/adx.py
+++ b/airflow/providers/microsoft/azure/operators/adx.py
@@ -18,10 +18,11 @@
 #
 
 """This module contains Azure Data Explorer operators"""
-from typing import Optional
+from typing import Optional, Union
 
 from azure.kusto.data._models import KustoResultTable
 
+from airflow.configuration import conf
 from airflow.models import BaseOperator
 from airflow.providers.microsoft.azure.hooks.adx import AzureDataExplorerHook
 from airflow.utils.decorators import apply_defaults
@@ -66,7 +67,7 @@ class AzureDataExplorerQueryOperator(BaseOperator):
         """Returns new instance of AzureDataExplorerHook"""
         return AzureDataExplorerHook(self.azure_data_explorer_conn_id)
 
-    def execute(self, context: dict) -> KustoResultTable:
+    def execute(self, context: dict) -> Union[KustoResultTable, str]:
         """
         Run KQL Query on Azure Data Explorer (Kusto).
         Returns `PrimaryResult` of Query v2 HTTP response contents
@@ -74,4 +75,7 @@ class AzureDataExplorerQueryOperator(BaseOperator):
         """
         hook = self.get_hook()
         response = hook.run_query(self.query, self.database, self.options)
-        return response.primary_results[0]
+        if conf.getboolean('core', 'enable_xcom_pickling'):
+            return response.primary_results[0]
+        else:
+            return str(response.primary_results[0])
diff --git a/tests/providers/microsoft/azure/operators/test_adx.py b/tests/providers/microsoft/azure/operators/test_adx.py
index 751980d..a425755 100644
--- a/tests/providers/microsoft/azure/operators/test_adx.py
+++ b/tests/providers/microsoft/azure/operators/test_adx.py
@@ -20,6 +20,8 @@
 import unittest
 from unittest import mock
 
+from azure.kusto.data._models import KustoResultTable
+
 from airflow.models import DAG, TaskInstance
 from airflow.providers.microsoft.azure.hooks.adx import AzureDataExplorerHook
 from airflow.providers.microsoft.azure.operators.adx import AzureDataExplorerQueryOperator
@@ -36,19 +38,22 @@ MOCK_DATA = {
     'options': {'option1': 'option_value'},
 }
 
-MOCK_RESULT = {
-    'name': 'getschema',
-    'kind': 'PrimaryResult',
-    'data': [
-        {'ColumnName': 'Source', 'ColumnOrdinal': 0, 'DataType': 'System.String', 'ColumnType': 'string'},
-        {
-            'ColumnName': 'Timestamp',
-            'ColumnOrdinal': 1,
-            'DataType': 'System.DateTime',
-            'ColumnType': 'datetime',
-        },
-    ],
-}
+MOCK_RESULT = KustoResultTable(
+    json_table={
+        'TableName': 'getschema',
+        "TableId": 1,
+        'TableKind': 'PrimaryResult',
+        'Columns': [
+            {"ColumnName": "Source", "ColumnType": "string", 'DataType': 'System.String'},
+            {
+                "ColumnName": "Timestamp",
+                "ColumnType": "datetime",
+                'DataType': 'System.DateTime',
+            },
+        ],
+        "Rows": [["hi", "2017-01-01T01:01:01.0000003Z"], ["hello", "2017-01-01T01:01:01.0000003Z"]],
+    }
+)
 
 
 class MockResponse:
@@ -82,4 +87,4 @@ class TestAzureDataExplorerQueryOperator(unittest.TestCase):
         ti = TaskInstance(task=self.operator, execution_date=timezone.utcnow())
         ti.run()
 
-        self.assertEqual(ti.xcom_pull(task_ids=MOCK_DATA['task_id']), MOCK_RESULT)
+        self.assertEqual(ti.xcom_pull(task_ids=MOCK_DATA['task_id']), str(MOCK_RESULT))