You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ka...@apache.org on 2021/01/07 02:18:35 UTC
[airflow] branch master updated: Fix Azure Data Explorer Operator
(#13520)
This is an automated email from the ASF dual-hosted git repository.
kaxilnaik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/master by this push:
new b2cb6ee Fix Azure Data Explorer Operator (#13520)
b2cb6ee is described below
commit b2cb6ee5ba895983e4e9d9327ff62a9262b765a2
Author: Kenten Danas <37...@users.noreply.github.com>
AuthorDate: Wed Jan 6 18:18:24 2021 -0800
Fix Azure Data Explorer Operator (#13520)
Co-authored-by: Kaxil Naik <ka...@gmail.com>
---
airflow/providers/microsoft/azure/operators/adx.py | 10 +++++--
.../microsoft/azure/operators/test_adx.py | 33 +++++++++++++---------
2 files changed, 26 insertions(+), 17 deletions(-)
diff --git a/airflow/providers/microsoft/azure/operators/adx.py b/airflow/providers/microsoft/azure/operators/adx.py
index e5a8c46..f5a92a6 100644
--- a/airflow/providers/microsoft/azure/operators/adx.py
+++ b/airflow/providers/microsoft/azure/operators/adx.py
@@ -18,10 +18,11 @@
#
"""This module contains Azure Data Explorer operators"""
-from typing import Optional
+from typing import Optional, Union
from azure.kusto.data._models import KustoResultTable
+from airflow.configuration import conf
from airflow.models import BaseOperator
from airflow.providers.microsoft.azure.hooks.adx import AzureDataExplorerHook
from airflow.utils.decorators import apply_defaults
@@ -66,7 +67,7 @@ class AzureDataExplorerQueryOperator(BaseOperator):
"""Returns new instance of AzureDataExplorerHook"""
return AzureDataExplorerHook(self.azure_data_explorer_conn_id)
- def execute(self, context: dict) -> KustoResultTable:
+ def execute(self, context: dict) -> Union[KustoResultTable, str]:
"""
Run KQL Query on Azure Data Explorer (Kusto).
Returns `PrimaryResult` of Query v2 HTTP response contents
@@ -74,4 +75,7 @@ class AzureDataExplorerQueryOperator(BaseOperator):
"""
hook = self.get_hook()
response = hook.run_query(self.query, self.database, self.options)
- return response.primary_results[0]
+ if conf.getboolean('core', 'enable_xcom_pickling'):
+ return response.primary_results[0]
+ else:
+ return str(response.primary_results[0])
diff --git a/tests/providers/microsoft/azure/operators/test_adx.py b/tests/providers/microsoft/azure/operators/test_adx.py
index 751980d..a425755 100644
--- a/tests/providers/microsoft/azure/operators/test_adx.py
+++ b/tests/providers/microsoft/azure/operators/test_adx.py
@@ -20,6 +20,8 @@
import unittest
from unittest import mock
+from azure.kusto.data._models import KustoResultTable
+
from airflow.models import DAG, TaskInstance
from airflow.providers.microsoft.azure.hooks.adx import AzureDataExplorerHook
from airflow.providers.microsoft.azure.operators.adx import AzureDataExplorerQueryOperator
@@ -36,19 +38,22 @@ MOCK_DATA = {
'options': {'option1': 'option_value'},
}
-MOCK_RESULT = {
- 'name': 'getschema',
- 'kind': 'PrimaryResult',
- 'data': [
- {'ColumnName': 'Source', 'ColumnOrdinal': 0, 'DataType': 'System.String', 'ColumnType': 'string'},
- {
- 'ColumnName': 'Timestamp',
- 'ColumnOrdinal': 1,
- 'DataType': 'System.DateTime',
- 'ColumnType': 'datetime',
- },
- ],
-}
+MOCK_RESULT = KustoResultTable(
+ json_table={
+ 'TableName': 'getschema',
+ "TableId": 1,
+ 'TableKind': 'PrimaryResult',
+ 'Columns': [
+ {"ColumnName": "Source", "ColumnType": "string", 'DataType': 'System.String'},
+ {
+ "ColumnName": "Timestamp",
+ "ColumnType": "datetime",
+ 'DataType': 'System.DateTime',
+ },
+ ],
+ "Rows": [["hi", "2017-01-01T01:01:01.0000003Z"], ["hello", "2017-01-01T01:01:01.0000003Z"]],
+ }
+)
class MockResponse:
@@ -82,4 +87,4 @@ class TestAzureDataExplorerQueryOperator(unittest.TestCase):
ti = TaskInstance(task=self.operator, execution_date=timezone.utcnow())
ti.run()
- self.assertEqual(ti.xcom_pull(task_ids=MOCK_DATA['task_id']), MOCK_RESULT)
+ self.assertEqual(ti.xcom_pull(task_ids=MOCK_DATA['task_id']), str(MOCK_RESULT))