Posted to commits@airflow.apache.org by mo...@apache.org on 2023/06/29 12:15:12 UTC

[airflow] branch main updated: openlineage, common.sql: provide OL SQL parser as internal OpenLineage provider API (#31398)

This is an automated email from the ASF dual-hosted git repository.

mobuchowski pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new f2e2125b07 openlineage, common.sql:  provide OL SQL parser as internal OpenLineage provider API (#31398)
f2e2125b07 is described below

commit f2e2125b070794b6a66fb3e2840ca14d07054cf2
Author: JDarDagran <ku...@gmail.com>
AuthorDate: Thu Jun 29 14:15:04 2023 +0200

    openlineage, common.sql:  provide OL SQL parser as internal OpenLineage provider API (#31398)
    
    * Add SQLParser class serving as the API for the openlineage_sql library.
    Implement base methods for SQLExecuteQueryOperator & DbApiHook.
    
    Signed-off-by: Jakub Dardzinski <ku...@gmail.com>
    
    Rename methods to expose their purpose for OpenLineage.
    
    Signed-off-by: Jakub Dardzinski <ku...@gmail.com>
    
    * Rewrite information schema query construction to SQLAlchemy ORM.
    
    Signed-off-by: Jakub Dardzinski <ku...@gmail.com>
    
    * Clean up in-class reference
    
    Instead of referencing the SQLParser class directly, convert various
    static methods to class methods so they can use the cls argument
    and avoid spelling out the class name repeatedly.
    
    Also added a few changes to better utilize type references and eliminate
    some verbose type annotations.
    
    * Clean up typing and iterator usage
    
    * Add static typing to hint the return type.
    
    Signed-off-by: Jakub Dardzinski <ku...@gmail.com>
    
    * Fix mypy issues.
    
    Signed-off-by: Jakub Dardzinski <ku...@gmail.com>
    
    ---------
    
    Signed-off-by: Jakub Dardzinski <ku...@gmail.com>
    Co-authored-by: Tzu-ping Chung <ur...@gmail.com>
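    
    A minimal usage sketch of the new hook-level API, mirroring the test hook added
    in this commit (the hook class name, connection id, and dialect below are
    illustrative assumptions):
    
        from airflow.providers.common.sql.hooks.sql import DbApiHook
        from airflow.providers.openlineage.sqlparser import DatabaseInfo
    
        class MyDatabaseHook(DbApiHook):  # hypothetical hook, for illustration only
            conn_name_attr = "my_db_conn_id"
    
            def get_openlineage_database_info(self, connection) -> DatabaseInfo:
                # scheme + authority become the OpenLineage dataset namespace,
                # e.g. "postgres://host:5432"
                return DatabaseInfo(
                    scheme="postgres",
                    authority=DbApiHook.get_openlineage_authority_part(connection),
                    database=connection.schema,
                )
    
            def get_openlineage_database_dialect(self, connection) -> str:
                # must be one of the dialects supported by openlineage-sql
                return "postgres"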
---
 airflow/providers/common/sql/hooks/sql.py          |  59 +++-
 airflow/providers/common/sql/operators/sql.py      |  59 ++++
 airflow/providers/openlineage/extractors/base.py   |  29 +-
 airflow/providers/openlineage/sqlparser.py         | 278 +++++++++++++++++
 airflow/providers/openlineage/utils/sql.py         | 191 ++++++++++++
 dev/breeze/tests/test_selective_checks.py          |   1 +
 generated/provider_dependencies.json               |   8 +-
 .../common/sql/operators/test_sql_execute.py       |  82 ++++-
 tests/providers/openlineage/utils/__init__.py      |  16 +
 tests/providers/openlineage/utils/test_sql.py      | 339 +++++++++++++++++++++
 .../providers/openlineage/utils/test_sqlparser.py  | 222 ++++++++++++++
 11 files changed, 1271 insertions(+), 13 deletions(-)

diff --git a/airflow/providers/common/sql/hooks/sql.py b/airflow/providers/common/sql/hooks/sql.py
index b766e064db..4ff6e62e24 100644
--- a/airflow/providers/common/sql/hooks/sql.py
+++ b/airflow/providers/common/sql/hooks/sql.py
@@ -18,7 +18,8 @@ from __future__ import annotations
 
 from contextlib import closing
 from datetime import datetime
-from typing import Any, Callable, Iterable, Mapping, Protocol, Sequence, cast
+from typing import TYPE_CHECKING, Any, Callable, Iterable, Mapping, Protocol, Sequence, cast
+from urllib.parse import urlparse
 
 import sqlparse
 from packaging.version import Version
@@ -28,6 +29,10 @@ from airflow import AirflowException
 from airflow.hooks.base import BaseHook
 from airflow.version import version
 
+if TYPE_CHECKING:
+    from airflow.providers.openlineage.extractors import OperatorLineage
+    from airflow.providers.openlineage.sqlparser import DatabaseInfo
+
 
 def return_single_query_results(sql: str | Iterable[str], return_last: bool, split_statements: bool):
     """
@@ -255,8 +260,7 @@ class DbApiHook(BaseForDbApiHook):
         :return: list of individual expressions
         """
         splits = sqlparse.split(sqlparse.format(sql, strip_comments=True))
-        statements: list[str] = list(filter(None, splits))
-        return statements
+        return [s for s in splits if s]
 
     @property
     def last_description(self) -> Sequence[Sequence] | None:
@@ -515,3 +519,52 @@ class DbApiHook(BaseForDbApiHook):
             message = str(e)
 
         return status, message
+
+    def get_openlineage_database_info(self, connection) -> DatabaseInfo | None:
+        """
+        Returns database-specific information needed to generate and parse lineage metadata.
+
+        This includes information helpful for constructing the information schema query
+        and creating the correct namespace.
+
+        :param connection: Airflow connection used to reduce calls of the `get_connection` method
+        """
+
+    def get_openlineage_database_dialect(self, connection) -> str:
+        """
+        Returns the database dialect used for SQL parsing.
+
+        For a list of supported dialects, see: https://openlineage.io/docs/development/sql#sql-dialects
+        """
+        return "generic"
+
+    def get_openlineage_default_schema(self) -> str:
+        """
+        Returns the default schema specific to the database.
+
+        .. seealso::
+            - :class:`airflow.providers.openlineage.sqlparser.SQLParser`
+        """
+        return self.__schema or "public"
+
+    def get_openlineage_database_specific_lineage(self, task_instance) -> OperatorLineage | None:
+        """
+        Returns additional database-specific lineage, e.g. query execution information.
+
+        This method is called only on completion of the task.
+
+        :param task_instance: this may be used to retrieve additional information
+            that is collected during runtime of the task
+        """
+
+    @staticmethod
+    def get_openlineage_authority_part(connection) -> str:
+        """
+        This method serves as a common method for several hooks to get the authority part from an Airflow Connection.
+
+        The authority represents the hostname and port of the connection
+        and conforms to the OpenLineage naming convention for a number of databases (e.g. MySQL, Postgres, Trino).
+        """
+        parsed = urlparse(connection.get_uri())
+        authority = f"{parsed.hostname}:{parsed.port}"
+        return authority
diff --git a/airflow/providers/common/sql/operators/sql.py b/airflow/providers/common/sql/operators/sql.py
index a8eea3d6d8..5f129e0817 100644
--- a/airflow/providers/common/sql/operators/sql.py
+++ b/airflow/providers/common/sql/operators/sql.py
@@ -26,8 +26,10 @@ from airflow.exceptions import AirflowException, AirflowFailException
 from airflow.hooks.base import BaseHook
 from airflow.models import BaseOperator, SkipMixin
 from airflow.providers.common.sql.hooks.sql import DbApiHook, fetch_all_handler, return_single_query_results
+from airflow.utils.helpers import merge_dicts
 
 if TYPE_CHECKING:
+    from airflow.providers.openlineage.extractors import OperatorLineage
     from airflow.utils.context import Context
 
 
@@ -290,6 +292,63 @@ class SQLExecuteQueryOperator(BaseSQLOperator):
         if isinstance(self.parameters, str):
             self.parameters = ast.literal_eval(self.parameters)
 
+    def get_openlineage_facets_on_start(self) -> OperatorLineage | None:
+        try:
+            from airflow.providers.openlineage.sqlparser import SQLParser
+        except ImportError:
+            return None
+
+        hook = self.get_db_hook()
+
+        connection = hook.get_connection(getattr(hook, hook.conn_name_attr))
+        try:
+            database_info = hook.get_openlineage_database_info(connection)
+        except AttributeError:
+            self.log.debug("%s has no database info provided", hook)
+            database_info = None
+
+        if database_info is None:
+            return None
+
+        try:
+            sql_parser = SQLParser(
+                dialect=hook.get_openlineage_database_dialect(connection),
+                default_schema=hook.get_openlineage_default_schema(),
+            )
+        except AttributeError:
+            self.log.debug("%s failed to get database dialect", hook)
+            return None
+
+        operator_lineage = sql_parser.generate_openlineage_metadata_from_sql(
+            sql=self.sql, hook=hook, database_info=database_info, database=self.database
+        )
+
+        return operator_lineage
+
+    def get_openlineage_facets_on_complete(self, task_instance) -> OperatorLineage | None:
+        try:
+            from airflow.providers.openlineage.extractors import OperatorLineage
+        except ImportError:
+            return self.get_openlineage_facets_on_start()
+
+        operator_lineage = self.get_openlineage_facets_on_start() or OperatorLineage()
+
+        hook = self.get_db_hook()
+        try:
+            database_specific_lineage = hook.get_openlineage_database_specific_lineage(task_instance)
+        except AttributeError:
+            database_specific_lineage = None
+
+        if database_specific_lineage is None:
+            return operator_lineage
+
+        return OperatorLineage(
+            inputs=operator_lineage.inputs + database_specific_lineage.inputs,
+            outputs=operator_lineage.outputs + database_specific_lineage.outputs,
+            run_facets=merge_dicts(operator_lineage.run_facets, database_specific_lineage.run_facets),
+            job_facets=merge_dicts(operator_lineage.job_facets, database_specific_lineage.job_facets),
+        )
+
 
 class SQLColumnCheckOperator(BaseSQLOperator):
     """
diff --git a/airflow/providers/openlineage/extractors/base.py b/airflow/providers/openlineage/extractors/base.py
index dd714ea89a..de84f0f63f 100644
--- a/airflow/providers/openlineage/extractors/base.py
+++ b/airflow/providers/openlineage/extractors/base.py
@@ -22,6 +22,7 @@ from abc import ABC, abstractmethod
 from attrs import Factory, define
 
 from airflow.utils.log.logging_mixin import LoggingMixin
+from airflow.utils.state import TaskInstanceState
 from openlineage.client.facet import BaseFacet
 from openlineage.client.run import Dataset
 
@@ -88,16 +89,30 @@ class DefaultExtractor(BaseExtractor):
             return None
 
     def extract_on_complete(self, task_instance) -> OperatorLineage | None:
+        if task_instance.state == TaskInstanceState.FAILED:
+            on_failed = getattr(self.operator, "get_openlineage_facets_on_failure", None)
+            if on_failed and callable(on_failed):
+                return self._get_openlineage_facets(on_failed, task_instance)
         on_complete = getattr(self.operator, "get_openlineage_facets_on_complete", None)
         if on_complete and callable(on_complete):
             return self._get_openlineage_facets(on_complete, task_instance)
         return self.extract()
 
     def _get_openlineage_facets(self, get_facets_method, *args) -> OperatorLineage | None:
-        facets: OperatorLineage = get_facets_method(*args)
-        return OperatorLineage(
-            inputs=facets.inputs,
-            outputs=facets.outputs,
-            run_facets=facets.run_facets,
-            job_facets=facets.job_facets,
-        )
+        try:
+            facets = get_facets_method(*args)
+        except ImportError:
+            self.log.exception(
+                "OpenLineage provider method failed to import OpenLineage integration. "
+                "This should not happen."
+            )
+        except Exception:
+            self.log.exception("OpenLineage provider method failed to extract data from provider. ")
+        else:
+            return OperatorLineage(
+                inputs=facets.inputs,
+                outputs=facets.outputs,
+                run_facets=facets.run_facets,
+                job_facets=facets.job_facets,
+            )
+        return None
diff --git a/airflow/providers/openlineage/sqlparser.py b/airflow/providers/openlineage/sqlparser.py
new file mode 100644
index 0000000000..657428549e
--- /dev/null
+++ b/airflow/providers/openlineage/sqlparser.py
@@ -0,0 +1,278 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from typing import TYPE_CHECKING, Callable
+
+import sqlparse
+from attrs import define
+
+from airflow.providers.openlineage.extractors.base import OperatorLineage
+from airflow.providers.openlineage.utils.sql import (
+    TablesHierarchy,
+    create_information_schema_query,
+    get_table_schemas,
+)
+from airflow.typing_compat import TypedDict
+from openlineage.client.facet import BaseFacet, ExtractionError, ExtractionErrorRunFacet, SqlJobFacet
+from openlineage.client.run import Dataset
+from openlineage.common.sql import DbTableMeta, SqlMeta, parse
+
+if TYPE_CHECKING:
+    from airflow.hooks.base import BaseHook
+
+DEFAULT_NAMESPACE = "default"
+DEFAULT_INFORMATION_SCHEMA_COLUMNS = [
+    "table_schema",
+    "table_name",
+    "column_name",
+    "ordinal_position",
+    "udt_name",
+]
+DEFAULT_INFORMATION_SCHEMA_TABLE_NAME = "information_schema.columns"
+
+
+def default_normalize_name_method(name: str) -> str:
+    return name.lower()
+
+
+class GetTableSchemasParams(TypedDict):
+    """get_table_schemas params."""
+
+    normalize_name: Callable[[str], str]
+    is_cross_db: bool
+    information_schema_columns: list[str]
+    information_schema_table: str
+    is_uppercase_names: bool
+    database: str | None
+
+
+@define
+class DatabaseInfo:
+    """
+    Contains database-specific information needed to process the SQL statement parse result.
+
+    :param scheme: Scheme part of the URI in the OpenLineage namespace.
+    :param authority: Authority part of the URI in the OpenLineage namespace.
+        In most cases it should be the `{host}:{port}` part of the Airflow connection.
+        See: https://github.com/OpenLineage/OpenLineage/blob/main/spec/Naming.md
+    :param database: Takes precedence over the parsed database name.
+    :param information_schema_columns: List of column names from the information schema table.
+    :param information_schema_table_name: Information schema table name.
+    :param is_information_schema_cross_db: Specifies whether the information schema contains
+        cross-database data.
+    :param is_uppercase_names: Specifies whether the database accepts only uppercase names (e.g. Snowflake).
+    :param normalize_name_method: Method to normalize database, schema, and table names.
+        Defaults to `name.lower()`.
+    """
+
+    scheme: str
+    authority: str | None = None
+    database: str | None = None
+    information_schema_columns: list[str] = DEFAULT_INFORMATION_SCHEMA_COLUMNS
+    information_schema_table_name: str = DEFAULT_INFORMATION_SCHEMA_TABLE_NAME
+    is_information_schema_cross_db: bool = False
+    is_uppercase_names: bool = False
+    normalize_name_method: Callable[[str], str] = default_normalize_name_method
+
+
+class SQLParser:
+    """Interface for openlineage-sql.
+
+    :param dialect: dialect specific to the database
+    :param default_schema: schema applied to each table with no schema parsed
+    """
+
+    def __init__(self, dialect: str | None = None, default_schema: str | None = None) -> None:
+        self.dialect = dialect
+        self.default_schema = default_schema
+
+    def parse(self, sql: list[str] | str) -> SqlMeta | None:
+        """Parse a single or a list of SQL statements."""
+        return parse(sql=sql, dialect=self.dialect, default_schema=self.default_schema)
+
+    def parse_table_schemas(
+        self,
+        hook: BaseHook,
+        inputs: list[DbTableMeta],
+        outputs: list[DbTableMeta],
+        database_info: DatabaseInfo,
+        namespace: str = DEFAULT_NAMESPACE,
+        database: str | None = None,
+    ) -> tuple[list[Dataset], ...]:
+        """Parse schemas for input and output tables."""
+        database_kwargs: GetTableSchemasParams = {
+            "normalize_name": database_info.normalize_name_method,
+            "is_cross_db": database_info.is_information_schema_cross_db,
+            "information_schema_columns": database_info.information_schema_columns,
+            "information_schema_table": database_info.information_schema_table_name,
+            "is_uppercase_names": database_info.is_uppercase_names,
+            "database": database or database_info.database,
+        }
+        return get_table_schemas(
+            hook,
+            namespace,
+            database or database_info.database,
+            self.create_information_schema_query(tables=inputs, **database_kwargs) if inputs else None,
+            self.create_information_schema_query(tables=outputs, **database_kwargs) if outputs else None,
+        )
+
+    def generate_openlineage_metadata_from_sql(
+        self,
+        sql: list[str] | str,
+        hook: BaseHook,
+        database_info: DatabaseInfo,
+        database: str | None = None,
+    ) -> OperatorLineage:
+        """Parses SQL statement(s) and generates OpenLineage metadata.
+
+        Generated OpenLineage metadata contains:
+
+        * input tables with schemas parsed
+        * output tables with schemas parsed
+        * run facets
+        * job facets.
+
+        :param sql: a SQL statement or a list of SQL statements to be parsed
+        :param hook: Airflow Hook used to connect to the database
+        :param database_info: database specific information
+        :param database: when passed it takes precedence over parsed database name
+        """
+        job_facets: dict[str, BaseFacet] = {"sql": SqlJobFacet(query=self.normalize_sql(sql))}
+        parse_result = self.parse(self.split_sql_string(sql))
+        if not parse_result:
+            return OperatorLineage(job_facets=job_facets)
+
+        run_facets: dict[str, BaseFacet] = {}
+        if parse_result.errors:
+            run_facets["extractionError"] = ExtractionErrorRunFacet(
+                totalTasks=len(sql) if isinstance(sql, list) else 1,
+                failedTasks=len(parse_result.errors),
+                errors=[
+                    ExtractionError(
+                        errorMessage=error.message,
+                        stackTrace=None,
+                        task=error.origin_statement,
+                        taskNumber=error.index,
+                    )
+                    for error in parse_result.errors
+                ],
+            )
+
+        namespace = self.create_namespace(database_info=database_info)
+        inputs, outputs = self.parse_table_schemas(
+            hook=hook,
+            inputs=parse_result.in_tables,
+            outputs=parse_result.out_tables,
+            namespace=namespace,
+            database=database,
+            database_info=database_info,
+        )
+
+        return OperatorLineage(
+            inputs=inputs,
+            outputs=outputs,
+            run_facets=run_facets,
+            job_facets=job_facets,
+        )
+
+    @staticmethod
+    def create_namespace(database_info: DatabaseInfo) -> str:
+        return (
+            f"{database_info.scheme}://{database_info.authority}"
+            if database_info.authority
+            else database_info.scheme
+        )
+
+    @classmethod
+    def normalize_sql(cls, sql: list[str] | str) -> str:
+        """Makes sure to return a semicolon-separated SQL statements."""
+        return ";\n".join(stmt.rstrip(" ;\r\n") for stmt in cls.split_sql_string(sql))
+
+    @classmethod
+    def split_sql_string(cls, sql: list[str] | str) -> list[str]:
+        """
+        Split a SQL string into a list of statements.
+        Tries to use `DbApiHook.split_sql_string` if available;
+        otherwise, falls back to the same splitting logic.
+        """
+        try:
+            from airflow.providers.common.sql.hooks.sql import DbApiHook
+
+            split_statement = DbApiHook.split_sql_string
+        except (ImportError, AttributeError):
+            # No common.sql Airflow provider available or version is too old.
+            def split_statement(sql: str) -> list[str]:
+                splits = sqlparse.split(sqlparse.format(sql, strip_comments=True))
+                return [s for s in splits if s]
+
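+        # A plain string is split directly; a list is flattened by splitting each element recursively.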
+        if isinstance(sql, str):
+            return split_statement(sql)
+        return [obj for stmt in sql for obj in cls.split_sql_string(stmt) if obj != ""]
+
+    @classmethod
+    def create_information_schema_query(
+        cls,
+        tables: list[DbTableMeta],
+        normalize_name: Callable[[str], str],
+        is_cross_db: bool,
+        information_schema_columns,
+        information_schema_table,
+        is_uppercase_names,
+        database: str | None = None,
+    ) -> str:
+        """Creates SELECT statement to query information schema table."""
+        tables_hierarchy = cls._get_tables_hierarchy(
+            tables,
+            normalize_name=normalize_name,
+            database=database,
+            is_cross_db=is_cross_db,
+        )
+        return create_information_schema_query(
+            columns=information_schema_columns,
+            information_schema_table_name=information_schema_table,
+            tables_hierarchy=tables_hierarchy,
+            uppercase_names=is_uppercase_names,
+        )
+
+    @staticmethod
+    def _get_tables_hierarchy(
+        tables: list[DbTableMeta],
+        normalize_name: Callable[[str], str],
+        database: str | None = None,
+        is_cross_db: bool = False,
+    ) -> TablesHierarchy:
+        """
+        Creates a hierarchy of database -> schema -> table name.
+
+        This helps to create a simpler information schema query grouped by
+        database and schema.
+
+        :param tables: List of tables.
+        :param normalize_name: A method to normalize all names.
+        :param database: Fallback database name for tables that do not specify one.
+        :param is_cross_db: If False, set the top (database) level to None
+            when creating the hierarchy.
+        """
+        hierarchy: TablesHierarchy = {}
+        for table in tables:
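+            # With cross-db support, group by the table's database (falling back to `database`);
+            # otherwise the database level is collapsed to a single None key.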
+            if is_cross_db:
+                db = table.database or database
+            else:
+                db = None
+            schemas = hierarchy.setdefault(normalize_name(db) if db else db, {})
+            tables = schemas.setdefault(normalize_name(table.schema) if table.schema else db, [])
+            tables.append(table.name)
+        return hierarchy
diff --git a/airflow/providers/openlineage/utils/sql.py b/airflow/providers/openlineage/utils/sql.py
new file mode 100644
index 0000000000..317e46e442
--- /dev/null
+++ b/airflow/providers/openlineage/utils/sql.py
@@ -0,0 +1,191 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import logging
+from collections import defaultdict
+from contextlib import closing
+from enum import IntEnum
+from typing import TYPE_CHECKING, Dict, List, Optional
+
+from attrs import define
+from sqlalchemy import Column, MetaData, Table, and_, union_all
+
+from openlineage.client.facet import SchemaDatasetFacet, SchemaField
+from openlineage.client.run import Dataset
+
+if TYPE_CHECKING:
+    from sqlalchemy.engine import Engine
+    from sqlalchemy.sql import ClauseElement
+
+    from airflow.hooks.base import BaseHook
+
+
+logger = logging.getLogger(__name__)
+
+
+class ColumnIndex(IntEnum):
+    """Enumerates the indices of columns in information schema view."""
+
+    SCHEMA = 0
+    TABLE_NAME = 1
+    COLUMN_NAME = 2
+    ORDINAL_POSITION = 3
+    # Use 'udt_name' which is the underlying type of column
+    UDT_NAME = 4
+    # Database is optional, as the 6th column (index 5)
+    DATABASE = 5
+
+
+TablesHierarchy = Dict[Optional[str], Dict[Optional[str], List[str]]]
+
+
+@define
+class TableSchema:
+    """Temporary object used to construct OpenLineage Dataset."""
+
+    table: str
+    schema: str | None
+    database: str | None
+    fields: list[SchemaField]
+
+    def to_dataset(self, namespace: str, database: str | None = None) -> Dataset:
+        # Prefix the table name with database and schema name using
+        # the format: {database_name}.{table_schema}.{table_name}.
+        name = ".".join(
+            part
+            for part in [self.database if self.database else database, self.schema, self.table]
+            if part is not None
+        )
+        return Dataset(
+            namespace=namespace,
+            name=name,
+            facets={"schema": SchemaDatasetFacet(fields=self.fields)} if len(self.fields) > 0 else {},
+        )
+
+
+def get_table_schemas(
+    hook: BaseHook,
+    namespace: str,
+    database: str | None,
+    in_query: str | None,
+    out_query: str | None,
+) -> tuple[list[Dataset], list[Dataset]]:
+    """Query database for table schemas.
+
+    Uses the provided hook. Responsibility for providing queries to this function lies with particular extractors.
+    If the query for the input or output tables isn't provided, that query is skipped.
+    """
+    # Do not query if neither query was provided
+    if not in_query and not out_query:
+        return [], []
+
+    with closing(hook.get_conn()) as conn, closing(conn.cursor()) as cursor:
+        if in_query:
+            cursor.execute(in_query)
+            in_datasets = [x.to_dataset(namespace, database) for x in parse_query_result(cursor)]
+        else:
+            in_datasets = []
+        if out_query:
+            cursor.execute(out_query)
+            out_datasets = [x.to_dataset(namespace, database) for x in parse_query_result(cursor)]
+        else:
+            out_datasets = []
+    return in_datasets, out_datasets
+
+
+def parse_query_result(cursor) -> list[TableSchema]:
+    """Fetch results from DB-API 2.0 cursor and creates list of table schemas.
+
+    For each row it creates :class:`TableSchema`.
+    """
+    schemas: dict = {}
+    columns: dict = defaultdict(list)
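+    # Both dicts are keyed by the fully qualified table name (database.schema.table, omitting missing parts).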
+    for row in cursor.fetchall():
+        table_schema_name: str = row[ColumnIndex.SCHEMA]
+        table_name: str = row[ColumnIndex.TABLE_NAME]
+        table_column: SchemaField = SchemaField(
+            name=row[ColumnIndex.COLUMN_NAME],
+            type=row[ColumnIndex.UDT_NAME],
+            description=None,
+        )
+        ordinal_position = row[ColumnIndex.ORDINAL_POSITION]
+        try:
+            table_database = row[ColumnIndex.DATABASE]
+        except IndexError:
+            table_database = None
+
+        # Build a fully qualified key for the table: database.schema.table
+        table_key = ".".join(filter(None, [table_database, table_schema_name, table_name]))
+
+        schemas[table_key] = TableSchema(
+            table=table_name, schema=table_schema_name, database=table_database, fields=[]
+        )
+        columns[table_key].append((ordinal_position, table_column))
+
+    for schema in schemas.values():
+        table_key = ".".join(filter(None, [schema.database, schema.schema, schema.table]))
+        schema.fields = [x for _, x in sorted(columns[table_key])]
+
+    return list(schemas.values())
+
+
+def create_information_schema_query(
+    columns: list[str],
+    information_schema_table_name: str,
+    tables_hierarchy: TablesHierarchy,
+    uppercase_names: bool = False,
+    sqlalchemy_engine: Engine | None = None,
+) -> str:
+    """Creates query for getting table schemas from information schema."""
+    metadata = MetaData(sqlalchemy_engine)
+    select_statements = []
+    for db, schema_mapping in tables_hierarchy.items():
+        schema, table_name = information_schema_table_name.split(".")
+        if db:
+            schema = f"{db}.{schema}"
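+            # Cross-database queries address the information schema as "<db>.information_schema".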
+        information_schema_table = Table(
+            table_name, metadata, *[Column(column) for column in columns], schema=schema
+        )
+        filter_clauses = create_filter_clauses(schema_mapping, information_schema_table, uppercase_names)
+        select_statements.append(information_schema_table.select().filter(*filter_clauses))
+    return str(
+        union_all(*select_statements).compile(sqlalchemy_engine, compile_kwargs={"literal_binds": True})
+    )
+
+
+def create_filter_clauses(
+    schema_mapping: dict, information_schema_table: Table, uppercase_names: bool = False
+) -> list[ClauseElement]:
+    """
+    Creates comprehensive filter clauses for all tables in one database.
+
+    :param schema_mapping: a dictionary of schema names and list of tables in each
+    :param information_schema_table: `sqlalchemy.Table` instance used to construct clauses.
+        For most SQL databases it contains `table_name` and `table_schema` columns,
+        so the table is expected to have them defined.
+    :param uppercase_names: if True, use uppercase schema and table names
+    """
+    filter_clauses = []
+    for schema, tables in schema_mapping.items():
+        filter_clause = information_schema_table.c.table_name.in_(
+            name.upper() if uppercase_names else name for name in tables
+        )
+        if schema:
+            filter_clause = and_(information_schema_table.c.table_schema == schema, filter_clause)
+        filter_clauses.append(filter_clause)
+    return filter_clauses
diff --git a/dev/breeze/tests/test_selective_checks.py b/dev/breeze/tests/test_selective_checks.py
index 0081b6a2a7..0dc12501b1 100644
--- a/dev/breeze/tests/test_selective_checks.py
+++ b/dev/breeze/tests/test_selective_checks.py
@@ -994,6 +994,7 @@ def test_upgrade_to_newer_dependencies(files: tuple[str, ...], expected_outputs:
                 "--package-filter apache-airflow-providers-microsoft-mssql "
                 "--package-filter apache-airflow-providers-mysql "
                 "--package-filter apache-airflow-providers-odbc "
+                "--package-filter apache-airflow-providers-openlineage "
                 "--package-filter apache-airflow-providers-oracle "
                 "--package-filter apache-airflow-providers-postgres "
                 "--package-filter apache-airflow-providers-presto "
diff --git a/generated/provider_dependencies.json b/generated/provider_dependencies.json
index 5ce7ccd295..de1f387add 100644
--- a/generated/provider_dependencies.json
+++ b/generated/provider_dependencies.json
@@ -271,7 +271,9 @@
       "apache-airflow>=2.4.0",
       "sqlparse>=0.4.2"
     ],
-    "cross-providers-deps": [],
+    "cross-providers-deps": [
+      "openlineage"
+    ],
     "excluded-python-versions": []
   },
   "databricks": {
@@ -637,7 +639,9 @@
       "openlineage-integration-common>=0.28.0",
       "openlineage-python>=0.28.0"
     ],
-    "cross-providers-deps": [],
+    "cross-providers-deps": [
+      "common.sql"
+    ],
     "excluded-python-versions": []
   },
   "opsgenie": {
diff --git a/tests/providers/common/sql/operators/test_sql_execute.py b/tests/providers/common/sql/operators/test_sql_execute.py
index 0459472d3a..5bc2a932f8 100644
--- a/tests/providers/common/sql/operators/test_sql_execute.py
+++ b/tests/providers/common/sql/operators/test_sql_execute.py
@@ -21,9 +21,13 @@ from typing import Any, NamedTuple, Sequence
 from unittest.mock import MagicMock
 
 import pytest
+from openlineage.client.facet import SchemaDatasetFacet, SchemaField, SqlJobFacet
+from openlineage.client.run import Dataset
 
-from airflow.providers.common.sql.hooks.sql import fetch_all_handler
+from airflow.models import Connection
+from airflow.providers.common.sql.hooks.sql import DbApiHook, fetch_all_handler
 from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
+from airflow.providers.openlineage.extractors.base import OperatorLineage
 
 DATE = "2017-04-20"
 TASK_ID = "sql-operator"
@@ -274,3 +278,79 @@ def test_exec_success_with_process_output(
         return_last=return_last,
         split_statements=split_statement,
     )
+
+
+def test_execute_openlineage_events():
+    class DBApiHookForTests(DbApiHook):
+        conn_name_attr = "sql_default"
+        get_conn = MagicMock(name="conn")
+        get_connection = MagicMock()
+
+        def get_openlineage_database_info(self, connection):
+            from airflow.providers.openlineage.sqlparser import DatabaseInfo
+
+            return DatabaseInfo(
+                scheme="sqlscheme", authority=DbApiHook.get_openlineage_authority_part(connection)
+            )
+
+        def get_openlineage_database_specific_lineage(self, task_instance):
+            return OperatorLineage(run_facets={"completed": True})
+
+    dbapi_hook = DBApiHookForTests()
+
+    class SQLExecuteQueryOperatorForTest(SQLExecuteQueryOperator):
+        def get_db_hook(self):
+            return dbapi_hook
+
+    sql = """CREATE TABLE IF NOT EXISTS popular_orders_day_of_week (
+        order_day_of_week VARCHAR(64) NOT NULL,
+        order_placed_on   TIMESTAMP NOT NULL,
+        orders_placed     INTEGER NOT NULL
+    );
+FORGOT TO COMMENT"""
+    op = SQLExecuteQueryOperatorForTest(task_id=TASK_ID, sql=sql)
+    DB_SCHEMA_NAME = "PUBLIC"
+    rows = [
+        (DB_SCHEMA_NAME, "popular_orders_day_of_week", "order_day_of_week", 1, "varchar"),
+        (DB_SCHEMA_NAME, "popular_orders_day_of_week", "order_placed_on", 2, "timestamp"),
+        (DB_SCHEMA_NAME, "popular_orders_day_of_week", "orders_placed", 3, "int4"),
+    ]
+    dbapi_hook.get_connection.return_value = Connection(
+        conn_id="sql_default", conn_type="postgres", host="host", port=1234
+    )
+    dbapi_hook.get_conn.return_value.cursor.return_value.fetchall.side_effect = [rows, []]
+
+    lineage = op.get_openlineage_facets_on_start()
+    assert len(lineage.inputs) == 0
+    assert lineage.outputs == [
+        Dataset(
+            namespace="sqlscheme://host:1234",
+            name="PUBLIC.popular_orders_day_of_week",
+            facets={
+                "schema": SchemaDatasetFacet(
+                    fields=[
+                        SchemaField(name="order_day_of_week", type="varchar"),
+                        SchemaField(name="order_placed_on", type="timestamp"),
+                        SchemaField(name="orders_placed", type="int4"),
+                    ]
+                )
+            },
+        )
+    ]
+
+    assert lineage.job_facets == {"sql": SqlJobFacet(query=sql)}
+
+    assert lineage.run_facets["extractionError"].failedTasks == 1
+
+    dbapi_hook.get_conn.return_value.cursor.return_value.fetchall.side_effect = [rows, []]
+
+    lineage_on_complete = op.get_openlineage_facets_on_complete(None)
+    assert (
+        OperatorLineage(
+            inputs=lineage.inputs,
+            outputs=lineage.outputs,
+            run_facets={**lineage.run_facets, **{"completed": True}},
+            job_facets=lineage.job_facets,
+        )
+        == lineage_on_complete
+    )
diff --git a/tests/providers/openlineage/utils/__init__.py b/tests/providers/openlineage/utils/__init__.py
new file mode 100644
index 0000000000..13a83393a9
--- /dev/null
+++ b/tests/providers/openlineage/utils/__init__.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/tests/providers/openlineage/utils/test_sql.py b/tests/providers/openlineage/utils/test_sql.py
new file mode 100644
index 0000000000..be929a1ad6
--- /dev/null
+++ b/tests/providers/openlineage/utils/test_sql.py
@@ -0,0 +1,339 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from unittest.mock import MagicMock
+
+import pytest
+from openlineage.client.facet import SchemaDatasetFacet, SchemaField, set_producer
+from openlineage.client.run import Dataset
+from openlineage.common.models import DbColumn, DbTableSchema
+from openlineage.common.sql import DbTableMeta
+from sqlalchemy import Column, MetaData, Table
+
+from airflow.providers.openlineage import __version__ as OPENLINEAGE_PROVIDER_VERSION
+from airflow.providers.openlineage.utils.sql import (
+    create_filter_clauses,
+    create_information_schema_query,
+    get_table_schemas,
+)
+
+_PRODUCER = f"https://github.com/apache/airflow/tree/providers-openlineage/{OPENLINEAGE_PROVIDER_VERSION}"
+set_producer(_PRODUCER)
+
+DB_NAME = "FOOD_DELIVERY"
+DB_SCHEMA_NAME = "PUBLIC"
+DB_TABLE_NAME = DbTableMeta("DISCOUNTS")
+DB_TABLE_COLUMNS = [
+    DbColumn(name="ID", type="int4", ordinal_position=1),
+    DbColumn(name="AMOUNT_OFF", type="int4", ordinal_position=2),
+    DbColumn(name="CUSTOMER_EMAIL", type="varchar", ordinal_position=3),
+    DbColumn(name="STARTS_ON", type="timestamp", ordinal_position=4),
+    DbColumn(name="ENDS_ON", type="timestamp", ordinal_position=5),
+]
+DB_TABLE_SCHEMA = DbTableSchema(
+    schema_name=DB_SCHEMA_NAME, table_name=DB_TABLE_NAME, columns=DB_TABLE_COLUMNS
+)
+
+SCHEMA_FACET = SchemaDatasetFacet(
+    fields=[
+        SchemaField(name="ID", type="int4"),
+        SchemaField(name="AMOUNT_OFF", type="int4"),
+        SchemaField(name="CUSTOMER_EMAIL", type="varchar"),
+        SchemaField(name="STARTS_ON", type="timestamp"),
+        SchemaField(name="ENDS_ON", type="timestamp"),
+    ]
+)
+
+
+def test_get_table_schemas():
+    hook = MagicMock()
+    # Mock calls to the database
+    rows = [
+        (DB_SCHEMA_NAME, DB_TABLE_NAME.name, "ID", 1, "int4"),
+        (DB_SCHEMA_NAME, DB_TABLE_NAME.name, "AMOUNT_OFF", 2, "int4"),
+        (DB_SCHEMA_NAME, DB_TABLE_NAME.name, "CUSTOMER_EMAIL", 3, "varchar"),
+        (DB_SCHEMA_NAME, DB_TABLE_NAME.name, "STARTS_ON", 4, "timestamp"),
+        (DB_SCHEMA_NAME, DB_TABLE_NAME.name, "ENDS_ON", 5, "timestamp"),
+    ]
+
+    hook.get_conn.return_value.cursor.return_value.fetchall.side_effect = [rows, rows]
+
+    table_schemas = get_table_schemas(
+        hook=hook,
+        namespace="bigquery",
+        database=DB_NAME,
+        in_query="fake_sql",
+        out_query="another_fake_sql",
+    )
+
+    assert table_schemas == (
+        [
+            Dataset(
+                namespace="bigquery", name="FOOD_DELIVERY.PUBLIC.DISCOUNTS", facets={"schema": SCHEMA_FACET}
+            )
+        ],
+        [
+            Dataset(
+                namespace="bigquery", name="FOOD_DELIVERY.PUBLIC.DISCOUNTS", facets={"schema": SCHEMA_FACET}
+            )
+        ],
+    )
+
+
+def test_get_table_schemas_with_mixed_databases():
+    hook = MagicMock()
+    ANOTHER_DB_NAME = "ANOTHER_DB"
+
+    rows = [
+        (DB_SCHEMA_NAME, DB_TABLE_NAME.name, "ID", 1, "int4", DB_NAME),
+        (DB_SCHEMA_NAME, DB_TABLE_NAME.name, "AMOUNT_OFF", 2, "int4", DB_NAME),
+        (DB_SCHEMA_NAME, DB_TABLE_NAME.name, "CUSTOMER_EMAIL", 3, "varchar", DB_NAME),
+        (DB_SCHEMA_NAME, DB_TABLE_NAME.name, "STARTS_ON", 4, "timestamp", DB_NAME),
+        (DB_SCHEMA_NAME, DB_TABLE_NAME.name, "ENDS_ON", 5, "timestamp", DB_NAME),
+        (DB_SCHEMA_NAME, DB_TABLE_NAME.name, "ID", 1, "int4", ANOTHER_DB_NAME),
+        (DB_SCHEMA_NAME, DB_TABLE_NAME.name, "AMOUNT_OFF", 2, "int4", ANOTHER_DB_NAME),
+        (
+            DB_SCHEMA_NAME,
+            DB_TABLE_NAME.name,
+            "CUSTOMER_EMAIL",
+            3,
+            "varchar",
+            ANOTHER_DB_NAME,
+        ),
+        (
+            DB_SCHEMA_NAME,
+            DB_TABLE_NAME.name,
+            "STARTS_ON",
+            4,
+            "timestamp",
+            ANOTHER_DB_NAME,
+        ),
+        (
+            DB_SCHEMA_NAME,
+            DB_TABLE_NAME.name,
+            "ENDS_ON",
+            5,
+            "timestamp",
+            ANOTHER_DB_NAME,
+        ),
+    ]
+
+    hook.get_conn.return_value.cursor.return_value.fetchall.side_effect = [rows, []]
+
+    table_schemas = get_table_schemas(
+        hook=hook,
+        namespace="bigquery",
+        database=DB_NAME,
+        in_query="fake_sql",
+        out_query="another_fake_sql",
+    )
+
+    assert table_schemas == (
+        [
+            Dataset(
+                namespace="bigquery", name="FOOD_DELIVERY.PUBLIC.DISCOUNTS", facets={"schema": SCHEMA_FACET}
+            ),
+            Dataset(
+                namespace="bigquery", name="ANOTHER_DB.PUBLIC.DISCOUNTS", facets={"schema": SCHEMA_FACET}
+            ),
+        ],
+        [],
+    )
+
+
+def test_get_table_schemas_with_mixed_schemas():
+    hook = MagicMock()
+    ANOTHER_DB_SCHEMA_NAME = "ANOTHER_DB_SCHEMA"
+
+    rows = [
+        (DB_SCHEMA_NAME, DB_TABLE_NAME.name, "ID", 1, "int4"),
+        (DB_SCHEMA_NAME, DB_TABLE_NAME.name, "AMOUNT_OFF", 2, "int4"),
+        (DB_SCHEMA_NAME, DB_TABLE_NAME.name, "CUSTOMER_EMAIL", 3, "varchar"),
+        (DB_SCHEMA_NAME, DB_TABLE_NAME.name, "STARTS_ON", 4, "timestamp"),
+        (DB_SCHEMA_NAME, DB_TABLE_NAME.name, "ENDS_ON", 5, "timestamp"),
+        (ANOTHER_DB_SCHEMA_NAME, DB_TABLE_NAME.name, "ID", 1, "int4"),
+        (ANOTHER_DB_SCHEMA_NAME, DB_TABLE_NAME.name, "AMOUNT_OFF", 2, "int4"),
+        (ANOTHER_DB_SCHEMA_NAME, DB_TABLE_NAME.name, "CUSTOMER_EMAIL", 3, "varchar"),
+        (ANOTHER_DB_SCHEMA_NAME, DB_TABLE_NAME.name, "STARTS_ON", 4, "timestamp"),
+        (ANOTHER_DB_SCHEMA_NAME, DB_TABLE_NAME.name, "ENDS_ON", 5, "timestamp"),
+    ]
+
+    hook.get_conn.return_value.cursor.return_value.fetchall.side_effect = [rows, []]
+
+    table_schemas = get_table_schemas(
+        hook=hook,
+        namespace="bigquery",
+        database=DB_NAME,
+        in_query="fake_sql",
+        out_query="another_fake_sql",
+    )
+
+    assert table_schemas == (
+        [
+            Dataset(
+                namespace="bigquery", name="FOOD_DELIVERY.PUBLIC.DISCOUNTS", facets={"schema": SCHEMA_FACET}
+            ),
+            Dataset(
+                namespace="bigquery",
+                name="FOOD_DELIVERY.ANOTHER_DB_SCHEMA.DISCOUNTS",
+                facets={"schema": SCHEMA_FACET},
+            ),
+        ],
+        [],
+    )
+
+
+def test_get_table_schemas_with_other_database():
+    hook = MagicMock()
+    ANOTHER_DB_NAME = "ANOTHER_DB"
+
+    rows = [
+        (DB_SCHEMA_NAME, DB_TABLE_NAME.name, "ID", 1, "int4", ANOTHER_DB_NAME),
+        (DB_SCHEMA_NAME, DB_TABLE_NAME.name, "AMOUNT_OFF", 2, "int4", ANOTHER_DB_NAME),
+        (
+            DB_SCHEMA_NAME,
+            DB_TABLE_NAME.name,
+            "CUSTOMER_EMAIL",
+            3,
+            "varchar",
+            ANOTHER_DB_NAME,
+        ),
+        (
+            DB_SCHEMA_NAME,
+            DB_TABLE_NAME.name,
+            "STARTS_ON",
+            4,
+            "timestamp",
+            ANOTHER_DB_NAME,
+        ),
+        (
+            DB_SCHEMA_NAME,
+            DB_TABLE_NAME.name,
+            "ENDS_ON",
+            5,
+            "timestamp",
+            ANOTHER_DB_NAME,
+        ),
+    ]
+
+    hook.get_conn.return_value.cursor.return_value.fetchall.side_effect = [rows, rows]
+
+    table_schemas = get_table_schemas(
+        hook=hook,
+        namespace="bigquery",
+        database=DB_NAME,
+        in_query="fake_sql",
+        out_query="another_fake_sql",
+    )
+
+    assert table_schemas == (
+        [
+            Dataset(
+                namespace="bigquery", name="ANOTHER_DB.PUBLIC.DISCOUNTS", facets={"schema": SCHEMA_FACET}
+            ),
+        ],
+        [
+            Dataset(
+                namespace="bigquery", name="ANOTHER_DB.PUBLIC.DISCOUNTS", facets={"schema": SCHEMA_FACET}
+            ),
+        ],
+    )
+
+
+@pytest.mark.parametrize(
+    "schema_mapping, expected",
+    [
+        pytest.param({None: ["C1", "C2"]}, ["information_schema.columns.table_name IN ('C1', 'C2')"]),
+        pytest.param(
+            {"Schema1": ["Table1"], "Schema2": ["Table2"]},
+            [
+                "information_schema.columns.table_schema = 'Schema1' AND "
+                "information_schema.columns.table_name IN ('Table1')",
+                "information_schema.columns.table_schema = 'Schema2' AND "
+                "information_schema.columns.table_name IN ('Table2')",
+            ],
+        ),
+        pytest.param(
+            {"Schema1": ["Table1", "Table2"]},
+            [
+                "information_schema.columns.table_schema = 'Schema1' AND "
+                "information_schema.columns.table_name IN ('Table1', 'Table2')",
+            ],
+        ),
+    ],
+)
+def test_create_filter_clauses(schema_mapping, expected):
+    information_table = Table(
+        "columns", MetaData(), *[Column("table_name"), Column("table_schema")], schema="information_schema"
+    )
+    clauses = create_filter_clauses(schema_mapping, information_table)
+    assert [str(clause.compile(compile_kwargs={"literal_binds": True})) for clause in clauses] == expected
+
+
+def test_create_information_schema_query():
+    columns = [
+        "table_schema",
+        "table_name",
+        "column_name",
+        "ordinal_position",
+        "udt_name",
+    ]
+    assert (
+        create_information_schema_query(
+            columns=columns,
+            information_schema_table_name="information_schema.columns",
+            tables_hierarchy={None: {"schema1": ["table1"]}},
+        )
+        == "SELECT information_schema.columns.table_schema, "
+        "information_schema.columns.table_name, information_schema.columns.column_name, "
+        "information_schema.columns.ordinal_position, information_schema.columns.udt_name \n"
+        "FROM information_schema.columns \n"
+        "WHERE information_schema.columns.table_schema = 'schema1' "
+        "AND information_schema.columns.table_name IN ('table1')"
+    )
+
+
+def test_create_information_schema_query_cross_db():
+    columns = [
+        "table_schema",
+        "table_name",
+        "column_name",
+        "ordinal_position",
+        "data_type",
+    ]
+
+    assert (
+        create_information_schema_query(
+            columns=columns,
+            information_schema_table_name="information_schema.columns",
+            tables_hierarchy={"db": {"schema1": ["table1"]}, "db2": {"schema1": ["table2"]}},
+        )
+        == 'SELECT "db.information_schema".columns.table_schema, "db.information_schema".columns.table_name, '
+        '"db.information_schema".columns.column_name, "db.information_schema".columns.ordinal_position, '
+        '"db.information_schema".columns.data_type \n'
+        'FROM "db.information_schema".columns \n'
+        "WHERE \"db.information_schema\".columns.table_schema = 'schema1' "
+        "AND \"db.information_schema\".columns.table_name IN ('table1') "
+        "UNION ALL "
+        'SELECT "db2.information_schema".columns.table_schema, "db2.information_schema".columns.table_name, '
+        '"db2.information_schema".columns.column_name, "db2.information_schema".columns.ordinal_position, '
+        '"db2.information_schema".columns.data_type \n'
+        'FROM "db2.information_schema".columns \n'
+        "WHERE \"db2.information_schema\".columns.table_schema = 'schema1' "
+        "AND \"db2.information_schema\".columns.table_name IN ('table2')"
+    )
diff --git a/tests/providers/openlineage/utils/test_sqlparser.py b/tests/providers/openlineage/utils/test_sqlparser.py
new file mode 100644
index 0000000000..6f11a7ad94
--- /dev/null
+++ b/tests/providers/openlineage/utils/test_sqlparser.py
@@ -0,0 +1,222 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from unittest import mock
+from unittest.mock import MagicMock
+
+from openlineage.client.facet import SchemaDatasetFacet, SchemaField, SqlJobFacet
+from openlineage.client.run import Dataset
+from openlineage.common.sql import DbTableMeta
+
+from airflow.providers.openlineage.extractors import OperatorLineage
+from airflow.providers.openlineage.sqlparser import DatabaseInfo, SQLParser
+
+DB_NAME = "FOOD_DELIVERY"
+DB_SCHEMA_NAME = "PUBLIC"
+DB_TABLE_NAME = DbTableMeta("DISCOUNTS")
+
+NAMESPACE = "test_namespace"
+
+SCHEMA_FACET = SchemaDatasetFacet(
+    fields=[
+        SchemaField(name="ID", type="int4"),
+        SchemaField(name="AMOUNT_OFF", type="int4"),
+        SchemaField(name="CUSTOMER_EMAIL", type="varchar"),
+        SchemaField(name="STARTS_ON", type="timestamp"),
+        SchemaField(name="ENDS_ON", type="timestamp"),
+    ]
+)
+
+
+def normalize_name_lower(name: str) -> str:
+    return name.lower()
+
+
+class TestSQLParser:
+    def test_get_tables_hierarchy(self):
+        assert SQLParser._get_tables_hierarchy(
+            [DbTableMeta("Table1"), DbTableMeta("Table2")], normalize_name_lower
+        ) == {None: {None: ["Table1", "Table2"]}}
+
+        # base check with db, no cross db
+        assert SQLParser._get_tables_hierarchy(
+            [DbTableMeta("Db.Schema1.Table1"), DbTableMeta("Db.Schema2.Table2")], normalize_name_lower
+        ) == {None: {"schema1": ["Table1"], "schema2": ["Table2"]}}
+
+        # same, with cross db
+        assert SQLParser._get_tables_hierarchy(
+            [DbTableMeta("Db.Schema1.Table1"), DbTableMeta("Db.Schema2.Table2")],
+            normalize_name_lower,
+            is_cross_db=True,
+        ) == {"db": {"schema1": ["Table1"], "schema2": ["Table2"]}}
+
+        # explicit db, no cross db
+        assert SQLParser._get_tables_hierarchy(
+            [DbTableMeta("Schema1.Table1"), DbTableMeta("Schema1.Table2")],
+            normalize_name_lower,
+            database="Db",
+        ) == {None: {"schema1": ["Table1", "Table2"]}}
+
+        # explicit db, with cross db
+        assert SQLParser._get_tables_hierarchy(
+            [DbTableMeta("Schema1.Table1"), DbTableMeta("Schema1.Table2")],
+            normalize_name_lower,
+            database="Db",
+            is_cross_db=True,
+        ) == {"db": {"schema1": ["Table1", "Table2"]}}
+
+        # mixed db, with cross db
+        assert SQLParser._get_tables_hierarchy(
+            [DbTableMeta("Db2.Schema1.Table1"), DbTableMeta("Schema1.Table2")],
+            normalize_name_lower,
+            database="Db",
+            is_cross_db=True,
+        ) == {"db": {"schema1": ["Table2"]}, "db2": {"schema1": ["Table1"]}}
+
+    def test_normalize_sql(self):
+        assert SQLParser.normalize_sql("select * from asdf") == "select * from asdf"
+
+        assert (
+            SQLParser.normalize_sql(["select * from asdf", "insert into asdf values (1,2,3)"])
+            == "select * from asdf;\ninsert into asdf values (1,2,3)"
+        )
+
+        assert (
+            SQLParser.normalize_sql("select * from asdf;insert into asdf values (1,2,3)")
+            == "select * from asdf;\ninsert into asdf values (1,2,3)"
+        )
+
+        assert (
+            SQLParser.normalize_sql(
+                """CREATE FUNCTION somefunc() RETURNS integer AS $$
+                BEGIN
+                    ...
+                END;
+                $$ LANGUAGE plpgsql```"""
+            )
+            == """CREATE FUNCTION somefunc() RETURNS integer AS $$
+                BEGIN
+                    ...
+                END;
+                $$ LANGUAGE plpgsql```"""
+        )
+
+    def test_normalize_sql_with_no_common_sql_provider(self):
+        with mock.patch.dict("sys.modules", {"airflow.providers.common.sql.hooks.sql": None}):
+            assert (
+                SQLParser.normalize_sql("select * from asdf;insert into asdf values (1,2,3)")
+                == "select * from asdf;\ninsert into asdf values (1,2,3)"
+            )
+
+    def test_parse_table_schemas(self):
+        parser = SQLParser()
+        db_info = DatabaseInfo(scheme="myscheme")
+
+        hook = MagicMock()
+
+        rows = lambda name: [
+            (DB_SCHEMA_NAME, name, "ID", 1, "int4"),
+            (DB_SCHEMA_NAME, name, "AMOUNT_OFF", 2, "int4"),
+            (DB_SCHEMA_NAME, name, "CUSTOMER_EMAIL", 3, "varchar"),
+            (DB_SCHEMA_NAME, name, "STARTS_ON", 4, "timestamp"),
+            (DB_SCHEMA_NAME, name, "ENDS_ON", 5, "timestamp"),
+        ]
+
+        hook.get_conn.return_value.cursor.return_value.fetchall.side_effect = [
+            rows("TABLE_IN"),
+            rows("TABLE_OUT"),
+        ]
+
+        expected = (
+            [Dataset(namespace=NAMESPACE, name="PUBLIC.TABLE_IN", facets={"schema": SCHEMA_FACET})],
+            [Dataset(namespace=NAMESPACE, name="PUBLIC.TABLE_OUT", facets={"schema": SCHEMA_FACET})],
+        )
+
+        assert expected == parser.parse_table_schemas(
+            hook=hook,
+            namespace=NAMESPACE,
+            inputs=[DbTableMeta("TABLE_IN")],
+            outputs=[DbTableMeta("TABLE_OUT")],
+            database_info=db_info,
+        )
+
+    @mock.patch("airflow.providers.openlineage.sqlparser.SQLParser.parse")
+    def test_generate_openlineage_metadata_from_sql(self, mock_parse):
+        parser = SQLParser()
+        db_info = DatabaseInfo(scheme="myscheme", authority="host:port")
+
+        hook = MagicMock()
+
+        rows = lambda name: [
+            (DB_SCHEMA_NAME, name, "ID", 1, "int4"),
+            (DB_SCHEMA_NAME, name, "AMOUNT_OFF", 2, "int4"),
+            (DB_SCHEMA_NAME, name, "CUSTOMER_EMAIL", 3, "varchar"),
+            (DB_SCHEMA_NAME, name, "STARTS_ON", 4, "timestamp"),
+            (DB_SCHEMA_NAME, name, "ENDS_ON", 5, "timestamp"),
+        ]
+
+        sql = """CREATE TABLE table_out (
+            ID int,
+            AMOUNT_OFF int,
+            CUSTOMER_EMAIL varchar,
+            STARTS_ON timestamp,
+            ENDS_ON timestamp
+            --irrelevant comment
+        )
+        ;
+        """
+
+        hook.get_conn.return_value.cursor.return_value.fetchall.side_effect = [
+            rows("TABLE_IN"),
+            rows("TABLE_OUT"),
+        ]
+
+        mock_sql_meta = MagicMock()
+        mock_sql_meta.in_tables = [DbTableMeta("PUBLIC.TABLE_IN")]
+        mock_sql_meta.out_tables = [DbTableMeta("PUBLIC.TABLE_OUT")]
+        mock_sql_meta.errors = []
+
+        mock_parse.return_value = mock_sql_meta
+
+        formatted_sql = """CREATE TABLE table_out (
+            ID int,
+            AMOUNT_OFF int,
+            CUSTOMER_EMAIL varchar,
+            STARTS_ON timestamp,
+            ENDS_ON timestamp
+
+)"""
+        expected = OperatorLineage(
+            inputs=[
+                Dataset(
+                    namespace="myscheme://host:port", name="PUBLIC.TABLE_IN", facets={"schema": SCHEMA_FACET}
+                )
+            ],
+            outputs=[
+                Dataset(
+                    namespace="myscheme://host:port", name="PUBLIC.TABLE_OUT", facets={"schema": SCHEMA_FACET}
+                )
+            ],
+            job_facets={"sql": SqlJobFacet(query=formatted_sql)},
+        )
+
+        assert expected == parser.generate_openlineage_metadata_from_sql(
+            sql=sql,
+            hook=hook,
+            database_info=db_info,
+        )