You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ka...@apache.org on 2021/01/14 16:28:06 UTC

[airflow] branch master updated: Add Neo4j hook and operator (#13324)

This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/master by this push:
     new 1d2977f  Add Neo4j hook and operator (#13324)
1d2977f is described below

commit 1d2977f6a4c67fa6174c79dcdc4e9ee3ce06f1b1
Author: Kanthi <su...@gmail.com>
AuthorDate: Thu Jan 14 11:27:50 2021 -0500

    Add Neo4j hook and operator (#13324)
    
    Close: #12873
---
 CONTRIBUTING.rst                                   |   2 +-
 INSTALL                                            |   2 +-
 airflow/providers/neo4j/README.md                  |  18 ++++
 airflow/providers/neo4j/__init__.py                |  17 +++
 airflow/providers/neo4j/example_dags/__init__.py   |  17 +++
 .../providers/neo4j/example_dags/example_neo4j.py  |  48 +++++++++
 airflow/providers/neo4j/hooks/__init__.py          |  17 +++
 airflow/providers/neo4j/hooks/neo4j.py             | 117 +++++++++++++++++++++
 airflow/providers/neo4j/operators/__init__.py      |  17 +++
 airflow/providers/neo4j/operators/neo4j.py         |  62 +++++++++++
 airflow/providers/neo4j/provider.yaml              |  44 ++++++++
 .../connections/neo4j.rst                          |  63 +++++++++++
 docs/apache-airflow-providers-neo4j/index.rst      |  48 +++++++++
 .../operators/neo4j.rst                            |  50 +++++++++
 docs/apache-airflow/extra-packages-ref.rst         |   2 +
 docs/spelling_wordlist.txt                         |   4 +
 .../run_install_and_test_provider_packages.sh      |   4 +-
 setup.py                                           |   3 +
 tests/core/test_providers_manager.py               |   2 +
 tests/providers/neo4j/__init__.py                  |  17 +++
 tests/providers/neo4j/hooks/__init__.py            |  17 +++
 tests/providers/neo4j/hooks/test_neo4j.py          |  65 ++++++++++++
 tests/providers/neo4j/operators/__init__.py        |  17 +++
 tests/providers/neo4j/operators/test_neo4j.py      |  61 +++++++++++
 24 files changed, 710 insertions(+), 4 deletions(-)

diff --git a/CONTRIBUTING.rst b/CONTRIBUTING.rst
index dc25d3c..b61c7ab 100644
--- a/CONTRIBUTING.rst
+++ b/CONTRIBUTING.rst
@@ -696,7 +696,7 @@ apache.webhdfs, async, atlas, aws, azure, cassandra, celery, cgroups, cloudant,
 crypto, dask, databricks, datadog, devel, devel_all, devel_ci, devel_hadoop, dingding, discord, doc,
 docker, druid, elasticsearch, exasol, facebook, ftp, gcp, gcp_api, github_enterprise, google,
 google_auth, grpc, hashicorp, hdfs, hive, http, imap, jdbc, jenkins, jira, kerberos, kubernetes,
-ldap, microsoft.azure, microsoft.mssql, microsoft.winrm, mongo, mssql, mysql, odbc, openfaas,
+ldap, microsoft.azure, microsoft.mssql, microsoft.winrm, mongo, mssql, mysql, neo4j, odbc, openfaas,
 opsgenie, oracle, pagerduty, papermill, password, pinot, plexus, postgres, presto, qds, qubole,
 rabbitmq, redis, s3, salesforce, samba, segment, sendgrid, sentry, sftp, singularity, slack,
 snowflake, spark, sqlite, ssh, statsd, tableau, telegram, vertica, virtualenv, webhdfs, winrm,
diff --git a/INSTALL b/INSTALL
index c2b8fe2..c2a1d04 100644
--- a/INSTALL
+++ b/INSTALL
@@ -97,7 +97,7 @@ apache.webhdfs, async, atlas, aws, azure, cassandra, celery, cgroups, cloudant,
 crypto, dask, databricks, datadog, devel, devel_all, devel_ci, devel_hadoop, dingding, discord, doc,
 docker, druid, elasticsearch, exasol, facebook, ftp, gcp, gcp_api, github_enterprise, google,
 google_auth, grpc, hashicorp, hdfs, hive, http, imap, jdbc, jenkins, jira, kerberos, kubernetes,
-ldap, microsoft.azure, microsoft.mssql, microsoft.winrm, mongo, mssql, mysql, odbc, openfaas,
+ldap, microsoft.azure, microsoft.mssql, microsoft.winrm, mongo, mssql, mysql, neo4j, odbc, openfaas,
 opsgenie, oracle, pagerduty, papermill, password, pinot, plexus, postgres, presto, qds, qubole,
 rabbitmq, redis, s3, salesforce, samba, segment, sendgrid, sentry, sftp, singularity, slack,
 snowflake, spark, sqlite, ssh, statsd, tableau, telegram, vertica, virtualenv, webhdfs, winrm,
diff --git a/airflow/providers/neo4j/README.md b/airflow/providers/neo4j/README.md
new file mode 100644
index 0000000..ef14aff
--- /dev/null
+++ b/airflow/providers/neo4j/README.md
@@ -0,0 +1,18 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied.  See the License for the
+ specific language governing permissions and limitations
+ under the License.
+ -->
diff --git a/airflow/providers/neo4j/__init__.py b/airflow/providers/neo4j/__init__.py
new file mode 100644
index 0000000..217e5db
--- /dev/null
+++ b/airflow/providers/neo4j/__init__.py
@@ -0,0 +1,17 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/airflow/providers/neo4j/example_dags/__init__.py b/airflow/providers/neo4j/example_dags/__init__.py
new file mode 100644
index 0000000..217e5db
--- /dev/null
+++ b/airflow/providers/neo4j/example_dags/__init__.py
@@ -0,0 +1,17 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/airflow/providers/neo4j/example_dags/example_neo4j.py b/airflow/providers/neo4j/example_dags/example_neo4j.py
new file mode 100644
index 0000000..7d6f2fc
--- /dev/null
+++ b/airflow/providers/neo4j/example_dags/example_neo4j.py
@@ -0,0 +1,48 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""
+Example use of Neo4j related operators.
+"""
+
+from airflow import DAG
+from airflow.providers.neo4j.operators.neo4j import Neo4jOperator
+from airflow.utils.dates import days_ago
+
+default_args = {
+    'owner': 'airflow',
+}
+# Example DAG; ``default_args`` above is handed to it as its task defaults.
+dag = DAG(
+    'example_neo4j',
+    default_args=default_args,
+    start_date=days_ago(2),
+    tags=['example'],
+)
+# NOTE(review): the START/END markers presumably delimit a snippet pulled into the docs -- keep them intact.
+# [START run_query_neo4j_operator]
+
+neo4j_task = Neo4jOperator(
+    task_id='run_neo4j_query',
+    neo4j_conn_id='neo4j_conn_id',
+    sql='MATCH (tom {name: "Tom Hanks"}) RETURN tom',
+    dag=dag,
+)
+
+# [END run_query_neo4j_operator]
+# Bare reference to the single task; no dependencies between tasks are declared here.
+neo4j_task
diff --git a/airflow/providers/neo4j/hooks/__init__.py b/airflow/providers/neo4j/hooks/__init__.py
new file mode 100644
index 0000000..217e5db
--- /dev/null
+++ b/airflow/providers/neo4j/hooks/__init__.py
@@ -0,0 +1,17 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/airflow/providers/neo4j/hooks/neo4j.py b/airflow/providers/neo4j/hooks/neo4j.py
new file mode 100644
index 0000000..d473b01
--- /dev/null
+++ b/airflow/providers/neo4j/hooks/neo4j.py
@@ -0,0 +1,117 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""This module allows to connect to a Neo4j database."""
+
+from neo4j import GraphDatabase, Neo4jDriver, Result
+
+from airflow.hooks.base import BaseHook
+from airflow.models import Connection
+
+
+class Neo4jHook(BaseHook):
+    """
+    Interact with Neo4j.
+
+    Builds a Neo4j driver from an Airflow connection and runs queries through it.
+
+    :param conn_id: id of the Airflow connection holding host, port, credentials,
+        database name (``schema``) and extras.
+    :param connection: optional ``Connection`` (keyword only) used instead of
+        looking ``conn_id`` up.
+    """
+
+    conn_name_attr = 'neo4j_conn_id'
+    default_conn_name = 'neo4j_default'
+    conn_type = 'neo4j'
+    hook_name = 'Neo4j'
+
+    def __init__(self, conn_id: str = default_conn_name, *args, **kwargs) -> None:
+        # Pop ``connection`` before delegating so it is never forwarded to the
+        # base initialiser as an unexpected keyword argument.
+        self.connection = kwargs.pop("connection", None)
+        super().__init__(*args, **kwargs)
+        self.neo4j_conn_id = conn_id
+        self.client = None
+        self.extras = None
+        self.uri = None
+
+    def get_conn(self) -> Neo4jDriver:
+        """
+        Return the Neo4j driver, creating and caching it on first use.
+
+        :return: driver built from the connection's URI, login and password.
+        """
+        # Reuse the cached driver without repeating the connection lookup.
+        if self.client is not None:
+            return self.client
+
+        if self.connection is None:
+            self.connection = self.get_connection(self.neo4j_conn_id)
+        self.extras = self.connection.extra_dejson.copy()
+
+        self.uri = self.get_uri(self.connection)
+        self.log.info('URI: %s', self.uri)
+
+        self.client = GraphDatabase.driver(
+            self.uri,
+            auth=(self.connection.login, self.connection.password),
+            encrypted=self.extras.get('encrypted', False),
+        )
+        return self.client
+
+    def get_uri(self, conn: Connection) -> str:
+        """
+        Build the driver URI from the connection's host, port and extras.
+
+        - Default - uses bolt scheme (``bolt://``)
+        - ``neo4j_scheme`` - uses ``neo4j://`` instead of ``bolt://``
+        - ``certs_self_signed`` - appends ``+ssc`` to the scheme
+        - ``certs_trusted_ca`` - appends ``+s`` (ignored if ``certs_self_signed``)
+
+        :param conn: connection object.
+        :return: uri; the port falls back to 7687 when the connection has none.
+        """
+        extras = conn.extra_dejson
+        scheme = 'neo4j' if extras.get('neo4j_scheme', False) else 'bolt'
+
+        # Self-signed certificates take precedence over CA-signed-only.
+        if extras.get('certs_self_signed', False):
+            scheme += '+ssc'
+        elif extras.get('certs_trusted_ca', False):
+            scheme += '+s'
+
+        port = 7687 if conn.port is None else conn.port
+        return f'{scheme}://{conn.host}:{port}'
+
+    def run(self, query) -> Result:
+        """
+        Open a session and execute the query in it.
+
+        The session targets the database named by the connection's ``schema``;
+        without a schema, ``driver.session()`` is called with no database.
+
+        :param query: Neo4j query
+        :return: Result
+        """
+        driver = self.get_conn()
+        # NOTE(review): the Result is handed back after the session has closed;
+        # confirm callers can still read its records at that point.
+        session_kwargs = {'database': self.connection.schema} if self.connection.schema else {}
+        with driver.session(**session_kwargs) as session:
+            return session.run(query)
diff --git a/airflow/providers/neo4j/operators/__init__.py b/airflow/providers/neo4j/operators/__init__.py
new file mode 100644
index 0000000..217e5db
--- /dev/null
+++ b/airflow/providers/neo4j/operators/__init__.py
@@ -0,0 +1,17 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/airflow/providers/neo4j/operators/neo4j.py b/airflow/providers/neo4j/operators/neo4j.py
new file mode 100644
index 0000000..20df9cb
--- /dev/null
+++ b/airflow/providers/neo4j/operators/neo4j.py
@@ -0,0 +1,62 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from typing import Dict, Iterable, Mapping, Optional, Union
+
+from airflow.models import BaseOperator
+from airflow.providers.neo4j.hooks.neo4j import Neo4jHook
+from airflow.utils.decorators import apply_defaults
+
+
+class Neo4jOperator(BaseOperator):
+    """
+    Runs a query against a Neo4j database through ``Neo4jHook``.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:Neo4jOperator`
+
+    :param sql: query text handed to the hook as-is (a Cypher statement, for Neo4j)
+    :type sql: str
+    :param neo4j_conn_id: id of the Airflow connection passed to the hook
+    :type neo4j_conn_id: str
+    :param parameters: stored on the operator; ``execute`` does not forward it
+    """
+
+    @apply_defaults
+    def __init__(
+        self,
+        *,
+        sql: str,
+        neo4j_conn_id: str = 'neo4j_default',
+        parameters: Optional[Union[Mapping, Iterable]] = None,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.sql, self.parameters = sql, parameters
+        self.neo4j_conn_id = neo4j_conn_id
+        self.hook = None  # assigned by ``execute``
+
+    def get_hook(self):
+        """Build a ``Neo4jHook`` bound to ``neo4j_conn_id``."""
+        return Neo4jHook(conn_id=self.neo4j_conn_id)
+
+    def execute(self, context: Dict) -> None:
+        """Log the query, create the hook and run the query through it."""
+        self.log.info('Executing: %s', self.sql)
+        hook = self.hook = self.get_hook()
+        hook.run(self.sql)
diff --git a/airflow/providers/neo4j/provider.yaml b/airflow/providers/neo4j/provider.yaml
new file mode 100644
index 0000000..9081694
--- /dev/null
+++ b/airflow/providers/neo4j/provider.yaml
@@ -0,0 +1,44 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+---
+package-name: apache-airflow-providers-neo4j
+name: Neo4j
+description: |
+    `Neo4j <https://neo4j.com/>`__
+
+versions:
+  - 1.0.0
+integrations:
+  - integration-name: Neo4j
+    external-doc-url: https://neo4j.com/
+    how-to-guide:
+      - /docs/apache-airflow-providers-neo4j/operators/neo4j.rst
+    tags: [software]
+
+operators:
+  - integration-name: Neo4j
+    python-modules:
+      - airflow.providers.neo4j.operators.neo4j
+
+hooks:
+  - integration-name: Neo4j
+    python-modules:
+      - airflow.providers.neo4j.hooks.neo4j
+
+hook-class-names:
+  - airflow.providers.neo4j.hooks.neo4j.Neo4jHook
diff --git a/docs/apache-airflow-providers-neo4j/connections/neo4j.rst b/docs/apache-airflow-providers-neo4j/connections/neo4j.rst
new file mode 100644
index 0000000..33fd6b5
--- /dev/null
+++ b/docs/apache-airflow-providers-neo4j/connections/neo4j.rst
@@ -0,0 +1,63 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+
+
+Neo4j Connection
+================
+The Neo4j connection type provides connection to a Neo4j database.
+
+Configuring the Connection
+--------------------------
+Host (required)
+    The host to connect to.
+
+Schema (optional)
+    Specify the name of the Neo4j database to use; when empty, the session is opened without a database name.
+
+Login (required)
+    Specify the user name to connect.
+
+Password (required)
+    Specify the password to connect.
+
+Extra (optional)
+    Specify the extra parameters (as json dictionary) that can be used in Neo4j
+    connection.
+
+    The following extras are supported:
+
+        - Default - uses bolt scheme(bolt://)
+        - neo4j_scheme - neo4j://
+        - certs_self_signed - neo4j+ssc://
+        - certs_trusted_ca - neo4j+s://
+
+      * ``encrypted``: Passed as ``encrypted=True/False`` to ``GraphDatabase.driver`` (default ``False``). Set to ``True`` for Neo4j Aura.
+      * ``neo4j_scheme``: Set to ``true`` to use the ``neo4j://`` scheme; the default is ``bolt://``.
+      * ``certs_self_signed``: Appends ``+ssc`` to the scheme to accept self-signed certificates (e.g. ``neo4j+ssc://``).
+      * ``certs_trusted_ca``: Appends ``+s`` to the scheme to accept only CA-signed certificates (e.g. ``neo4j+s://``); ignored when ``certs_self_signed`` is set.
+
+      Example "extras" field:
+
+      .. code-block:: json
+
+         {
+            "encrypted": true,
+            "neo4j_scheme": true,
+            "certs_self_signed": true,
+            "certs_trusted_ca": false
+         }
diff --git a/docs/apache-airflow-providers-neo4j/index.rst b/docs/apache-airflow-providers-neo4j/index.rst
new file mode 100644
index 0000000..cafc57b
--- /dev/null
+++ b/docs/apache-airflow-providers-neo4j/index.rst
@@ -0,0 +1,48 @@
+
+ .. Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+``apache-airflow-providers-neo4j``
+==================================
+
+Content
+-------
+
+.. toctree::
+    :maxdepth: 1
+    :caption: Guides
+
+    Connection types <connections/neo4j>
+    Operators <operators/neo4j>
+
+.. toctree::
+    :maxdepth: 1
+    :caption: References
+
+    Python API <_api/airflow/providers/neo4j/index>
+
+.. toctree::
+    :maxdepth: 1
+    :caption: Resources
+
+    Example DAGs <https://github.com/apache/airflow/tree/master/airflow/providers/neo4j/example_dags>
+
+.. toctree::
+    :maxdepth: 1
+    :caption: Resources
+
+    PyPI Repository <https://pypi.org/project/apache-airflow-providers-neo4j/>
diff --git a/docs/apache-airflow-providers-neo4j/operators/neo4j.rst b/docs/apache-airflow-providers-neo4j/operators/neo4j.rst
new file mode 100644
index 0000000..411aa0c
--- /dev/null
+++ b/docs/apache-airflow-providers-neo4j/operators/neo4j.rst
@@ -0,0 +1,50 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+
+
+.. _howto/operator:Neo4jOperator:
+
+Neo4jOperator
+=============
+
+Use the :class:`~airflow.providers.neo4j.operators.neo4j.Neo4jOperator` to execute
+Cypher queries in a `Neo4j <https://neo4j.com/>`__ database.
+
+
+Using the Operator
+^^^^^^^^^^^^^^^^^^
+
+Use the ``neo4j_conn_id`` argument to connect to your Neo4j instance where
+the connection metadata is structured as follows:
+
+.. list-table:: Neo4j Airflow Connection Metadata
+   :widths: 25 25
+   :header-rows: 1
+
+   * - Parameter
+     - Input
+   * - Host: string
+     - Neo4j hostname
+   * - Schema: string
+     - Database name
+   * - Login: string
+     - Neo4j user
+   * - Password: string
+     - Neo4j user password
+   * - Port: int
+     - Neo4j port
diff --git a/docs/apache-airflow/extra-packages-ref.rst b/docs/apache-airflow/extra-packages-ref.rst
index 6f6665e..1678a97 100644
--- a/docs/apache-airflow/extra-packages-ref.rst
+++ b/docs/apache-airflow/extra-packages-ref.rst
@@ -183,6 +183,8 @@ can be reported by some tools (even if it is harmless).
 +---------------------+-----------------------------------------------------+------------------------------------------------------------------------------------+-----------+
 | mysql               | ``pip install 'apache-airflow[mysql]'``             | MySQL operators and hook                                                           |     *     |
 +---------------------+-----------------------------------------------------+------------------------------------------------------------------------------------+-----------+
+| neo4j               | ``pip install 'apache-airflow[neo4j]'``             | Neo4j operators and hook                                                           |     *     |
++---------------------+-----------------------------------------------------+------------------------------------------------------------------------------------+-----------+
 | odbc                | ``pip install 'apache-airflow[odbc]'``              | ODBC data sources including MS SQL Server                                          |     *     |
 +---------------------+-----------------------------------------------------+------------------------------------------------------------------------------------+-----------+
 | openfaas            | ``pip install 'apache-airflow[openfaas]'``          | OpenFaaS hooks                                                                     |     *     |
diff --git a/docs/spelling_wordlist.txt b/docs/spelling_wordlist.txt
index 3f8c273..b97a92d 100644
--- a/docs/spelling_wordlist.txt
+++ b/docs/spelling_wordlist.txt
@@ -251,6 +251,7 @@ NaN
 Naik
 Namenode
 Namespace
+Neo4j
 Nextdoor
 Nones
 NotFound
@@ -989,6 +990,8 @@ navbar
 nd
 ndjson
 neighbours
+neo
+neo4j
 neq
 networkUri
 nginx
@@ -1218,6 +1221,7 @@ sqlsensor
 sqoop
 src
 srv
+ssc
 ssd
 sshHook
 sshtunnel
diff --git a/scripts/in_container/run_install_and_test_provider_packages.sh b/scripts/in_container/run_install_and_test_provider_packages.sh
index 20bb061..ef2227c 100755
--- a/scripts/in_container/run_install_and_test_provider_packages.sh
+++ b/scripts/in_container/run_install_and_test_provider_packages.sh
@@ -101,7 +101,7 @@ function discover_all_provider_packages() {
     # Columns is to force it wider, so it doesn't wrap at 80 characters
     COLUMNS=180 airflow providers list
 
-    local expected_number_of_providers=61
+    local expected_number_of_providers=62
     local actual_number_of_providers
     actual_providers=$(airflow providers list --output yaml | grep package_name)
     actual_number_of_providers=$(wc -l <<<"$actual_providers")
@@ -124,7 +124,7 @@ function discover_all_hooks() {
     group_start "Listing available hooks via 'airflow providers hooks'"
     COLUMNS=180 airflow providers hooks
 
-    local expected_number_of_hooks=59
+    local expected_number_of_hooks=60
     local actual_number_of_hooks
     actual_number_of_hooks=$(airflow providers hooks --output table | grep -c "| apache" | xargs)
     if [[ ${actual_number_of_hooks} != "${expected_number_of_hooks}" ]]; then
diff --git a/setup.py b/setup.py
index 16a4e2a..46a2128 100644
--- a/setup.py
+++ b/setup.py
@@ -359,6 +359,7 @@ mysql = [
     'mysql-connector-python>=8.0.11, <=8.0.22',
     'mysqlclient>=1.3.6,<1.4',
 ]
+neo4j = ['neo4j>=4.2.1']
 odbc = [
     'pyodbc',
 ]
@@ -579,6 +580,7 @@ PROVIDERS_REQUIREMENTS: Dict[str, List[str]] = {
     'microsoft.winrm': winrm,
     'mongo': mongo,
     'mysql': mysql,
+    'neo4j': neo4j,
     'odbc': odbc,
     'openfaas': [],
     'opsgenie': [],
@@ -715,6 +717,7 @@ ALL_DB_PROVIDERS = [
     'microsoft.mssql',
     'mongo',
     'mysql',
+    'neo4j',
     'postgres',
     'presto',
     'vertica',
diff --git a/tests/core/test_providers_manager.py b/tests/core/test_providers_manager.py
index 56e3baa..ec05da2 100644
--- a/tests/core/test_providers_manager.py
+++ b/tests/core/test_providers_manager.py
@@ -56,6 +56,7 @@ ALL_PROVIDERS = [
     'apache-airflow-providers-microsoft-winrm',
     'apache-airflow-providers-mongo',
     'apache-airflow-providers-mysql',
+    'apache-airflow-providers-neo4j',
     'apache-airflow-providers-odbc',
     'apache-airflow-providers-openfaas',
     'apache-airflow-providers-opsgenie',
@@ -121,6 +122,7 @@ CONNECTIONS_LIST = [
     'mongo',
     'mssql',
     'mysql',
+    'neo4j',
     'odbc',
     'oracle',
     'pig_cli',
diff --git a/tests/providers/neo4j/__init__.py b/tests/providers/neo4j/__init__.py
new file mode 100644
index 0000000..217e5db
--- /dev/null
+++ b/tests/providers/neo4j/__init__.py
@@ -0,0 +1,17 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/tests/providers/neo4j/hooks/__init__.py b/tests/providers/neo4j/hooks/__init__.py
new file mode 100644
index 0000000..217e5db
--- /dev/null
+++ b/tests/providers/neo4j/hooks/__init__.py
@@ -0,0 +1,17 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/tests/providers/neo4j/hooks/test_neo4j.py b/tests/providers/neo4j/hooks/test_neo4j.py
new file mode 100644
index 0000000..7f64fc4
--- /dev/null
+++ b/tests/providers/neo4j/hooks/test_neo4j.py
@@ -0,0 +1,65 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+import json
+import unittest
+from unittest import mock
+
+from airflow.models import Connection
+from airflow.providers.neo4j.hooks.neo4j import Neo4jHook
+
+
+class TestNeo4jHookConn(unittest.TestCase):
+    """Check the URI string ``Neo4jHook.get_uri`` returns for a fixture ``Connection``."""
+
+    def setUp(self):
+        """Create a fresh hook and a fixture connection before every test."""
+        super().setUp()
+        self.neo4j_hook = Neo4jHook()
+        self.connection = Connection(
+            conn_type='neo4j',
+            login='login',
+            password='password',
+            host='host',
+            schema='schema',
+        )
+
+    def _uri_for(self, extra=None):
+        """Return ``get_uri`` for the fixture connection.
+
+        :param extra: optional dict; when given it is JSON-encoded and stored
+            as the connection's ``extra`` before the URI is requested.
+        """
+        if extra is not None:
+            self.connection.extra = json.dumps(extra)
+        # Replace get_connection with a mock that returns the fixture connection.
+        self.neo4j_hook.get_connection = mock.Mock(return_value=self.connection)
+        return self.neo4j_hook.get_uri(self.connection)
+
+    def test_get_uri_neo4j_scheme(self):
+        # NOTE(review): no extra is set here and the expected URI uses bolt,
+        # despite the test name -- confirm which scheme this case should cover.
+        actual = self._uri_for()
+        self.assertEqual(actual, "bolt://host:7687")
+
+    def test_get_uri_bolt_scheme(self):
+        actual = self._uri_for({"bolt_scheme": True})
+        self.assertEqual(actual, "bolt://host:7687")
+
+    def test_get_uri_bolt_ssc_scheme(self):
+        actual = self._uri_for({"certs_self_signed": True, "bolt_scheme": True})
+        self.assertEqual(actual, "bolt+ssc://host:7687")
+
+    def test_get_uri_bolt_trusted_ca_scheme(self):
+        actual = self._uri_for({"certs_trusted_ca": True, "bolt_scheme": True})
+        self.assertEqual(actual, "bolt+s://host:7687")
diff --git a/tests/providers/neo4j/operators/__init__.py b/tests/providers/neo4j/operators/__init__.py
new file mode 100644
index 0000000..217e5db
--- /dev/null
+++ b/tests/providers/neo4j/operators/__init__.py
@@ -0,0 +1,17 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/tests/providers/neo4j/operators/test_neo4j.py b/tests/providers/neo4j/operators/test_neo4j.py
new file mode 100644
index 0000000..39c8d69
--- /dev/null
+++ b/tests/providers/neo4j/operators/test_neo4j.py
@@ -0,0 +1,61 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import unittest
+from unittest import mock
+
+from airflow.models.dag import DAG
+from airflow.providers.neo4j.operators.neo4j import Neo4jOperator
+from airflow.utils import timezone
+
+# Fixed date used as the DAG start_date and as both ends of the run window in the test.
+DEFAULT_DATE = timezone.datetime(2015, 1, 1)
+# String form of DEFAULT_DATE as produced by isoformat().
+DEFAULT_DATE_ISO = DEFAULT_DATE.isoformat()
+# First ten characters of the ISO string.
+DEFAULT_DATE_DS = DEFAULT_DATE_ISO[:10]
+# dag_id given to the DAG built in setUp.
+TEST_DAG_ID = 'unit_test_dag'
+
+
+class TestNeo4jOperator(unittest.TestCase):
+    """Run ``Neo4jOperator`` inside a throwaway DAG with its ``get_hook`` patched out."""
+
+    def setUp(self):
+        """Build the DAG that the operator under test is attached to."""
+        args = {'owner': 'airflow', 'start_date': DEFAULT_DATE}
+        self.dag = DAG(TEST_DAG_ID, default_args=args)
+
+    @mock.patch('airflow.providers.neo4j.operators.neo4j.Neo4jOperator.get_hook')
+    def test_neo4j_operator_test(self, mock_hook):
+        """Running the task should obtain its hook through the patched ``get_hook``."""
+        sql = """
+            MATCH (tom {name: "Tom Hanks"}) RETURN tom
+            """
+        op = Neo4jOperator(task_id='basic_neo4j', sql=sql, dag=self.dag)
+        op.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
+
+        # Previously the test asserted nothing, so it only proved run() did not raise.
+        # NOTE(review): assumes execute() calls get_hook() exactly once for the
+        # single run date -- confirm against the operator implementation.
+        mock_hook.assert_called_once()