You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ka...@apache.org on 2021/01/14 16:28:06 UTC
[airflow] branch master updated: Add Neo4j hook and operator
(#13324)
This is an automated email from the ASF dual-hosted git repository.
kaxilnaik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/master by this push:
new 1d2977f Add Neo4j hook and operator (#13324)
1d2977f is described below
commit 1d2977f6a4c67fa6174c79dcdc4e9ee3ce06f1b1
Author: Kanthi <su...@gmail.com>
AuthorDate: Thu Jan 14 11:27:50 2021 -0500
Add Neo4j hook and operator (#13324)
Close: #12873
---
CONTRIBUTING.rst | 2 +-
INSTALL | 2 +-
airflow/providers/neo4j/README.md | 18 ++++
airflow/providers/neo4j/__init__.py | 17 +++
airflow/providers/neo4j/example_dags/__init__.py | 17 +++
.../providers/neo4j/example_dags/example_neo4j.py | 48 +++++++++
airflow/providers/neo4j/hooks/__init__.py | 17 +++
airflow/providers/neo4j/hooks/neo4j.py | 117 +++++++++++++++++++++
airflow/providers/neo4j/operators/__init__.py | 17 +++
airflow/providers/neo4j/operators/neo4j.py | 62 +++++++++++
airflow/providers/neo4j/provider.yaml | 44 ++++++++
.../connections/neo4j.rst | 63 +++++++++++
docs/apache-airflow-providers-neo4j/index.rst | 48 +++++++++
.../operators/neo4j.rst | 50 +++++++++
docs/apache-airflow/extra-packages-ref.rst | 2 +
docs/spelling_wordlist.txt | 4 +
.../run_install_and_test_provider_packages.sh | 4 +-
setup.py | 3 +
tests/core/test_providers_manager.py | 2 +
tests/providers/neo4j/__init__.py | 17 +++
tests/providers/neo4j/hooks/__init__.py | 17 +++
tests/providers/neo4j/hooks/test_neo4j.py | 65 ++++++++++++
tests/providers/neo4j/operators/__init__.py | 17 +++
tests/providers/neo4j/operators/test_neo4j.py | 61 +++++++++++
24 files changed, 710 insertions(+), 4 deletions(-)
diff --git a/CONTRIBUTING.rst b/CONTRIBUTING.rst
index dc25d3c..b61c7ab 100644
--- a/CONTRIBUTING.rst
+++ b/CONTRIBUTING.rst
@@ -696,7 +696,7 @@ apache.webhdfs, async, atlas, aws, azure, cassandra, celery, cgroups, cloudant,
crypto, dask, databricks, datadog, devel, devel_all, devel_ci, devel_hadoop, dingding, discord, doc,
docker, druid, elasticsearch, exasol, facebook, ftp, gcp, gcp_api, github_enterprise, google,
google_auth, grpc, hashicorp, hdfs, hive, http, imap, jdbc, jenkins, jira, kerberos, kubernetes,
-ldap, microsoft.azure, microsoft.mssql, microsoft.winrm, mongo, mssql, mysql, odbc, openfaas,
+ldap, microsoft.azure, microsoft.mssql, microsoft.winrm, mongo, mssql, mysql, neo4j, odbc, openfaas,
opsgenie, oracle, pagerduty, papermill, password, pinot, plexus, postgres, presto, qds, qubole,
rabbitmq, redis, s3, salesforce, samba, segment, sendgrid, sentry, sftp, singularity, slack,
snowflake, spark, sqlite, ssh, statsd, tableau, telegram, vertica, virtualenv, webhdfs, winrm,
diff --git a/INSTALL b/INSTALL
index c2b8fe2..c2a1d04 100644
--- a/INSTALL
+++ b/INSTALL
@@ -97,7 +97,7 @@ apache.webhdfs, async, atlas, aws, azure, cassandra, celery, cgroups, cloudant,
crypto, dask, databricks, datadog, devel, devel_all, devel_ci, devel_hadoop, dingding, discord, doc,
docker, druid, elasticsearch, exasol, facebook, ftp, gcp, gcp_api, github_enterprise, google,
google_auth, grpc, hashicorp, hdfs, hive, http, imap, jdbc, jenkins, jira, kerberos, kubernetes,
-ldap, microsoft.azure, microsoft.mssql, microsoft.winrm, mongo, mssql, mysql, odbc, openfaas,
+ldap, microsoft.azure, microsoft.mssql, microsoft.winrm, mongo, mssql, mysql, neo4j, odbc, openfaas,
opsgenie, oracle, pagerduty, papermill, password, pinot, plexus, postgres, presto, qds, qubole,
rabbitmq, redis, s3, salesforce, samba, segment, sendgrid, sentry, sftp, singularity, slack,
snowflake, spark, sqlite, ssh, statsd, tableau, telegram, vertica, virtualenv, webhdfs, winrm,
diff --git a/airflow/providers/neo4j/README.md b/airflow/providers/neo4j/README.md
new file mode 100644
index 0000000..ef14aff
--- /dev/null
+++ b/airflow/providers/neo4j/README.md
@@ -0,0 +1,18 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+ -->
diff --git a/airflow/providers/neo4j/__init__.py b/airflow/providers/neo4j/__init__.py
new file mode 100644
index 0000000..217e5db
--- /dev/null
+++ b/airflow/providers/neo4j/__init__.py
@@ -0,0 +1,17 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/airflow/providers/neo4j/example_dags/__init__.py b/airflow/providers/neo4j/example_dags/__init__.py
new file mode 100644
index 0000000..217e5db
--- /dev/null
+++ b/airflow/providers/neo4j/example_dags/__init__.py
@@ -0,0 +1,17 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/airflow/providers/neo4j/example_dags/example_neo4j.py b/airflow/providers/neo4j/example_dags/example_neo4j.py
new file mode 100644
index 0000000..7d6f2fc
--- /dev/null
+++ b/airflow/providers/neo4j/example_dags/example_neo4j.py
@@ -0,0 +1,48 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""
+Example use of Neo4j related operators.
+"""
+
+from airflow import DAG
+from airflow.providers.neo4j.operators.neo4j import Neo4jOperator
+from airflow.utils.dates import days_ago
+
+default_args = {
+ 'owner': 'airflow',
+}
+
+dag = DAG(
+ 'example_neo4j',
+ default_args=default_args,
+ start_date=days_ago(2),
+ tags=['example'],
+)
+
+# [START run_query_neo4j_operator]
+
+neo4j_task = Neo4jOperator(
+ task_id='run_neo4j_query',
+ neo4j_conn_id='neo4j_conn_id',
+ sql='MATCH (tom {name: "Tom Hanks"}) RETURN tom',
+ dag=dag,
+)
+
+# [END run_query_neo4j_operator]
+
+neo4j_task
diff --git a/airflow/providers/neo4j/hooks/__init__.py b/airflow/providers/neo4j/hooks/__init__.py
new file mode 100644
index 0000000..217e5db
--- /dev/null
+++ b/airflow/providers/neo4j/hooks/__init__.py
@@ -0,0 +1,17 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/airflow/providers/neo4j/hooks/neo4j.py b/airflow/providers/neo4j/hooks/neo4j.py
new file mode 100644
index 0000000..d473b01
--- /dev/null
+++ b/airflow/providers/neo4j/hooks/neo4j.py
@@ -0,0 +1,117 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""This module allows to connect to a Neo4j database."""
+
+from neo4j import GraphDatabase, Neo4jDriver, Result
+
+from airflow.hooks.base import BaseHook
+from airflow.models import Connection
+
+
+class Neo4jHook(BaseHook):
+ """
+ Interact with Neo4j.
+
+ Performs a connection to Neo4j and runs the query.
+ """
+
+ conn_name_attr = 'neo4j_conn_id'
+ default_conn_name = 'neo4j_default'
+ conn_type = 'neo4j'
+ hook_name = 'Neo4j'
+
+ def __init__(self, conn_id: str = default_conn_name, *args, **kwargs) -> None:
+ super().__init__(*args, **kwargs)
+ self.neo4j_conn_id = conn_id
+ self.connection = kwargs.pop("connection", None)
+ self.client = None
+ self.extras = None
+ self.uri = None
+
+ def get_conn(self) -> Neo4jDriver:
+ """
+ Function that initiates a new Neo4j connection
+ with username, password and database schema.
+ """
+ self.connection = self.get_connection(self.neo4j_conn_id)
+ self.extras = self.connection.extra_dejson.copy()
+
+ self.uri = self.get_uri(self.connection)
+ self.log.info('URI: %s', self.uri)
+
+ if self.client is not None:
+ return self.client
+
+ is_encrypted = self.connection.extra_dejson.get('encrypted', False)
+
+ self.client = GraphDatabase.driver(
+ self.uri, auth=(self.connection.login, self.connection.password), encrypted=is_encrypted
+ )
+
+ return self.client
+
+ def get_uri(self, conn: Connection) -> str:
+ """
+ Build the uri based on extras
+ - Default - uses bolt scheme(bolt://)
+ - neo4j_scheme - neo4j://
+ - certs_self_signed - neo4j+ssc://
+ - certs_trusted_ca - neo4j+s://
+ :param conn: connection object.
+ :return: uri
+ """
+ use_neo4j_scheme = conn.extra_dejson.get('neo4j_scheme', False)
+ scheme = 'neo4j' if use_neo4j_scheme else 'bolt'
+
+ # Self signed certificates
+ ssc = conn.extra_dejson.get('certs_self_signed', False)
+
+ # Only certificates signed by CA.
+ trusted_ca = conn.extra_dejson.get('certs_trusted_ca', False)
+ encryption_scheme = ''
+
+ if ssc:
+ encryption_scheme = '+ssc'
+ elif trusted_ca:
+ encryption_scheme = '+s'
+
+ return '{scheme}{encryption_scheme}://{host}:{port}'.format(
+ scheme=scheme,
+ encryption_scheme=encryption_scheme,
+ host=conn.host,
+ port='7687' if conn.port is None else f'{conn.port}',
+ )
+
+ def run(self, query) -> Result:
+ """
+ Function to create a neo4j session
+ and execute the query in the session.
+
+
+ :param query: Neo4j query
+ :return: Result
+ """
+ driver = self.get_conn()
+ if not self.connection.schema:
+ with driver.session() as session:
+ result = session.run(query)
+ else:
+ with driver.session(database=self.connection.schema) as session:
+ result = session.run(query)
+ return result
diff --git a/airflow/providers/neo4j/operators/__init__.py b/airflow/providers/neo4j/operators/__init__.py
new file mode 100644
index 0000000..217e5db
--- /dev/null
+++ b/airflow/providers/neo4j/operators/__init__.py
@@ -0,0 +1,17 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/airflow/providers/neo4j/operators/neo4j.py b/airflow/providers/neo4j/operators/neo4j.py
new file mode 100644
index 0000000..20df9cb
--- /dev/null
+++ b/airflow/providers/neo4j/operators/neo4j.py
@@ -0,0 +1,62 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from typing import Dict, Iterable, Mapping, Optional, Union
+
+from airflow.models import BaseOperator
+from airflow.providers.neo4j.hooks.neo4j import Neo4jHook
+from airflow.utils.decorators import apply_defaults
+
+
+class Neo4jOperator(BaseOperator):
+ """
+ Executes sql code in a specific Neo4j database
+
+ .. seealso::
+ For more information on how to use this operator, take a look at the guide:
+ :ref:`howto/operator:Neo4jOperator`
+
+ :param sql: the sql code to be executed. Can receive a str representing a
+ sql statement, a list of str (sql statements)
+ :type sql: str or list[str]
+ :param neo4j_conn_id: reference to a specific Neo4j database
+ :type neo4j_conn_id: str
+ """
+
+ @apply_defaults
+ def __init__(
+ self,
+ *,
+ sql: str,
+ neo4j_conn_id: str = 'neo4j_default',
+ parameters: Optional[Union[Mapping, Iterable]] = None,
+ **kwargs,
+ ) -> None:
+ super().__init__(**kwargs)
+ self.neo4j_conn_id = neo4j_conn_id
+ self.sql = sql
+ self.parameters = parameters
+ self.hook = None
+
+ def get_hook(self):
+ """Function to retrieve the Neo4j Hook."""
+ return Neo4jHook(conn_id=self.neo4j_conn_id)
+
+ def execute(self, context: Dict) -> None:
+ self.log.info('Executing: %s', self.sql)
+ self.hook = self.get_hook()
+ self.hook.run(self.sql)
diff --git a/airflow/providers/neo4j/provider.yaml b/airflow/providers/neo4j/provider.yaml
new file mode 100644
index 0000000..9081694
--- /dev/null
+++ b/airflow/providers/neo4j/provider.yaml
@@ -0,0 +1,44 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+---
+package-name: apache-airflow-providers-neo4j
+name: Neo4j
+description: |
+ `Neo4j <https://neo4j.com/>`__
+
+versions:
+ - 1.0.0
+integrations:
+ - integration-name: Neo4j
+ external-doc-url: https://neo4j.com/
+ how-to-guide:
+ - /docs/apache-airflow-providers-neo4j/operators/neo4j.rst
+ tags: [software]
+
+operators:
+ - integration-name: Neo4j
+ python-modules:
+ - airflow.providers.neo4j.operators.neo4j
+
+hooks:
+ - integration-name: Neo4j
+ python-modules:
+ - airflow.providers.neo4j.hooks.neo4j
+
+hook-class-names:
+ - airflow.providers.neo4j.hooks.neo4j.Neo4jHook
diff --git a/docs/apache-airflow-providers-neo4j/connections/neo4j.rst b/docs/apache-airflow-providers-neo4j/connections/neo4j.rst
new file mode 100644
index 0000000..33fd6b5
--- /dev/null
+++ b/docs/apache-airflow-providers-neo4j/connections/neo4j.rst
@@ -0,0 +1,63 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ .. http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+
+
+Neo4j Connection
+================
+The Neo4j connection type provides a connection to a Neo4j database.
+
+Configuring the Connection
+--------------------------
+Host (required)
+ The host to connect to.
+
+Schema (optional)
+ Specify the name of the Neo4j database to use. If empty, the session is opened without an explicit database.
+
+Login (required)
+ Specify the user name to connect.
+
+Password (required)
+ Specify the password to connect.
+
+Extra (optional)
+ Specify the extra parameters (as json dictionary) that can be used in Neo4j
+ connection.
+
+ The following extras are supported:
+
+ - Default - uses bolt scheme(bolt://)
+ - neo4j_scheme - neo4j://
+ - certs_self_signed - neo4j+ssc://
+ - certs_trusted_ca - neo4j+s://
+
+ * ``encrypted``: Sets ``encrypted=True/False`` for ``GraphDatabase.driver``. Set to ``True`` for Neo4j Aura.
+ * ``neo4j_scheme``: Sets the URI scheme to ``neo4j://``; the default is ``bolt://``.
+ * ``certs_self_signed``: Appends ``+ssc`` to the URI scheme to accept self-signed certificates (e.g. ``neo4j+ssc://``).
+ * ``certs_trusted_ca``: Appends ``+s`` to the URI scheme to accept only certificates signed by a trusted CA (e.g. ``neo4j+s://``). Ignored when ``certs_self_signed`` is set.
+
+ Example "extras" field:
+
+ .. code-block:: json
+
+ {
+ "encrypted": true,
+ "neo4j_scheme": true,
+ "certs_self_signed": true,
+ "certs_trusted_ca": false
+ }
diff --git a/docs/apache-airflow-providers-neo4j/index.rst b/docs/apache-airflow-providers-neo4j/index.rst
new file mode 100644
index 0000000..cafc57b
--- /dev/null
+++ b/docs/apache-airflow-providers-neo4j/index.rst
@@ -0,0 +1,48 @@
+
+ .. Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ .. http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+``apache-airflow-providers-neo4j``
+==================================
+
+Content
+-------
+
+.. toctree::
+ :maxdepth: 1
+ :caption: Guides
+
+ Connection types <connections/neo4j>
+ Operators <operators/neo4j>
+
+.. toctree::
+ :maxdepth: 1
+ :caption: References
+
+ Python API <_api/airflow/providers/neo4j/index>
+
+.. toctree::
+ :maxdepth: 1
+ :caption: Resources
+
+ Example DAGs <https://github.com/apache/airflow/tree/master/airflow/providers/neo4j/example_dags>
+
+.. toctree::
+ :maxdepth: 1
+ :caption: Resources
+
+ PyPI Repository <https://pypi.org/project/apache-airflow-providers-neo4j/>
diff --git a/docs/apache-airflow-providers-neo4j/operators/neo4j.rst b/docs/apache-airflow-providers-neo4j/operators/neo4j.rst
new file mode 100644
index 0000000..411aa0c
--- /dev/null
+++ b/docs/apache-airflow-providers-neo4j/operators/neo4j.rst
@@ -0,0 +1,50 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ .. http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+
+
+.. _howto/operator:Neo4jOperator:
+
+Neo4jOperator
+=============
+
+Use the :class:`~airflow.providers.neo4j.operators.neo4j.Neo4jOperator` to execute
+the query given in ``sql`` (for example a Cypher statement) in a `Neo4j <https://neo4j.com/>`__ database.
+
+
+Using the Operator
+^^^^^^^^^^^^^^^^^^
+
+Use the ``neo4j_conn_id`` argument to connect to your Neo4j instance where
+the connection metadata is structured as follows:
+
+.. list-table:: Neo4j Airflow Connection Metadata
+ :widths: 25 25
+ :header-rows: 1
+
+ * - Parameter
+ - Input
+ * - Host: string
+ - Neo4j hostname
+ * - Schema: string
+ - Database name
+ * - Login: string
+ - Neo4j user
+ * - Password: string
+ - Neo4j user password
+ * - Port: int
+ - Neo4j port
diff --git a/docs/apache-airflow/extra-packages-ref.rst b/docs/apache-airflow/extra-packages-ref.rst
index 6f6665e..1678a97 100644
--- a/docs/apache-airflow/extra-packages-ref.rst
+++ b/docs/apache-airflow/extra-packages-ref.rst
@@ -183,6 +183,8 @@ can be reported by some tools (even if it is harmless).
+---------------------+-----------------------------------------------------+------------------------------------------------------------------------------------+-----------+
| mysql | ``pip install 'apache-airflow[mysql]'`` | MySQL operators and hook | * |
+---------------------+-----------------------------------------------------+------------------------------------------------------------------------------------+-----------+
+| neo4j | ``pip install 'apache-airflow[neo4j]'`` | Neo4j operators and hook | * |
++---------------------+-----------------------------------------------------+------------------------------------------------------------------------------------+-----------+
| odbc | ``pip install 'apache-airflow[odbc]'`` | ODBC data sources including MS SQL Server | * |
+---------------------+-----------------------------------------------------+------------------------------------------------------------------------------------+-----------+
| openfaas | ``pip install 'apache-airflow[openfaas]'`` | OpenFaaS hooks | * |
diff --git a/docs/spelling_wordlist.txt b/docs/spelling_wordlist.txt
index 3f8c273..b97a92d 100644
--- a/docs/spelling_wordlist.txt
+++ b/docs/spelling_wordlist.txt
@@ -251,6 +251,7 @@ NaN
Naik
Namenode
Namespace
+Neo4j
Nextdoor
Nones
NotFound
@@ -989,6 +990,8 @@ navbar
nd
ndjson
neighbours
+neo
+neo4j
neq
networkUri
nginx
@@ -1218,6 +1221,7 @@ sqlsensor
sqoop
src
srv
+ssc
ssd
sshHook
sshtunnel
diff --git a/scripts/in_container/run_install_and_test_provider_packages.sh b/scripts/in_container/run_install_and_test_provider_packages.sh
index 20bb061..ef2227c 100755
--- a/scripts/in_container/run_install_and_test_provider_packages.sh
+++ b/scripts/in_container/run_install_and_test_provider_packages.sh
@@ -101,7 +101,7 @@ function discover_all_provider_packages() {
# Columns is to force it wider, so it doesn't wrap at 80 characters
COLUMNS=180 airflow providers list
- local expected_number_of_providers=61
+ local expected_number_of_providers=62
local actual_number_of_providers
actual_providers=$(airflow providers list --output yaml | grep package_name)
actual_number_of_providers=$(wc -l <<<"$actual_providers")
@@ -124,7 +124,7 @@ function discover_all_hooks() {
group_start "Listing available hooks via 'airflow providers hooks'"
COLUMNS=180 airflow providers hooks
- local expected_number_of_hooks=59
+ local expected_number_of_hooks=60
local actual_number_of_hooks
actual_number_of_hooks=$(airflow providers hooks --output table | grep -c "| apache" | xargs)
if [[ ${actual_number_of_hooks} != "${expected_number_of_hooks}" ]]; then
diff --git a/setup.py b/setup.py
index 16a4e2a..46a2128 100644
--- a/setup.py
+++ b/setup.py
@@ -359,6 +359,7 @@ mysql = [
'mysql-connector-python>=8.0.11, <=8.0.22',
'mysqlclient>=1.3.6,<1.4',
]
+neo4j = ['neo4j>=4.2.1']
odbc = [
'pyodbc',
]
@@ -579,6 +580,7 @@ PROVIDERS_REQUIREMENTS: Dict[str, List[str]] = {
'microsoft.winrm': winrm,
'mongo': mongo,
'mysql': mysql,
+ 'neo4j': neo4j,
'odbc': odbc,
'openfaas': [],
'opsgenie': [],
@@ -715,6 +717,7 @@ ALL_DB_PROVIDERS = [
'microsoft.mssql',
'mongo',
'mysql',
+ 'neo4j',
'postgres',
'presto',
'vertica',
diff --git a/tests/core/test_providers_manager.py b/tests/core/test_providers_manager.py
index 56e3baa..ec05da2 100644
--- a/tests/core/test_providers_manager.py
+++ b/tests/core/test_providers_manager.py
@@ -56,6 +56,7 @@ ALL_PROVIDERS = [
'apache-airflow-providers-microsoft-winrm',
'apache-airflow-providers-mongo',
'apache-airflow-providers-mysql',
+ 'apache-airflow-providers-neo4j',
'apache-airflow-providers-odbc',
'apache-airflow-providers-openfaas',
'apache-airflow-providers-opsgenie',
@@ -121,6 +122,7 @@ CONNECTIONS_LIST = [
'mongo',
'mssql',
'mysql',
+ 'neo4j',
'odbc',
'oracle',
'pig_cli',
diff --git a/tests/providers/neo4j/__init__.py b/tests/providers/neo4j/__init__.py
new file mode 100644
index 0000000..217e5db
--- /dev/null
+++ b/tests/providers/neo4j/__init__.py
@@ -0,0 +1,17 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/tests/providers/neo4j/hooks/__init__.py b/tests/providers/neo4j/hooks/__init__.py
new file mode 100644
index 0000000..217e5db
--- /dev/null
+++ b/tests/providers/neo4j/hooks/__init__.py
@@ -0,0 +1,17 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/tests/providers/neo4j/hooks/test_neo4j.py b/tests/providers/neo4j/hooks/test_neo4j.py
new file mode 100644
index 0000000..7f64fc4
--- /dev/null
+++ b/tests/providers/neo4j/hooks/test_neo4j.py
@@ -0,0 +1,65 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+import json
+import unittest
+from unittest import mock
+
+from airflow.models import Connection
+from airflow.providers.neo4j.hooks.neo4j import Neo4jHook
+
+
+class TestNeo4jHookConn(unittest.TestCase):
+ def setUp(self):
+ super().setUp()
+ self.neo4j_hook = Neo4jHook()
+ self.connection = Connection(
+ conn_type='neo4j', login='login', password='password', host='host', schema='schema'
+ )
+
+ def test_get_uri_neo4j_scheme(self):
+
+ self.neo4j_hook.get_connection = mock.Mock()
+ self.neo4j_hook.get_connection.return_value = self.connection
+ uri = self.neo4j_hook.get_uri(self.connection)
+
+ self.assertEqual(uri, "bolt://host:7687")
+
+ def test_get_uri_bolt_scheme(self):
+
+ self.connection.extra = json.dumps({"bolt_scheme": True})
+ self.neo4j_hook.get_connection = mock.Mock()
+ self.neo4j_hook.get_connection.return_value = self.connection
+ uri = self.neo4j_hook.get_uri(self.connection)
+
+ self.assertEqual(uri, "bolt://host:7687")
+
+ def test_get_uri_bolt_ssc_scheme(self):
+ self.connection.extra = json.dumps({"certs_self_signed": True, "bolt_scheme": True})
+ self.neo4j_hook.get_connection = mock.Mock()
+ self.neo4j_hook.get_connection.return_value = self.connection
+ uri = self.neo4j_hook.get_uri(self.connection)
+
+ self.assertEqual(uri, "bolt+ssc://host:7687")
+
+ def test_get_uri_bolt_trusted_ca_scheme(self):
+ self.connection.extra = json.dumps({"certs_trusted_ca": True, "bolt_scheme": True})
+ self.neo4j_hook.get_connection = mock.Mock()
+ self.neo4j_hook.get_connection.return_value = self.connection
+ uri = self.neo4j_hook.get_uri(self.connection)
+
+ self.assertEqual(uri, "bolt+s://host:7687")
diff --git a/tests/providers/neo4j/operators/__init__.py b/tests/providers/neo4j/operators/__init__.py
new file mode 100644
index 0000000..217e5db
--- /dev/null
+++ b/tests/providers/neo4j/operators/__init__.py
@@ -0,0 +1,17 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/tests/providers/neo4j/operators/test_neo4j.py b/tests/providers/neo4j/operators/test_neo4j.py
new file mode 100644
index 0000000..39c8d69
--- /dev/null
+++ b/tests/providers/neo4j/operators/test_neo4j.py
@@ -0,0 +1,61 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import unittest
+from unittest import mock
+
+from airflow.models.dag import DAG
+from airflow.providers.neo4j.operators.neo4j import Neo4jOperator
+from airflow.utils import timezone
+
+# Identifier of the throwaway DAG built for each test.
+TEST_DAG_ID = 'unit_test_dag'
+
+# Fixed date used as the DAG start date and as the run window in the test below.
+DEFAULT_DATE = timezone.datetime(2015, 1, 1)
+# String forms of DEFAULT_DATE; neither is referenced by the test in this file.
+DEFAULT_DATE_ISO = DEFAULT_DATE.isoformat()
+# First ten characters of the ISO string (presumably the YYYY-MM-DD part).
+DEFAULT_DATE_DS = DEFAULT_DATE_ISO[0:10]
+
+
+class TestNeo4jOperator(unittest.TestCase):
+    """Run a ``Neo4jOperator`` task with its ``get_hook`` method patched out."""
+
+    def setUp(self):
+        # Each test gets a fresh DAG carrying the shared owner/start_date defaults.
+        self.dag = DAG(TEST_DAG_ID, default_args={'owner': 'airflow', 'start_date': DEFAULT_DATE})
+
+    @mock.patch('airflow.providers.neo4j.operators.neo4j.Neo4jOperator.get_hook')
+    def test_neo4j_operator_test(self, mock_hook):
+        # NOTE(review): mock_hook is never asserted on, so this test only shows
+        # that the task runs without raising -- consider asserting the hook call.
+        cypher = """
+ MATCH (tom {name: "Tom Hanks"}) RETURN tom
+ """
+        task = Neo4jOperator(task_id='basic_neo4j', sql=cypher, dag=self.dag)
+        task.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)