You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by jh...@apache.org on 2021/08/09 22:52:05 UTC
[airflow] 07/39: AIRFLOW-5529 Add Apache Drill provider. (#16884)
This is an automated email from the ASF dual-hosted git repository.
jhtimmins pushed a commit to branch v2-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 4aef457774c646940caef7a02e0c2c0d76915160
Author: dzamo <91...@users.noreply.github.com>
AuthorDate: Mon Jul 12 19:59:35 2021 +0200
AIRFLOW-5529 Add Apache Drill provider. (#16884)
(cherry picked from commit 8808b641942e1b81c21db054fd6d36e2031cfab8)
---
CONTRIBUTING.rst | 22 ++---
INSTALL | 24 +++---
airflow/providers/apache/drill/CHANGELOG.rst | 25 ++++++
airflow/providers/apache/drill/__init__.py | 17 ++++
.../apache/drill/example_dags/example_drill_dag.py | 46 +++++++++++
airflow/providers/apache/drill/hooks/__init__.py | 17 ++++
airflow/providers/apache/drill/hooks/drill.py | 89 +++++++++++++++++++++
.../providers/apache/drill/operators/__init__.py | 17 ++++
airflow/providers/apache/drill/operators/drill.py | 71 ++++++++++++++++
airflow/providers/apache/drill/provider.yaml | 49 ++++++++++++
airflow/ui/src/views/Docs.tsx | 1 +
airflow/utils/db.py | 10 +++
.../commits.rst | 23 ++++++
.../connections/drill.rst | 44 ++++++++++
.../index.rst | 50 ++++++++++++
.../operators.rst | 51 ++++++++++++
docs/apache-airflow/extra-packages-ref.rst | 2 +
docs/conf.py | 1 +
docs/integration-logos/apache/drill.png | Bin 0 -> 40173 bytes
docs/spelling_wordlist.txt | 1 +
setup.py | 3 +
tests/providers/apache/drill/__init__.py | 17 ++++
tests/providers/apache/drill/hooks/__init__.py | 17 ++++
tests/providers/apache/drill/hooks/test_drill.py | 84 +++++++++++++++++++
tests/providers/apache/drill/operators/__init__.py | 17 ++++
.../providers/apache/drill/operators/test_drill.py | 63 +++++++++++++++
26 files changed, 738 insertions(+), 23 deletions(-)
diff --git a/CONTRIBUTING.rst b/CONTRIBUTING.rst
index 98cdf93..be807f4 100644
--- a/CONTRIBUTING.rst
+++ b/CONTRIBUTING.rst
@@ -573,17 +573,17 @@ This is the full list of those extras:
.. START EXTRAS HERE
-airbyte, all, all_dbs, amazon, apache.atlas, apache.beam, apache.cassandra, apache.druid,
-apache.hdfs, apache.hive, apache.kylin, apache.livy, apache.pig, apache.pinot, apache.spark,
-apache.sqoop, apache.webhdfs, asana, async, atlas, aws, azure, cassandra, celery, cgroups, cloudant,
-cncf.kubernetes, crypto, dask, databricks, datadog, deprecated_api, devel, devel_all, devel_ci,
-devel_hadoop, dingding, discord, doc, docker, druid, elasticsearch, exasol, facebook, ftp, gcp,
-gcp_api, github_enterprise, google, google_auth, grpc, hashicorp, hdfs, hive, http, imap, jdbc,
-jenkins, jira, kerberos, kubernetes, ldap, leveldb, microsoft.azure, microsoft.mssql,
-microsoft.winrm, mongo, mssql, mysql, neo4j, odbc, openfaas, opsgenie, oracle, pagerduty, papermill,
-password, pinot, plexus, postgres, presto, qds, qubole, rabbitmq, redis, s3, salesforce, samba,
-segment, sendgrid, sentry, sftp, singularity, slack, snowflake, spark, sqlite, ssh, statsd, tableau,
-telegram, trino, vertica, virtualenv, webhdfs, winrm, yandex, zendesk
+airbyte, all, all_dbs, amazon, apache.atlas, apache.beam, apache.cassandra, apache.drill,
+apache.druid, apache.hdfs, apache.hive, apache.kylin, apache.livy, apache.pig, apache.pinot,
+apache.spark, apache.sqoop, apache.webhdfs, asana, async, atlas, aws, azure, cassandra, celery,
+cgroups, cloudant, cncf.kubernetes, crypto, dask, databricks, datadog, deprecated_api, devel,
+devel_all, devel_ci, devel_hadoop, dingding, discord, doc, docker, druid, elasticsearch, exasol,
+facebook, ftp, gcp, gcp_api, github_enterprise, google, google_auth, grpc, hashicorp, hdfs, hive,
+http, imap, jdbc, jenkins, jira, kerberos, kubernetes, ldap, leveldb, microsoft.azure,
+microsoft.mssql, microsoft.winrm, mongo, mssql, mysql, neo4j, odbc, openfaas, opsgenie, oracle,
+pagerduty, papermill, password, pinot, plexus, postgres, presto, qds, qubole, rabbitmq, redis, s3,
+salesforce, samba, segment, sendgrid, sentry, sftp, singularity, slack, snowflake, spark, sqlite,
+ssh, statsd, tableau, telegram, trino, vertica, virtualenv, webhdfs, winrm, yandex, zendesk
.. END EXTRAS HERE
diff --git a/INSTALL b/INSTALL
index 111b51f..554af5c 100644
--- a/INSTALL
+++ b/INSTALL
@@ -1,6 +1,6 @@
# INSTALL / BUILD instructions for Apache Airflow
-This ia a generic installation method that requires a number of dependencies to be installed.
+This is a generic installation method that requires a number of dependencies to be installed.
Depending on your system you might need different prerequisites, but the following
systems/prerequisites are known to work:
@@ -89,17 +89,17 @@ The list of available extras:
# START EXTRAS HERE
-airbyte, all, all_dbs, amazon, apache.atlas, apache.beam, apache.cassandra, apache.druid,
-apache.hdfs, apache.hive, apache.kylin, apache.livy, apache.pig, apache.pinot, apache.spark,
-apache.sqoop, apache.webhdfs, asana, async, atlas, aws, azure, cassandra, celery, cgroups, cloudant,
-cncf.kubernetes, crypto, dask, databricks, datadog, deprecated_api, devel, devel_all, devel_ci,
-devel_hadoop, dingding, discord, doc, docker, druid, elasticsearch, exasol, facebook, ftp, gcp,
-gcp_api, github_enterprise, google, google_auth, grpc, hashicorp, hdfs, hive, http, imap, jdbc,
-jenkins, jira, kerberos, kubernetes, ldap, leveldb, microsoft.azure, microsoft.mssql,
-microsoft.winrm, mongo, mssql, mysql, neo4j, odbc, openfaas, opsgenie, oracle, pagerduty, papermill,
-password, pinot, plexus, postgres, presto, qds, qubole, rabbitmq, redis, s3, salesforce, samba,
-segment, sendgrid, sentry, sftp, singularity, slack, snowflake, spark, sqlite, ssh, statsd, tableau,
-telegram, trino, vertica, virtualenv, webhdfs, winrm, yandex, zendesk
+airbyte, all, all_dbs, amazon, apache.atlas, apache.beam, apache.cassandra, apache.drill,
+apache.druid, apache.hdfs, apache.hive, apache.kylin, apache.livy, apache.pig, apache.pinot,
+apache.spark, apache.sqoop, apache.webhdfs, asana, async, atlas, aws, azure, cassandra, celery,
+cgroups, cloudant, cncf.kubernetes, crypto, dask, databricks, datadog, deprecated_api, devel,
+devel_all, devel_ci, devel_hadoop, dingding, discord, doc, docker, druid, elasticsearch, exasol,
+facebook, ftp, gcp, gcp_api, github_enterprise, google, google_auth, grpc, hashicorp, hdfs, hive,
+http, imap, jdbc, jenkins, jira, kerberos, kubernetes, ldap, leveldb, microsoft.azure,
+microsoft.mssql, microsoft.winrm, mongo, mssql, mysql, neo4j, odbc, openfaas, opsgenie, oracle,
+pagerduty, papermill, password, pinot, plexus, postgres, presto, qds, qubole, rabbitmq, redis, s3,
+salesforce, samba, segment, sendgrid, sentry, sftp, singularity, slack, snowflake, spark, sqlite,
+ssh, statsd, tableau, telegram, trino, vertica, virtualenv, webhdfs, winrm, yandex, zendesk
# END EXTRAS HERE
diff --git a/airflow/providers/apache/drill/CHANGELOG.rst b/airflow/providers/apache/drill/CHANGELOG.rst
new file mode 100644
index 0000000..cef7dda
--- /dev/null
+++ b/airflow/providers/apache/drill/CHANGELOG.rst
@@ -0,0 +1,25 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ .. http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+
+Changelog
+---------
+
+1.0.0
+.....
+
+Initial version of the provider.
diff --git a/airflow/providers/apache/drill/__init__.py b/airflow/providers/apache/drill/__init__.py
new file mode 100644
index 0000000..217e5db
--- /dev/null
+++ b/airflow/providers/apache/drill/__init__.py
@@ -0,0 +1,17 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/airflow/providers/apache/drill/example_dags/example_drill_dag.py b/airflow/providers/apache/drill/example_dags/example_drill_dag.py
new file mode 100644
index 0000000..60a35ee
--- /dev/null
+++ b/airflow/providers/apache/drill/example_dags/example_drill_dag.py
@@ -0,0 +1,46 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Example Airflow DAG to execute SQL in an Apache Drill environment using the
+`DrillOperator`.
+"""
+from airflow.models import DAG
+from airflow.providers.apache.drill.operators.drill import DrillOperator
+from airflow.utils.dates import days_ago
+
+args = {
+ 'owner': 'Airflow',
+}
+
+with DAG(
+ dag_id='example_drill_dag',
+ default_args=args,
+ schedule_interval=None,
+ start_date=days_ago(2),
+ tags=['example'],
+) as dag:
+ # [START howto_operator_drill]
+ sql_task = DrillOperator(
+ task_id='json_to_parquet_table',
+ sql='''
+ drop table if exists dfs.tmp.employee;
+ create table dfs.tmp.employee as select * from cp.`employee.json`;
+ ''',
+ )
+ # [END howto_operator_drill]
diff --git a/airflow/providers/apache/drill/hooks/__init__.py b/airflow/providers/apache/drill/hooks/__init__.py
new file mode 100644
index 0000000..217e5db
--- /dev/null
+++ b/airflow/providers/apache/drill/hooks/__init__.py
@@ -0,0 +1,17 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/airflow/providers/apache/drill/hooks/drill.py b/airflow/providers/apache/drill/hooks/drill.py
new file mode 100644
index 0000000..470be8c
--- /dev/null
+++ b/airflow/providers/apache/drill/hooks/drill.py
@@ -0,0 +1,89 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from typing import Any, Iterable, Optional, Tuple
+
+from sqlalchemy import create_engine
+from sqlalchemy.engine import Connection
+
+from airflow.hooks.dbapi import DbApiHook
+
+
+class DrillHook(DbApiHook):
+ """
+ Interact with Apache Drill via sqlalchemy-drill.
+
+ You can specify the SQLAlchemy dialect and driver that sqlalchemy-drill
+ will employ to communicate with Drill in the extras field of your
+ connection, e.g. ``{"dialect_driver": "drill+sadrill"}`` for communication
+ over Drill's REST API. See the sqlalchemy-drill documentation for
+ descriptions of the supported dialects and drivers.
+
+ You can specify the default storage_plugin for the sqlalchemy-drill
+ connection using the extras field e.g. ``{"storage_plugin": "dfs"}``.
+ """
+
+ conn_name_attr = 'drill_conn_id'
+ default_conn_name = 'drill_default'
+ conn_type = 'drill'
+ hook_name = 'Drill'
+ supports_autocommit = False
+
+ def get_conn(self) -> Connection:
+ """Establish a connection to Drillbit."""
+ conn_md = self.get_connection(getattr(self, self.conn_name_attr))
+ creds = f'{conn_md.login}:{conn_md.password}@' if conn_md.login else ''
+ engine = create_engine(
+ f'{conn_md.extra_dejson.get("dialect_driver", "drill+sadrill")}://{creds}'
+ f'{conn_md.host}:{conn_md.port}/'
+ f'{conn_md.extra_dejson.get("storage_plugin", "dfs")}'
+ )
+
+ self.log.info(
+ 'Connected to the Drillbit at %s:%s as user %s', conn_md.host, conn_md.port, conn_md.login
+ )
+ return engine.raw_connection()
+
+ def get_uri(self) -> str:
+ """
+ Returns the connection URI
+
+ e.g: ``drill://localhost:8047/dfs?dialect_driver=drill+sadrill``
+ """
+ conn_md = self.get_connection(getattr(self, self.conn_name_attr))
+ host = conn_md.host
+ if conn_md.port is not None:
+ host += f':{conn_md.port}'
+ conn_type = 'drill' if not conn_md.conn_type else conn_md.conn_type
+ dialect_driver = conn_md.extra_dejson.get('dialect_driver', 'drill+sadrill')
+ storage_plugin = conn_md.extra_dejson.get('storage_plugin', 'dfs')
+ return f'{conn_type}://{host}/{storage_plugin}' f'?dialect_driver={dialect_driver}'
+
+ def set_autocommit(self, conn: Connection, autocommit: bool) -> NotImplemented:
+ raise NotImplementedError("There are no transactions in Drill.")
+
+ def insert_rows(
+ self,
+ table: str,
+ rows: Iterable[Tuple[str]],
+ target_fields: Optional[Iterable[str]] = None,
+ commit_every: int = 1000,
+ replace: bool = False,
+ **kwargs: Any,
+ ) -> NotImplemented:
+ raise NotImplementedError("There is no INSERT statement in Drill.")
diff --git a/airflow/providers/apache/drill/operators/__init__.py b/airflow/providers/apache/drill/operators/__init__.py
new file mode 100644
index 0000000..217e5db
--- /dev/null
+++ b/airflow/providers/apache/drill/operators/__init__.py
@@ -0,0 +1,17 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/airflow/providers/apache/drill/operators/drill.py b/airflow/providers/apache/drill/operators/drill.py
new file mode 100644
index 0000000..459c623
--- /dev/null
+++ b/airflow/providers/apache/drill/operators/drill.py
@@ -0,0 +1,71 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from typing import Iterable, Mapping, Optional, Union
+
+import sqlparse
+
+from airflow.models import BaseOperator
+from airflow.providers.apache.drill.hooks.drill import DrillHook
+from airflow.utils.decorators import apply_defaults
+
+
+class DrillOperator(BaseOperator):
+ """
+ Executes the provided SQL in the identified Drill environment.
+
+ .. seealso::
+ For more information on how to use this operator, take a look at the guide:
+ :ref:`howto/operator:DrillOperator`
+
+ :param sql: the SQL code to be executed. (templated)
+ :type sql: Can receive a str representing a sql statement,
+ a list of str (sql statements), or a reference to a template file.
+ Template references are recognized by str ending in '.sql'
+ :param drill_conn_id: id of the connection config for the target Drill
+ environment
+ :type drill_conn_id: str
+ :param parameters: (optional) the parameters to render the SQL query with.
+ :type parameters: dict or iterable
+ """
+
+ template_fields = ('sql',)
+ template_fields_renderers = {'sql': 'sql'}
+ template_ext = ('.sql',)
+ ui_color = '#ededed'
+
+ @apply_defaults
+ def __init__(
+ self,
+ *,
+ sql: str,
+ drill_conn_id: str = 'drill_default',
+ parameters: Optional[Union[Mapping, Iterable]] = None,
+ **kwargs,
+ ) -> None:
+ super().__init__(**kwargs)
+ self.sql = sql
+ self.drill_conn_id = drill_conn_id
+ self.parameters = parameters
+ self.hook = None
+
+ def execute(self, context):
+ self.log.info('Executing: %s on %s', self.sql, self.drill_conn_id)
+ self.hook = DrillHook(drill_conn_id=self.drill_conn_id)
+ sql = sqlparse.split(sqlparse.format(self.sql, strip_comments=True))
+ no_term_sql = [s[:-1] for s in sql if s[-1] == ';']
+ self.hook.run(no_term_sql, parameters=self.parameters)
diff --git a/airflow/providers/apache/drill/provider.yaml b/airflow/providers/apache/drill/provider.yaml
new file mode 100644
index 0000000..88021e6
--- /dev/null
+++ b/airflow/providers/apache/drill/provider.yaml
@@ -0,0 +1,49 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+---
+package-name: apache-airflow-providers-apache-drill
+name: Apache Drill
+description: |
+ `Apache Drill <https://drill.apache.org/>`__.
+
+versions:
+ - 1.0.0
+
+additional-dependencies:
+ - apache-airflow>=2.1.0
+
+integrations:
+ - integration-name: Apache Drill
+ external-doc-url: https://drill.apache.org/
+ how-to-guide:
+ - /docs/apache-airflow-providers-apache-drill/operators.rst
+ logo: /integration-logos/apache/drill.png
+ tags: [apache]
+
+operators:
+ - integration-name: Apache Drill
+ python-modules:
+ - airflow.providers.apache.drill.operators.drill
+
+hooks:
+ - integration-name: Apache Drill
+ python-modules:
+ - airflow.providers.apache.drill.hooks.drill
+
+hook-class-names:
+ - airflow.providers.apache.drill.hooks.drill.DrillHook
diff --git a/airflow/ui/src/views/Docs.tsx b/airflow/ui/src/views/Docs.tsx
index 754b803..ea42ca6 100644
--- a/airflow/ui/src/views/Docs.tsx
+++ b/airflow/ui/src/views/Docs.tsx
@@ -41,6 +41,7 @@ const Docs: React.FC = () => {
{ path: 'amazon', name: 'Amazon' },
{ path: 'apache-beam', name: 'Apache Beam' },
{ path: 'apache-cassandra', name: 'Apache Cassandra' },
+ { path: 'apache-drill', name: 'Apache Drill' },
{ path: 'apache-druid', name: 'Apache Druid' },
{ path: 'apache-hdfs', name: 'Apache HDFS' },
{ path: 'apache-hive', name: 'Apache Hive' },
diff --git a/airflow/utils/db.py b/airflow/utils/db.py
index ae8dc0e..69e2e00 100644
--- a/airflow/utils/db.py
+++ b/airflow/utils/db.py
@@ -168,6 +168,16 @@ def create_default_connections(session=None):
)
merge_conn(
Connection(
+ conn_id="drill_default",
+ conn_type="drill",
+ host="localhost",
+ port=8047,
+ extra='{"dialect_driver": "drill+sadrill", "storage_plugin": "dfs"}',
+ ),
+ session,
+ )
+ merge_conn(
+ Connection(
conn_id="druid_broker_default",
conn_type="druid",
host="druid-broker",
diff --git a/docs/apache-airflow-providers-apache-drill/commits.rst b/docs/apache-airflow-providers-apache-drill/commits.rst
new file mode 100644
index 0000000..deb31b3
--- /dev/null
+++ b/docs/apache-airflow-providers-apache-drill/commits.rst
@@ -0,0 +1,23 @@
+
+ .. Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ .. http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+
+Package apache-airflow-providers-apache-drill
+------------------------------------------------------
+
+`Apache Drill <https://drill.apache.org/>`__.
diff --git a/docs/apache-airflow-providers-apache-drill/connections/drill.rst b/docs/apache-airflow-providers-apache-drill/connections/drill.rst
new file mode 100644
index 0000000..05e00a2
--- /dev/null
+++ b/docs/apache-airflow-providers-apache-drill/connections/drill.rst
@@ -0,0 +1,44 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ .. http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+
+
+.. _howto/connection:drill:
+
+Apache Drill Connection
+=======================
+
+The Apache Drill connection type configures a connection to Apache Drill via the sqlalchemy-drill Python package.
+
+Default Connection IDs
+----------------------
+
+Drill hooks and operators use ``drill_default`` by default.
+
+Configuring the Connection
+--------------------------
+Host (required)
+ The host of the Drillbit to connect to (HTTP, JDBC) or the DSN of the Drill ODBC connection.
+
+Port (optional)
+ The port of the Drillbit to connect to.
+
+Extra (optional)
+ A JSON dictionary specifying the extra parameters that can be used in sqlalchemy-drill connection.
+
+ * ``dialect_driver`` - The dialect and driver as understood by sqlalchemy-drill. Defaults to ``drill+sadrill`` (HTTP).
+ * ``storage_plugin`` - The default Drill storage plugin for this connection. Defaults to ``dfs``.
diff --git a/docs/apache-airflow-providers-apache-drill/index.rst b/docs/apache-airflow-providers-apache-drill/index.rst
new file mode 100644
index 0000000..9ef0f2e
--- /dev/null
+++ b/docs/apache-airflow-providers-apache-drill/index.rst
@@ -0,0 +1,50 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ .. http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+``apache-airflow-providers-apache-drill``
+=========================================
+
+Content
+-------
+
+.. toctree::
+ :maxdepth: 1
+ :caption: Guides
+
+ Connection types <connections/drill>
+ Operators <operators>
+
+.. toctree::
+ :maxdepth: 1
+ :caption: References
+
+ Python API <_api/airflow/providers/apache/drill/index>
+
+.. toctree::
+ :maxdepth: 1
+ :caption: Resources
+
+ Example DAGs <https://github.com/apache/airflow/tree/main/airflow/providers/apache/drill/example_dags>
+ PyPI Repository <https://pypi.org/project/apache-airflow-providers-apache-drill/>
+
+.. THE REMAINDER OF THE FILE IS AUTOMATICALLY GENERATED. IT WILL BE OVERWRITTEN AT RELEASE TIME!
+
+.. toctree::
+ :maxdepth: 1
+ :caption: Commits
+
+ Detailed list of commits <commits>
diff --git a/docs/apache-airflow-providers-apache-drill/operators.rst b/docs/apache-airflow-providers-apache-drill/operators.rst
new file mode 100644
index 0000000..f2d5cf5
--- /dev/null
+++ b/docs/apache-airflow-providers-apache-drill/operators.rst
@@ -0,0 +1,51 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ .. http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+
+Apache Drill Operators
+======================
+
+.. contents::
+ :depth: 1
+ :local:
+
+Prerequisite
+------------
+
+To use ``DrillOperator``, you must configure a :doc:`Drill Connection <connections/drill>`.
+
+
+.. _howto/operator:DrillOperator:
+
+DrillOperator
+-------------
+
+Executes one or more SQL queries on an Apache Drill server. The ``sql`` parameter can be templated and be an external ``.sql`` file.
+
+Using the operator
+""""""""""""""""""
+
+.. exampleinclude:: /../../airflow/providers/apache/drill/example_dags/example_drill_dag.py
+ :language: python
+ :dedent: 4
+ :start-after: [START howto_operator_drill]
+ :end-before: [END howto_operator_drill]
+
+Reference
+"""""""""
+
+For further information, see `the Drill documentation on querying data <http://apache.github.io/drill/docs/query-data/>`_.
diff --git a/docs/apache-airflow/extra-packages-ref.rst b/docs/apache-airflow/extra-packages-ref.rst
index b1dff07..2d4769e 100644
--- a/docs/apache-airflow/extra-packages-ref.rst
+++ b/docs/apache-airflow/extra-packages-ref.rst
@@ -116,6 +116,8 @@ custom bash/python providers).
+---------------------+-----------------------------------------------------+------------------------------------------------+
| apache.cassandra | ``pip install 'apache-airflow[apache.cassandra]'`` | Cassandra related operators & hooks |
+---------------------+-----------------------------------------------------+------------------------------------------------+
+| apache.drill | ``pip install 'apache-airflow[apache.drill]'`` | Drill related operators & hooks |
++---------------------+-----------------------------------------------------+------------------------------------------------+
| apache.druid | ``pip install 'apache-airflow[apache.druid]'`` | Druid related operators & hooks |
+---------------------+-----------------------------------------------------+------------------------------------------------+
| apache.hdfs | ``pip install 'apache-airflow[apache.hdfs]'`` | HDFS hooks and operators |
diff --git a/docs/conf.py b/docs/conf.py
index 3046303..da3b8e2 100644
--- a/docs/conf.py
+++ b/docs/conf.py
@@ -512,6 +512,7 @@ autodoc_mock_imports = [
'slack_sdk',
'smbclient',
'snowflake',
+ 'sqlalchemy-drill',
'sshtunnel',
'telegram',
'tenacity',
diff --git a/docs/integration-logos/apache/drill.png b/docs/integration-logos/apache/drill.png
new file mode 100644
index 0000000..9f76b61
Binary files /dev/null and b/docs/integration-logos/apache/drill.png differ
diff --git a/docs/spelling_wordlist.txt b/docs/spelling_wordlist.txt
index c476512..7631c6c 100644
--- a/docs/spelling_wordlist.txt
+++ b/docs/spelling_wordlist.txt
@@ -120,6 +120,7 @@ Docstring
Docstrings
Dont
Driesprong
+Drillbit
Drivy
Dsn
Dynamodb
diff --git a/setup.py b/setup.py
index 9e8c28d..9dde824 100644
--- a/setup.py
+++ b/setup.py
@@ -255,6 +255,7 @@ doc = [
docker = [
'docker',
]
+drill = ['sqlalchemy-drill>=1.1.0', 'sqlparse>=0.4.1']
druid = [
'pydruid>=0.4.1',
]
@@ -534,6 +535,7 @@ PROVIDERS_REQUIREMENTS: Dict[str, List[str]] = {
'amazon': amazon,
'apache.beam': apache_beam,
'apache.cassandra': cassandra,
+ 'apache.drill': drill,
'apache.druid': druid,
'apache.hdfs': hdfs,
'apache.hive': hive,
@@ -724,6 +726,7 @@ ALL_PROVIDERS = list(PROVIDERS_REQUIREMENTS.keys())
ALL_DB_PROVIDERS = [
'apache.cassandra',
+ 'apache.drill',
'apache.druid',
'apache.hdfs',
'apache.hive',
diff --git a/tests/providers/apache/drill/__init__.py b/tests/providers/apache/drill/__init__.py
new file mode 100644
index 0000000..217e5db
--- /dev/null
+++ b/tests/providers/apache/drill/__init__.py
@@ -0,0 +1,17 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/tests/providers/apache/drill/hooks/__init__.py b/tests/providers/apache/drill/hooks/__init__.py
new file mode 100644
index 0000000..217e5db
--- /dev/null
+++ b/tests/providers/apache/drill/hooks/__init__.py
@@ -0,0 +1,17 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/tests/providers/apache/drill/hooks/test_drill.py b/tests/providers/apache/drill/hooks/test_drill.py
new file mode 100644
index 0000000..97ed71f
--- /dev/null
+++ b/tests/providers/apache/drill/hooks/test_drill.py
@@ -0,0 +1,84 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import unittest
+from unittest.mock import MagicMock
+
+from airflow.providers.apache.drill.hooks.drill import DrillHook
+
+
+class TestDrillHook(unittest.TestCase):  # DrillHook unit tests; the connection and cursor are MagicMocks
+    def setUp(self):  # build the mocked cursor/connection and a DrillHook subclass that returns them
+        self.cur = MagicMock(rowcount=0)
+        self.conn = conn = MagicMock()  # local alias `conn` is captured by the hook subclass below
+        self.conn.login = 'drill_user'
+        self.conn.password = 'secret'
+        self.conn.host = 'host'
+        self.conn.port = '8047'
+        self.conn.conn_type = 'drill'
+        self.conn.extra_dejson = {'dialect_driver': 'drill+sadrill', 'storage_plugin': 'dfs'}
+        self.conn.cursor.return_value = self.cur
+
+        class UnitTestDrillHook(DrillHook):  # distinct name: avoids shadowing the enclosing test class
+            def get_conn(self):
+                return conn
+
+            def get_connection(self, conn_id):
+                return conn
+
+        self.db_hook = UnitTestDrillHook  # stored as a class; each test instantiates it
+
+    def test_get_uri(self):  # URI is expected to combine host, port, storage plugin and dialect driver
+        db_hook = self.db_hook()
+        assert 'drill://host:8047/dfs?dialect_driver=drill+sadrill' == db_hook.get_uri()
+
+    def test_get_first_record(self):  # get_first returns the cursor's fetchone() row and closes both handles
+        statement = 'SQL'
+        result_sets = [('row1',), ('row2',)]
+        self.cur.fetchone.return_value = result_sets[0]
+
+        assert result_sets[0] == self.db_hook().get_first(statement)
+        assert self.conn.close.call_count == 1
+        assert self.cur.close.call_count == 1
+        self.cur.execute.assert_called_once_with(statement)
+
+    def test_get_records(self):  # get_records returns the cursor's fetchall() rows and closes both handles
+        statement = 'SQL'
+        result_sets = [('row1',), ('row2',)]
+        self.cur.fetchall.return_value = result_sets
+
+        assert result_sets == self.db_hook().get_records(statement)
+        assert self.conn.close.call_count == 1
+        assert self.cur.close.call_count == 1
+        self.cur.execute.assert_called_once_with(statement)
+
+    def test_get_pandas_df(self):  # the frame takes its column name from cursor.description and rows from fetchall()
+        statement = 'SQL'
+        column = 'col'
+        result_sets = [('row1',), ('row2',)]
+        self.cur.description = [(column,)]
+        self.cur.fetchall.return_value = result_sets
+        df = self.db_hook().get_pandas_df(statement)
+
+        assert column == df.columns[0]
+        expected_first_column = [row[0] for row in result_sets]  # one comparison; the frame is converted once
+        assert expected_first_column == [row[0] for row in df.values.tolist()]
+        assert self.conn.close.call_count == 1
+        assert self.cur.close.call_count == 1
+        self.cur.execute.assert_called_once_with(statement)
diff --git a/tests/providers/apache/drill/operators/__init__.py b/tests/providers/apache/drill/operators/__init__.py
new file mode 100644
index 0000000..217e5db
--- /dev/null
+++ b/tests/providers/apache/drill/operators/__init__.py
@@ -0,0 +1,17 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/tests/providers/apache/drill/operators/test_drill.py b/tests/providers/apache/drill/operators/test_drill.py
new file mode 100644
index 0000000..3572d85
--- /dev/null
+++ b/tests/providers/apache/drill/operators/test_drill.py
@@ -0,0 +1,63 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import unittest
+
+import pytest
+
+from airflow.models.dag import DAG
+from airflow.providers.apache.drill.operators.drill import DrillOperator
+from airflow.utils import timezone
+
+DEFAULT_DATE = timezone.datetime(2015, 1, 1)
+DEFAULT_DATE_ISO = DEFAULT_DATE.isoformat()
+DEFAULT_DATE_DS = DEFAULT_DATE_ISO[:10]
+TEST_DAG_ID = 'unit_test_dag'
+
+
+@pytest.mark.backend("drill")
+class TestDrillOperator(unittest.TestCase):  # DrillOperator tests; each creates dfs.tmp.test_airflow, dropped in tearDown
+    def setUp(self):  # a fresh DAG per test, shared by the operators created below
+        args = {'owner': 'airflow', 'start_date': DEFAULT_DATE}
+        dag = DAG(TEST_DAG_ID, default_args=args)
+        self.dag = dag
+
+    def tearDown(self):  # drop the tables the tests create so every test starts clean
+        tables_to_drop = ['dfs.tmp.test_airflow']
+        from airflow.providers.apache.drill.hooks.drill import DrillHook
+
+        with DrillHook().get_conn() as conn:
+            with conn.cursor() as cur:
+                for table in tables_to_drop:
+                    cur.execute(f"DROP TABLE IF EXISTS {table}")
+
+    def test_drill_operator_single(self):  # a single SQL string passed as `sql`
+        sql = """
+        create table dfs.tmp.test_airflow as
+        select * from cp.`employee.json` limit 10
+        """
+        op = DrillOperator(task_id='drill_operator_test_single', sql=sql, dag=self.dag)
+        op.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
+
+    def test_drill_operator_multi(self):  # a list of SQL statements passed as `sql`
+        sql = [  # one literal per statement: adjacent literals concatenate without a space ("asselect")
+            "create table dfs.tmp.test_airflow as select * from cp.`employee.json` limit 10",
+            "select sum(employee_id), any_value(full_name) from dfs.tmp.test_airflow",
+        ]
+        op = DrillOperator(task_id='drill_operator_test_multi', sql=sql, dag=self.dag)
+        op.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)