Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/04/28 22:22:58 UTC

[GitHub] [airflow] malthe opened a new pull request #15581: Add bindvars Xcom result to Oracle operator

malthe opened a new pull request #15581:
URL: https://github.com/apache/airflow/pull/15581


   This is useful when executing a statement where out variables are required – for example a stored procedure.
   
   Where an out parameter is required, pass a Python value of the required type (for example `0` for an integer, or "dummy" for a string).
   
   Note that this scheme is supported by the Oracle driver itself. In the future, a more elaborate scheme can be implemented which could allow the use of https://cx-oracle.readthedocs.io/en/latest/api_manual/cursor.html#Cursor.var.
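As a rough illustration of the scheme described above, the sketch below turns a cursor's bind variables into plain Python values, which is what lets an out parameter flow back to the caller. `FakeVar` and `FakeCursor` are hypothetical stand-ins for the driver's variable and cursor objects (only `getvalue()` and `bindvars` are assumed); they are not part of cx_Oracle or Airflow.

```python
class FakeVar:
    """Hypothetical stand-in for a driver variable object exposing getvalue()."""

    def __init__(self, value):
        self._value = value

    def getvalue(self):
        return self._value


class FakeCursor:
    """Hypothetical stand-in for a cursor exposing bindvars after execution."""

    def __init__(self, bindvars):
        self.bindvars = bindvars


def extract_bindvars(cur):
    """Return the cursor's bind variables as plain Python values."""
    bindvars = cur.bindvars
    if isinstance(bindvars, list):
        # Positional binds: keep the order.
        return [v.getvalue() for v in bindvars]
    if isinstance(bindvars, dict):
        # Named binds: keep the names.
        return {name: v.getvalue() for name, v in bindvars.items()}
    raise TypeError(f"unsupported bindvars type: {type(bindvars).__name__}")


# A placeholder such as 0 was passed for the out parameter; here we pretend
# the stored procedure wrote 42 into it.
positional = extract_bindvars(FakeCursor([FakeVar("in-value"), FakeVar(42)]))
named = extract_bindvars(FakeCursor({"result": FakeVar(42)}))
```

With a real driver, the placeholder value passed for the out parameter only fixes its type; the value read back through `getvalue()` is whatever the statement assigned.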
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] potiuk commented on a change in pull request #15581: Add bindvars Xcom result to Oracle operator

Posted by GitBox <gi...@apache.org>.
potiuk commented on a change in pull request #15581:
URL: https://github.com/apache/airflow/pull/15581#discussion_r632301007



##########
File path: airflow/providers/oracle/operators/oracle.py
##########
@@ -62,4 +68,30 @@ def __init__(
     def execute(self, context) -> None:
         self.log.info('Executing: %s', self.sql)
         hook = OracleHook(oracle_conn_id=self.oracle_conn_id)
-        hook.run(self.sql, autocommit=self.autocommit, parameters=self.parameters)
+
+        def handler(cur):
+            bindvars = cur.bindvars
+
+            if isinstance(bindvars, list):
+                bindvars = [v.getvalue() for v in bindvars]
+            elif isinstance(bindvars, dict):
+                bindvars = {n: v.getvalue() for (n, v) in bindvars.items()}
+            else:
+                raise TypeError(bindvars)
+
+            return {
+                "bindvars": bindvars,
+                "rows": cur.fetchall(),
+            }
+
+        kwargs = {
+            "autocommit": self.autocommit,
+            "parameters": self.parameters,
+        }
+
+        # For backwards compatibility, if the hook implementation does not

Review comment:
       Cool. But... we do not need backwards compatibility any more :). We've just decided that the next wave of providers will be Airflow 2.1+ compatible, and we are *just about* to release 2.1 (this was due to the removal of @apply_defaults, see #15667). So feel free to rebase and remove the last commit, and I am happy to merge it before 2.1.










[GitHub] [airflow] malthe commented on pull request #15581: Add optional result handler to database hooks

Posted by GitBox <gi...@apache.org>.
malthe commented on pull request #15581:
URL: https://github.com/apache/airflow/pull/15581#issuecomment-842368183


   @ashb wait so it seemed like you already fixed most or all of the remaining issues?





[GitHub] [airflow] ashb commented on pull request #15581: Add optional result handler to database hooks

Posted by GitBox <gi...@apache.org>.
ashb commented on pull request #15581:
URL: https://github.com/apache/airflow/pull/15581#issuecomment-842409709


   > @ashb wait so it seemed like you already fixed most or all of the remaining issues?
   
   @malthe Yeah, correct, I think so.





[GitHub] [airflow] potiuk commented on pull request #15581: Add optional result handler to database hooks

Posted by GitBox <gi...@apache.org>.
potiuk commented on pull request #15581:
URL: https://github.com/apache/airflow/pull/15581#issuecomment-842238190


   Copenhagen :D. So there is a chance ;).





[GitHub] [airflow] potiuk commented on a change in pull request #15581: Add bindvars Xcom result to Oracle operator

Posted by GitBox <gi...@apache.org>.
potiuk commented on a change in pull request #15581:
URL: https://github.com/apache/airflow/pull/15581#discussion_r624657163



##########
File path: airflow/providers/oracle/operators/oracle.py
##########
@@ -63,4 +68,25 @@ def __init__(
     def execute(self, context) -> None:
         self.log.info('Executing: %s', self.sql)
         hook = OracleHook(oracle_conn_id=self.oracle_conn_id)
-        hook.run(self.sql, autocommit=self.autocommit, parameters=self.parameters)
+
+        def handler(cur):
+            bindvars = cur.bindvars
+
+            if isinstance(bindvars, list):
+                bindvars = [v.getvalue() for v in bindvars]
+            elif isinstance(bindvars, dict):
+                bindvars = {n: v.getvalue() for (n, v) in bindvars.items()}
+            else:
+                raise TypeError(bindvars)
+
+            return {
+                "bindvars": bindvars,
+                "rows": cur.fetchall(),
+            }
+
+        return hook.run(

Review comment:
       The problem with that is that we are releasing providers from "master", so if we make a "2.1" change, it will have to wait until the 2.1 release to be merged. 
   
   IMHO a much better solution is implementing it in a backwards-compatible way, merging it now, and releasing a new provider that will simply use the functionality when Airflow 2.1 is released.








[GitHub] [airflow] potiuk commented on pull request #15581: Add optional result handler to database hooks

Posted by GitBox <gi...@apache.org>.
potiuk commented on pull request #15581:
URL: https://github.com/apache/airflow/pull/15581#issuecomment-842212484


   @malthe - there are some static checks failing about trailing whitespace - can you please fix them? I hope it's going to be the last one!





[GitHub] [airflow] malthe commented on pull request #15581: Add optional result handler to database hooks

Posted by GitBox <gi...@apache.org>.
malthe commented on pull request #15581:
URL: https://github.com/apache/airflow/pull/15581#issuecomment-841856594









[GitHub] [airflow] ashb commented on a change in pull request #15581: Add optional result handler to database hooks

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #15581:
URL: https://github.com/apache/airflow/pull/15581#discussion_r633368587



##########
File path: airflow/hooks/dbapi.py
##########
@@ -166,30 +166,51 @@ def run(self, sql, autocommit=False, parameters=None):
         :type autocommit: bool
         :param parameters: The parameters to render the SQL query with.
         :type parameters: dict or iterable
+        :param handler: The result handler which is called after each statement.
+        :type handler: Callable which gets a cursor object.

Review comment:
       ```suggestion
           :type handler: callable
   ```
   
   Type should be a bare type, not a description







[GitHub] [airflow] ashb merged pull request #15581: Add optional result handler to database hooks

Posted by GitBox <gi...@apache.org>.
ashb merged pull request #15581:
URL: https://github.com/apache/airflow/pull/15581


   





[GitHub] [airflow] github-actions[bot] commented on pull request #15581: Add bindvars Xcom result to Oracle operator

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #15581:
URL: https://github.com/apache/airflow/pull/15581#issuecomment-841084104









[GitHub] [airflow] malthe commented on pull request #15581: Add optional result handler to database hooks

Posted by GitBox <gi...@apache.org>.
malthe commented on pull request #15581:
URL: https://github.com/apache/airflow/pull/15581#issuecomment-844373991


   @MatrixManAtYrService with the new [Taskflow API](https://airflow.apache.org/docs/apache-airflow/stable/tutorial_taskflow_api.html), coding up a custom task using the hooks directly seems like a simpler approach than trying to add a closure to the operator.
   
   I had actually started out with an implementation that made the operator smarter about handling a query result but realized halfway that it's not really that helpful.
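To make the "use the hook directly" idea concrete, here is a minimal sketch of what the body of such a custom task might look like. `InMemoryHook` is a hypothetical stand-in backed by `sqlite3` so the example runs on its own; it only mimics a `run(sql, handler=...)` method and is not an Airflow class. In a real DAG the `count_rows` function would presumably be wrapped with the TaskFlow `@task` decorator and given a real database hook.

```python
import sqlite3
from contextlib import closing


class InMemoryHook:
    """Hypothetical stand-in for a database hook with run(sql, handler=...)."""

    def __init__(self):
        self.conn = sqlite3.connect(":memory:")

    def run(self, sql, parameters=None, handler=None):
        with closing(self.conn.cursor()) as cur:
            cur.execute(sql, parameters or ())
            # The handler decides what to pull out of the cursor.
            return handler(cur) if handler is not None else None


def count_rows(hook, table):
    """Body of a would-be custom task: query via the hook, return a plain value."""
    # The table name is interpolated for brevity only; never do this with
    # untrusted input.
    return hook.run(f"SELECT COUNT(*) FROM {table}", handler=lambda cur: cur.fetchone()[0])


hook = InMemoryHook()
hook.run("CREATE TABLE events (id INTEGER)")
hook.run("INSERT INTO events VALUES (1), (2), (3)")
total = count_rows(hook, "events")
```

The task function stays an ordinary Python function, so result handling lives in user code rather than in the operator.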





[GitHub] [airflow] ashb commented on pull request #15581: Add optional result handler to database hooks

Posted by GitBox <gi...@apache.org>.
ashb commented on pull request #15581:
URL: https://github.com/apache/airflow/pull/15581#issuecomment-842171785


   > > Some unit tests covering passing a handler in would have been _nice_, but given that path is simple we can merge this anyway to get it into 2.1
   > 
   > @ashb there is a rather rudimentary unit test added `test_run_with_handler` but I can definitely elaborate on it.
   
   Oh no that's good. I missed it, sorry!





[GitHub] [airflow] potiuk closed pull request #15581: Add optional result handler to database hooks

Posted by GitBox <gi...@apache.org>.
potiuk closed pull request #15581:
URL: https://github.com/apache/airflow/pull/15581


   





[GitHub] [airflow] ashb commented on a change in pull request #15581: Add optional result handler to database hooks

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #15581:
URL: https://github.com/apache/airflow/pull/15581#discussion_r633369719



##########
File path: airflow/hooks/dbapi.py
##########
@@ -166,30 +166,51 @@ def run(self, sql, autocommit=False, parameters=None):
         :type autocommit: bool
         :param parameters: The parameters to render the SQL query with.
         :type parameters: dict or iterable
+        :param handler: The result handler which is called after each statement.

Review comment:
       ```suggestion
           :param handler: The result handler which is called with the result of each statement. 
   ```
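The handler pattern discussed in this review can be sketched against any DB-API driver. The block below is a simplified illustration using `sqlite3`, not the merged Airflow implementation: the free function `run` and its exact return convention (`None` without a handler, a single result for a single statement, a list otherwise) are assumptions made for the example.

```python
import sqlite3
from contextlib import closing


def run(conn, sql, parameters=None, handler=None):
    """Execute one statement or a list of statements on a DB-API connection.

    If handler is given, it is called with the cursor after each statement
    and its return values are collected.
    """
    scalar = isinstance(sql, str)
    statements = [sql] if scalar else list(sql)
    results = []
    with closing(conn.cursor()) as cur:
        for statement in statements:
            if parameters:
                cur.execute(statement, parameters)
            else:
                cur.execute(statement)
            if handler is not None:
                results.append(handler(cur))
    conn.commit()
    if handler is None:
        return None
    return results[0] if scalar else results


conn = sqlite3.connect(":memory:")
run(conn, ["CREATE TABLE t (x INTEGER)", "INSERT INTO t VALUES (1), (2)"])
rows = run(conn, "SELECT x FROM t ORDER BY x", handler=lambda cur: cur.fetchall())
firsts = run(conn, ["SELECT 1", "SELECT 2"], handler=lambda cur: cur.fetchone()[0])
```

Calling the handler while the cursor is still open matters: once the cursor is closed, the rows can no longer be fetched.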







[GitHub] [airflow] ashb commented on pull request #15581: Add optional result handler to database hooks

Posted by GitBox <gi...@apache.org>.
ashb commented on pull request #15581:
URL: https://github.com/apache/airflow/pull/15581#issuecomment-842464372


   Two random failures. Merging.





[GitHub] [airflow] potiuk commented on a change in pull request #15581: Add bindvars Xcom result to Oracle operator

Posted by GitBox <gi...@apache.org>.
potiuk commented on a change in pull request #15581:
URL: https://github.com/apache/airflow/pull/15581#discussion_r624493306



##########
File path: airflow/providers/oracle/operators/oracle.py
##########
@@ -63,4 +68,25 @@ def __init__(
     def execute(self, context) -> None:
         self.log.info('Executing: %s', self.sql)
         hook = OracleHook(oracle_conn_id=self.oracle_conn_id)
-        hook.run(self.sql, autocommit=self.autocommit, parameters=self.parameters)
+
+        def handler(cur):
+            bindvars = cur.bindvars
+
+            if isinstance(bindvars, list):
+                bindvars = [v.getvalue() for v in bindvars]
+            elif isinstance(bindvars, dict):
+                bindvars = {n: v.getvalue() for (n, v) in bindvars.items()}
+            else:
+                raise TypeError(bindvars)
+
+            return {
+                "bindvars": bindvars,
+                "rows": cur.fetchall(),
+            }
+
+        return hook.run(

Review comment:
       There is a subtle problem with this approach. This would mean that the Oracle provider will only be compatible with the new DBApi/DBHook implementation, which is part of Airflow itself. This means that either we make a breaking change (and add a >= 2.1 limitation to the Oracle provider), or (better) we implement it in a backward-compatible way, honoring the fact that the run method might have no "handler" argument.
   
   That would mean that bind-variable functionality will only be available in Airflow 2.1 and above
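One way such a backward-compatible call could look is sketched below. This is an assumption for illustration, not necessarily what the PR's follow-up commit did: the helper `run_with_optional_handler` and the two hook classes are hypothetical, and the check relies on `inspect.signature` to see whether `run()` declares a `handler` parameter.

```python
import inspect


def run_with_optional_handler(hook, sql, handler, **kwargs):
    """Pass handler to hook.run only if that hook's run() accepts it."""
    if "handler" in inspect.signature(hook.run).parameters:
        return hook.run(sql, handler=handler, **kwargs)
    # Older hook: fall back to the original call, which returns no result.
    return hook.run(sql, **kwargs)


class OldHook:
    """Hypothetical hook whose run() predates the handler argument."""

    def run(self, sql, autocommit=False, parameters=None):
        return None


class NewHook:
    """Hypothetical hook whose run() accepts a handler."""

    def run(self, sql, autocommit=False, parameters=None, handler=None):
        # A string stands in for the cursor to keep the example tiny.
        return handler("cursor") if handler else None


old_result = run_with_optional_handler(OldHook(), "SELECT 1", handler=str.upper)
new_result = run_with_optional_handler(NewHook(), "SELECT 1", handler=str.upper)
```

Note that a signature check like this would miss a `run()` that accepts the handler only through `**kwargs`.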







[GitHub] [airflow] github-actions[bot] commented on pull request #15581: Add bindvars Xcom result to Oracle operator

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #15581:
URL: https://github.com/apache/airflow/pull/15581#issuecomment-841198845









[GitHub] [airflow] malthe commented on a change in pull request #15581: Add bindvars Xcom result to Oracle operator

Posted by GitBox <gi...@apache.org>.
malthe commented on a change in pull request #15581:
URL: https://github.com/apache/airflow/pull/15581#discussion_r624716665



##########
File path: airflow/providers/oracle/operators/oracle.py
##########
@@ -63,4 +68,25 @@ def __init__(
     def execute(self, context) -> None:
         self.log.info('Executing: %s', self.sql)
         hook = OracleHook(oracle_conn_id=self.oracle_conn_id)
-        hook.run(self.sql, autocommit=self.autocommit, parameters=self.parameters)
+
+        def handler(cur):
+            bindvars = cur.bindvars
+
+            if isinstance(bindvars, list):
+                bindvars = [v.getvalue() for v in bindvars]
+            elif isinstance(bindvars, dict):
+                bindvars = {n: v.getvalue() for (n, v) in bindvars.items()}
+            else:
+                raise TypeError(bindvars)
+
+            return {
+                "bindvars": bindvars,
+                "rows": cur.fetchall(),
+            }
+
+        return hook.run(

Review comment:
       @potiuk good point – fixed now in b81bf4d6de52d4528e58324722a2118235570da0.
   
   Thanks!







[GitHub] [airflow] potiuk commented on a change in pull request #15581: Add bindvars Xcom result to Oracle operator

Posted by GitBox <gi...@apache.org>.
potiuk commented on a change in pull request #15581:
URL: https://github.com/apache/airflow/pull/15581#discussion_r624657163



##########
File path: airflow/providers/oracle/operators/oracle.py
##########
@@ -63,4 +68,25 @@ def __init__(
     def execute(self, context) -> None:
         self.log.info('Executing: %s', self.sql)
         hook = OracleHook(oracle_conn_id=self.oracle_conn_id)
-        hook.run(self.sql, autocommit=self.autocommit, parameters=self.parameters)
+
+        def handler(cur):
+            bindvars = cur.bindvars
+
+            if isinstance(bindvars, list):
+                bindvars = [v.getvalue() for v in bindvars]
+            elif isinstance(bindvars, dict):
+                bindvars = {n: v.getvalue() for (n, v) in bindvars.items()}
+            else:
+                raise TypeError(bindvars)
+
+            return {
+                "bindvars": bindvars,
+                "rows": cur.fetchall(),
+            }
+
+        return hook.run(

Review comment:
       The problem with that is that we are releasing providers from "master", so if we make a "2.1" change, it will have to wait until the 2.1 release to be merged (and will become available only in the next release of the Oracle provider). 
   
   IMHO a much better solution is implementing it in a backwards-compatible way, merging it now, and releasing a new provider that will simply use the functionality when Airflow 2.1 is released.







[GitHub] [airflow] ashb commented on a change in pull request #15581: Add optional result handler to database hooks

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #15581:
URL: https://github.com/apache/airflow/pull/15581#discussion_r633527690



##########
File path: tests/providers/oracle/operators/test_oracle.py
##########
@@ -40,5 +40,7 @@ def test_execute(self, mock_run):
             task_id=task_id,
         )
         operator.execute(context=context)
-
-        mock_run.assert_called_once_with(sql, autocommit=autocommit, parameters=parameters)
+        assert len(mock_run.mock_calls) == 1
+        assert mock_run.mock_calls[0].args[1] == sql

Review comment:
       Oh, `.args` doesn't exist on Py3.6
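For context, the `.args` and `.kwargs` attributes on call objects were added to `unittest.mock` in Python 3.8. A portable alternative, sketched here with made-up argument values, is to unpack the entry from `mock_calls`, which behaves as a `(name, args, kwargs)` tuple:

```python
from unittest import mock

mock_run = mock.Mock()
mock_run("SELECT 1 FROM dual", autocommit=True, parameters=None)

# Unpack the recorded call instead of using .args / .kwargs, so the
# assertion also works on interpreters older than Python 3.8.
_name, args, kwargs = mock_run.mock_calls[0]
```

After unpacking, `args` is the tuple of positional arguments and `kwargs` the dict of keyword arguments, so individual values can be compared without pinning down the whole call.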







[GitHub] [airflow] potiuk commented on pull request #15581: Add optional result handler to database hooks

Posted by GitBox <gi...@apache.org>.
potiuk commented on pull request #15581:
URL: https://github.com/apache/airflow/pull/15581#issuecomment-842231150


   @malthe - plus the unit tests are failing with an int comparison on Mocks :)  https://github.com/apache/airflow/pull/15581/checks?check_run_id=2599840179#step:6:12848





[GitHub] [airflow] uranusjr commented on a change in pull request #15581: Add bindvars Xcom result to Oracle operator

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #15581:
URL: https://github.com/apache/airflow/pull/15581#discussion_r624424105



##########
File path: airflow/hooks/dbapi.py
##########
@@ -166,30 +166,46 @@ def run(self, sql, autocommit=False, parameters=None):
         :type autocommit: bool
         :param parameters: The parameters to render the SQL query with.
         :type parameters: dict or iterable
+        :return: command result.
         """
-        if isinstance(sql, str):
+
+        scalar = isinstance(sql, str)
+        if scalar:
             sql = [sql]
 
         with closing(self.get_conn()) as conn:
             if self.supports_autocommit:
                 self.set_autocommit(conn, autocommit)
 
             with closing(conn.cursor()) as cur:
+                results = []
                 for sql_statement in sql:
-
-                    self.log.info("Running statement: %s, parameters: %s", sql_statement, parameters)
-                    if parameters:
-                        cur.execute(sql_statement, parameters)
-                    else:
-                        cur.execute(sql_statement)
-                    if hasattr(cur, 'rowcount'):
-                        self.log.info("Rows affected: %s", cur.rowcount)
+                    result = self._run_command(cur, sql_statement, parameters)
+                    results.append(result)
 
             # If autocommit was set to False for db that supports autocommit,
             # or if db does not supports autocommit, we do a manual commit.
             if not self.get_autocommit(conn):
                 conn.commit()
 
+        if scalar:
+            return results[0]
+
+        return results
+
+    def _run_command(self, cur, sql_statement, parameters):
+        """
+        Runs a statement using an already open cursor.
+        """
+
+        self.log.info("Running statement: %s, parameters: %s", sql_statement, parameters)
+        if parameters:
+            cur.execute(sql_statement, parameters)
+        else:
+            cur.execute(sql_statement)
+        if hasattr(cur, 'rowcount'):
+            self.log.info("Rows affected: %s", cur.rowcount)

Review comment:
       I know this is carried over from the previous implementation, but the `hasattr` call looks unnecessary (and wrong?) to me. According to PEP 249, `rowcount` is always set, and set to `-1` when not applicable, so this does not seem to do what’s actually intended.
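The `rowcount` behaviour can be observed with the standard library's `sqlite3` driver, used here only as a convenient DB-API implementation. The helper `rows_affected_message` is a hypothetical suggestion for what a check on the value (rather than on the attribute's existence) might look like; it is not code from the PR.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
cur = conn.cursor()

fresh = cur.rowcount  # nothing executed yet
cur.execute("CREATE TABLE t (x INTEGER)")
cur.execute("INSERT INTO t VALUES (1)")
after_insert = cur.rowcount
cur.execute("SELECT x FROM t")
after_select = cur.rowcount


def rows_affected_message(cursor):
    """Return a log message only when the driver reported a real count."""
    if cursor.rowcount >= 0:
        return f"Rows affected: {cursor.rowcount}"
    return None


message = rows_affected_message(cur)
```

Because the attribute is always present, `hasattr(cur, 'rowcount')` is true in every one of these states, including those where the count is `-1`.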

##########
File path: airflow/hooks/dbapi.py
##########
@@ -166,30 +166,46 @@ def run(self, sql, autocommit=False, parameters=None):
         :type autocommit: bool
         :param parameters: The parameters to render the SQL query with.
         :type parameters: dict or iterable
+        :return: command result.
         """
-        if isinstance(sql, str):
+
+        scalar = isinstance(sql, str)
+        if scalar:
             sql = [sql]
 
         with closing(self.get_conn()) as conn:
             if self.supports_autocommit:
                 self.set_autocommit(conn, autocommit)
 
             with closing(conn.cursor()) as cur:
+                results = []
                 for sql_statement in sql:
-
-                    self.log.info("Running statement: %s, parameters: %s", sql_statement, parameters)
-                    if parameters:
-                        cur.execute(sql_statement, parameters)
-                    else:
-                        cur.execute(sql_statement)
-                    if hasattr(cur, 'rowcount'):
-                        self.log.info("Rows affected: %s", cur.rowcount)
+                    result = self._run_command(cur, sql_statement, parameters)
+                    results.append(result)

Review comment:
       ```python
   results = [self._run_command(cur, sql_statement, parameters) for sql_statement in sql]
   ```

##########
File path: airflow/hooks/dbapi.py
##########
@@ -166,30 +166,46 @@ def run(self, sql, autocommit=False, parameters=None):
         :type autocommit: bool
         :param parameters: The parameters to render the SQL query with.
         :type parameters: dict or iterable
+        :return: command result.
         """
-        if isinstance(sql, str):
+
+        scalar = isinstance(sql, str)
+        if scalar:
             sql = [sql]
 
         with closing(self.get_conn()) as conn:
             if self.supports_autocommit:
                 self.set_autocommit(conn, autocommit)
 
             with closing(conn.cursor()) as cur:
+                results = []
                 for sql_statement in sql:
-
-                    self.log.info("Running statement: %s, parameters: %s", sql_statement, parameters)
-                    if parameters:
-                        cur.execute(sql_statement, parameters)
-                    else:
-                        cur.execute(sql_statement)
-                    if hasattr(cur, 'rowcount'):
-                        self.log.info("Rows affected: %s", cur.rowcount)
+                    result = self._run_command(cur, sql_statement, parameters)
+                    results.append(result)
 
             # If autocommit was set to False for db that supports autocommit,
             # or if db does not supports autocommit, we do a manual commit.
             if not self.get_autocommit(conn):
                 conn.commit()
 
+        if scalar:
+            return results[0]
+
+        return results
+
+    def _run_command(self, cur, sql_statement, parameters):
+        """
+        Runs a statement using an already open cursor.
+        """
+
+        self.log.info("Running statement: %s, parameters: %s", sql_statement, parameters)
+        if parameters:
+            cur.execute(sql_statement, parameters)
+        else:
+            cur.execute(sql_statement)
+        if hasattr(cur, 'rowcount'):
+            self.log.info("Rows affected: %s", cur.rowcount)

Review comment:
       Also, maybe `_run_command()` should always have a `return`? It feels weird to me that only `OracleHook` returns a value while other hooks return `None` implicitly. `return cur.fetchall()` feels like a reasonable default to me, although even explicitly returning `None` would be better.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] github-actions[bot] commented on pull request #15581: Add optional result handler to database hooks

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #15581:
URL: https://github.com/apache/airflow/pull/15581#issuecomment-841862790


   [The Workflow run](https://github.com/apache/airflow/actions/runs/847505755) is cancelling this PR. Building images for the PR has failed. Follow the workflow link to check the reason.





[GitHub] [airflow] malthe commented on a change in pull request #15581: Add bindvars Xcom result to Oracle operator

Posted by GitBox <gi...@apache.org>.
malthe commented on a change in pull request #15581:
URL: https://github.com/apache/airflow/pull/15581#discussion_r624621992



##########
File path: airflow/providers/oracle/operators/oracle.py
##########
@@ -63,4 +68,25 @@ def __init__(
     def execute(self, context) -> None:
         self.log.info('Executing: %s', self.sql)
         hook = OracleHook(oracle_conn_id=self.oracle_conn_id)
-        hook.run(self.sql, autocommit=self.autocommit, parameters=self.parameters)
+
+        def handler(cur):
+            bindvars = cur.bindvars
+
+            if isinstance(bindvars, list):
+                bindvars = [v.getvalue() for v in bindvars]
+            elif isinstance(bindvars, dict):
+                bindvars = {n: v.getvalue() for (n, v) in bindvars.items()}
+            else:
+                raise TypeError(bindvars)
+
+            return {
+                "bindvars": bindvars,
+                "rows": cur.fetchall(),
+            }
+
+        return hook.run(

Review comment:
       @potiuk to some extent I feel this is a big enough change either way that a 2.1 target is perhaps not so bad: it will now actually fetch all rows and, given that `do_xcom_push` is set by default, push them to XCom.







[GitHub] [airflow] potiuk commented on a change in pull request #15581: Add bindvars Xcom result to Oracle operator

Posted by GitBox <gi...@apache.org>.
potiuk commented on a change in pull request #15581:
URL: https://github.com/apache/airflow/pull/15581#discussion_r624657163



##########
File path: airflow/providers/oracle/operators/oracle.py
##########
@@ -63,4 +68,25 @@ def __init__(
     def execute(self, context) -> None:
         self.log.info('Executing: %s', self.sql)
         hook = OracleHook(oracle_conn_id=self.oracle_conn_id)
-        hook.run(self.sql, autocommit=self.autocommit, parameters=self.parameters)
+
+        def handler(cur):
+            bindvars = cur.bindvars
+
+            if isinstance(bindvars, list):
+                bindvars = [v.getvalue() for v in bindvars]
+            elif isinstance(bindvars, dict):
+                bindvars = {n: v.getvalue() for (n, v) in bindvars.items()}
+            else:
+                raise TypeError(bindvars)
+
+            return {
+                "bindvars": bindvars,
+                "rows": cur.fetchall(),
+            }
+
+        return hook.run(

Review comment:
       The problem with that is that we are releasing providers from "master", so if we make a "2.1" change, it will have to wait until the 2.1 release to be merged.
   
   IMHO a much better solution is to implement it in a backwards-compatible way, merge it now, and release a new provider that will simply use the functionality once Airflow 2.1 is released.







[GitHub] [airflow] malthe commented on pull request #15581: Add bindvars Xcom result to Oracle operator

Posted by GitBox <gi...@apache.org>.
malthe commented on pull request #15581:
URL: https://github.com/apache/airflow/pull/15581#issuecomment-841625268


   @potiuk I decided to remove the more opinionated part of this PR – I think it's better to start with a more basic approach and then see if it's useful to add more functionality to the Operator-level later on.





[GitHub] [airflow] github-actions[bot] commented on pull request #15581: Add bindvars Xcom result to Oracle operator

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #15581:
URL: https://github.com/apache/airflow/pull/15581#issuecomment-841623626


   [The Workflow run](https://github.com/apache/airflow/actions/runs/844490876) is cancelling this PR. Building images for the PR has failed. Follow the workflow link to check the reason.





[GitHub] [airflow] MatrixManAtYrService commented on pull request #15581: Add optional result handler to database hooks

Posted by GitBox <gi...@apache.org>.
MatrixManAtYrService commented on pull request #15581:
URL: https://github.com/apache/airflow/pull/15581#issuecomment-843763802


   I got this working, it looked like this:
   
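   ```python3
   from airflow.decorators import task
   from airflow.providers.sqlite.hooks.sqlite import SqliteHook
   
   # Placeholder query for illustration; the original comment did not show `my_sql`.
   my_sql = "SELECT 'hello', 'world'"
   
   def handler(cur):
       # Called with the cursor after the statement has run.
       hello, world = cur.fetchone()
       message = f"{hello} {world}!"
       return message
   
   @task
   def mytask():
       hook = SqliteHook(sqlite_conn_id="my_conn_id")
       message = hook.run(my_sql, handler=handler)
       print(message)
   ```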
   
   As it stands, I don't think this works:
   ```python3
   SqliteOperator(
       task_id="some_id",
       sql=my_sql,
       sqlite_conn_id="sqlite_conn_id",
       handler=handler,  # <--- no such kwarg
   )
   ```
   
   Do we want to add the handler kwarg to the operators too?  Not that the return value would be helpful there, but users could call `get_current_context` in the handler and push the outputs to xcom or something like that.





[GitHub] [airflow] malthe commented on pull request #15581: Add optional result handler to database hooks

Posted by GitBox <gi...@apache.org>.
malthe commented on pull request #15581:
URL: https://github.com/apache/airflow/pull/15581#issuecomment-842171095


   > Some unit tests covering passing a handler in would have been _nice_, but given that path is simple we can merge this anyway to get it into 2.1
   
   @ashb a rather rudimentary unit test, `test_run_with_handler`, has been added, but I can definitely elaborate on it.





[GitHub] [airflow] ashb commented on pull request #15581: Add optional result handler to database hooks

Posted by GitBox <gi...@apache.org>.
ashb commented on pull request #15581:
URL: https://github.com/apache/airflow/pull/15581#issuecomment-842260898


   I just pushed a fix to your PR -- fixed the style, moved the test from the oracle hook to the base db hook, and tested scalar and list types.
   
   Oh and added Changelog as we've already merged it (not normally something you have to worry about) :) 





[GitHub] [airflow] github-actions[bot] commented on pull request #15581: Add bindvars Xcom result to Oracle operator

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #15581:
URL: https://github.com/apache/airflow/pull/15581#issuecomment-841198493


   [The Workflow run](https://github.com/apache/airflow/actions/runs/841860489) is cancelling this PR. It has some failed jobs matching ^Pylint$,^Static checks,^Build docs$,^Spell check docs$,^Provider packages,^Checks: Helm tests$,^Test OpenAPI*.





[GitHub] [airflow] github-actions[bot] commented on pull request #15581: Add bindvars Xcom result to Oracle operator

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #15581:
URL: https://github.com/apache/airflow/pull/15581#issuecomment-841084346


   [The Workflow run](https://github.com/apache/airflow/actions/runs/841506705) is cancelling this PR. It has some failed jobs matching ^Pylint$,^Static checks,^Build docs$,^Spell check docs$,^Provider packages,^Checks: Helm tests$,^Test OpenAPI*.





[GitHub] [airflow] github-actions[bot] commented on pull request #15581: Add optional result handler to database hooks

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #15581:
URL: https://github.com/apache/airflow/pull/15581#issuecomment-842348663


   [The Workflow run](https://github.com/apache/airflow/actions/runs/849899349) is cancelling this PR. Building images for the PR has failed. Follow the workflow link to check the reason.





[GitHub] [airflow] potiuk commented on a change in pull request #15581: Add bindvars Xcom result to Oracle operator

Posted by GitBox <gi...@apache.org>.
potiuk commented on a change in pull request #15581:
URL: https://github.com/apache/airflow/pull/15581#discussion_r624493306



##########
File path: airflow/providers/oracle/operators/oracle.py
##########
@@ -63,4 +68,25 @@ def __init__(
     def execute(self, context) -> None:
         self.log.info('Executing: %s', self.sql)
         hook = OracleHook(oracle_conn_id=self.oracle_conn_id)
-        hook.run(self.sql, autocommit=self.autocommit, parameters=self.parameters)
+
+        def handler(cur):
+            bindvars = cur.bindvars
+
+            if isinstance(bindvars, list):
+                bindvars = [v.getvalue() for v in bindvars]
+            elif isinstance(bindvars, dict):
+                bindvars = {n: v.getvalue() for (n, v) in bindvars.items()}
+            else:
+                raise TypeError(bindvars)
+
+            return {
+                "bindvars": bindvars,
+                "rows": cur.fetchall(),
+            }
+
+        return hook.run(

Review comment:
       There is a subtle problem with this approach. This would mean that the Oracle Provider will be only compatible with the new DBApi/DBHook implementation, which is part of the Airflow itself. This means that either we make a breaking change (and add > 2.1 limitation to the Oracle provider), or (better) we implement it in backward-compatible way - honoring the fact that the run method might have no "handler" added.
   
   That would mean that bind-variable functionality will only be available in Airflow 2.1 and above
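
One way to honor an older `run` signature can be sketched as follows. This is an illustration under stated assumptions, not the code that ended up in the PR: `run_accepts_handler`, `run_compat`, `OldHook` and `NewHook` are hypothetical names, and the check simply inspects the signature of `hook.run` with the standard-library `inspect` module.

```python
import inspect


def run_accepts_handler(hook):
    """Return True if hook.run declares a `handler` parameter."""
    return "handler" in inspect.signature(hook.run).parameters


def run_compat(hook, sql, handler, **kwargs):
    # Only pass `handler` when the installed hook understands it.
    if run_accepts_handler(hook):
        kwargs["handler"] = handler
    return hook.run(sql, **kwargs)


class OldHook:
    """Hypothetical stand-in for a hook from before the handler was added."""

    def run(self, sql, autocommit=False, parameters=None):
        return None


class NewHook:
    """Hypothetical stand-in for a hook whose run() accepts a handler."""

    def run(self, sql, autocommit=False, parameters=None, handler=None):
        # For illustration the handler is applied to the SQL text, not a cursor.
        return handler(sql) if handler else None


old_result = run_compat(OldHook(), "select 1", handler=str.upper, autocommit=True)
new_result = run_compat(NewHook(), "select 1", handler=str.upper)
```

On the older hook the statement still runs, but no handler result is available, which matches the remark that the bind-variable functionality would only work on Airflow 2.1 and above.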







[GitHub] [airflow] potiuk commented on pull request #15581: Add optional result handler to database hooks

Posted by GitBox <gi...@apache.org>.
potiuk commented on pull request #15581:
URL: https://github.com/apache/airflow/pull/15581#issuecomment-842231977


   But the CI seems to finally be alive & kicking. Seems like GitHub had some big troubles with Actions that culminated in a weekend outage, which seems to be fixed now.





[GitHub] [airflow] boring-cyborg[bot] commented on pull request #15581: Add bindvars Xcom result to Oracle operator

Posted by GitBox <gi...@apache.org>.
boring-cyborg[bot] commented on pull request #15581:
URL: https://github.com/apache/airflow/pull/15581#issuecomment-828820253


   Congratulations on your first Pull Request and welcome to the Apache Airflow community! If you have any issues or are unsure about anything, please check our Contribution Guide (https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst).
   Here are some useful points:
   - Pay attention to the quality of your code (flake8, pylint and type annotations). Our [pre-commits]( https://github.com/apache/airflow/blob/master/STATIC_CODE_CHECKS.rst#prerequisites-for-pre-commit-hooks) will help you with that.
   - In case of a new feature add useful documentation (in docstrings or in `docs/` directory). Adding a new operator? Check this short [guide](https://github.com/apache/airflow/blob/master/docs/apache-airflow/howto/custom-operator.rst). Consider adding an example DAG that shows how users should use it.
   - Consider using [Breeze environment](https://github.com/apache/airflow/blob/master/BREEZE.rst) for testing locally, it’s a heavy docker but it ships with a working Airflow and a lot of integrations.
   - Be patient and persistent. It might take some time to get a review or get the final approval from Committers.
   - Please follow [ASF Code of Conduct](https://www.apache.org/foundation/policies/conduct) for all communication including (but not limited to) comments on Pull Requests, Mailing list and Slack.
   - Be sure to read the [Airflow Coding style]( https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#coding-style-and-best-practices).
   Apache Airflow is a community-driven project and together we are making it better 🚀.
   In case of doubts contact the developers at:
   Mailing List: dev@airflow.apache.org
   Slack: https://s.apache.org/airflow-slack
   





[GitHub] [airflow] potiuk commented on pull request #15581: Add optional result handler to database hooks

Posted by GitBox <gi...@apache.org>.
potiuk commented on pull request #15581:
URL: https://github.com/apache/airflow/pull/15581#issuecomment-842237876


   And @malthe -> not sure what timezone you are in, but we would love to release 2.1 rc1 tonight and get that change in so that when we release next wave of providers with >= 2.1 we could already use this feature.





[GitHub] [airflow] ashb commented on a change in pull request #15581: Add optional result handler to database hooks

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #15581:
URL: https://github.com/apache/airflow/pull/15581#discussion_r633368587



##########
File path: airflow/hooks/dbapi.py
##########
@@ -166,30 +166,51 @@ def run(self, sql, autocommit=False, parameters=None):
         :type autocommit: bool
         :param parameters: The parameters to render the SQL query with.
         :type parameters: dict or iterable
+        :param handler: The result handler which is called after each statement.
+        :type handler: Callable which gets a cursor object.

Review comment:
       ```suggestion
           :type handler: callable
   ```
   
   Type should be a bare type, not a description

##########
File path: airflow/hooks/dbapi.py
##########
@@ -166,30 +166,51 @@ def run(self, sql, autocommit=False, parameters=None):
         :type autocommit: bool
         :param parameters: The parameters to render the SQL query with.
         :type parameters: dict or iterable
+        :param handler: The result handler which is called after each statement.

Review comment:
       ```suggestion
           :param handler: The result handler which is called with the result of each statement. 
   ```

##########
File path: tests/providers/oracle/operators/test_oracle.py
##########
@@ -40,5 +40,7 @@ def test_execute(self, mock_run):
             task_id=task_id,
         )
         operator.execute(context=context)
-
-        mock_run.assert_called_once_with(sql, autocommit=autocommit, parameters=parameters)
+        assert len(mock_run.mock_calls) == 1
+        assert mock_run.mock_calls[0].args[1] == sql

Review comment:
       Oh, `.args` doesn't exist on Py3.6
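
A version-independent alternative, sketched here with a bare `Mock` rather than the patched `OracleHook.run` from the test, is to unpack the call entry as a tuple. Entries in `mock_calls` are three-tuples of `(name, args, kwargs)`, so this works without the `.args` attribute that newer Python versions provide. The SQL string and keyword values below are made up for the example.

```python
from unittest import mock

mock_run = mock.Mock()
mock_run("SELECT 1", autocommit=True, parameters=None)

# Each entry in mock_calls is a tuple-like `call`: (name, args, kwargs).
name, args, kwargs = mock_run.mock_calls[0]
first_positional = args[0]
autocommit = kwargs["autocommit"]
```

In the test above the index would stay `args[1]` if the first positional argument is the hook instance.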







[GitHub] [airflow] ashb merged pull request #15581: Add optional result handler to database hooks

Posted by GitBox <gi...@apache.org>.
ashb merged pull request #15581:
URL: https://github.com/apache/airflow/pull/15581


   





[GitHub] [airflow] potiuk commented on a change in pull request #15581: Add bindvars Xcom result to Oracle operator

Posted by GitBox <gi...@apache.org>.
potiuk commented on a change in pull request #15581:
URL: https://github.com/apache/airflow/pull/15581#discussion_r624499520



##########
File path: airflow/providers/oracle/operators/oracle.py
##########
@@ -63,4 +68,25 @@ def __init__(
     def execute(self, context) -> None:
         self.log.info('Executing: %s', self.sql)
         hook = OracleHook(oracle_conn_id=self.oracle_conn_id)
-        hook.run(self.sql, autocommit=self.autocommit, parameters=self.parameters)
+
+        def handler(cur):
+            bindvars = cur.bindvars
+
+            if isinstance(bindvars, list):
+                bindvars = [v.getvalue() for v in bindvars]
+            elif isinstance(bindvars, dict):
+                bindvars = {n: v.getvalue() for (n, v) in bindvars.items()}
+            else:
+                raise TypeError(bindvars)
+
+            return {
+                "bindvars": bindvars,
+                "rows": cur.fetchall(),
+            }
+
+        return hook.run(

Review comment:
       And I have to think about automating such checks in pre-commit (mental note to self).







[GitHub] [airflow] potiuk commented on pull request #15581: Add optional result handler to database hooks

Posted by GitBox <gi...@apache.org>.
potiuk commented on pull request #15581:
URL: https://github.com/apache/airflow/pull/15581#issuecomment-842475870


   Cool! Thanks @ashb !





[GitHub] [airflow] ashb commented on pull request #15581: Add optional result handler to database hooks

Posted by GitBox <gi...@apache.org>.
ashb commented on pull request #15581:
URL: https://github.com/apache/airflow/pull/15581#issuecomment-844146643


   @MatrixManAtYrService Yeah, as it stands now this hasn't been exposed to any of the Operators -- it was mostly designed as an extra tool in the db hook for _writing_ operator features.





[GitHub] [airflow] malthe commented on pull request #15581: Add optional result handler to database hooks

Posted by GitBox <gi...@apache.org>.
malthe commented on pull request #15581:
URL: https://github.com/apache/airflow/pull/15581#issuecomment-841856594


   @potiuk title changed and I have squashed commits into a single one.





[GitHub] [airflow] potiuk commented on a change in pull request #15581: Add bindvars Xcom result to Oracle operator

Posted by GitBox <gi...@apache.org>.
potiuk commented on a change in pull request #15581:
URL: https://github.com/apache/airflow/pull/15581#discussion_r624493306



##########
File path: airflow/providers/oracle/operators/oracle.py
##########
@@ -63,4 +68,25 @@ def __init__(
     def execute(self, context) -> None:
         self.log.info('Executing: %s', self.sql)
         hook = OracleHook(oracle_conn_id=self.oracle_conn_id)
-        hook.run(self.sql, autocommit=self.autocommit, parameters=self.parameters)
+
+        def handler(cur):
+            bindvars = cur.bindvars
+
+            if isinstance(bindvars, list):
+                bindvars = [v.getvalue() for v in bindvars]
+            elif isinstance(bindvars, dict):
+                bindvars = {n: v.getvalue() for (n, v) in bindvars.items()}
+            else:
+                raise TypeError(bindvars)
+
+            return {
+                "bindvars": bindvars,
+                "rows": cur.fetchall(),
+            }
+
+        return hook.run(

Review comment:
       There is a subtle problem with this approach. This would mean that the Oracle Provider will be only compatible with the new DBApi implementation, which is part of the Airflow itself. This means that either we make a breaking change (and add > 2.1 limitation to the Oracle provider), or (better) we implement it in backward-compatible way - honoring the fact that the run method might have no "handler" added.
   
   That would mean that bind-variable functionality will only be available in Airflow 2.1 and above







[GitHub] [airflow] github-actions[bot] commented on pull request #15581: Add bindvars Xcom result to Oracle operator

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #15581:
URL: https://github.com/apache/airflow/pull/15581#issuecomment-841023669


   The PR most likely needs to run full matrix of tests because it modifies parts of the core of Airflow. However, committers might decide to merge it quickly and take the risk. If they don't merge it quickly - please rebase it to the latest master at your convenience, or amend the last commit of the PR, and push it with --force-with-lease.





[GitHub] [airflow] potiuk commented on pull request #15581: Add bindvars Xcom result to Oracle operator

Posted by GitBox <gi...@apache.org>.
potiuk commented on pull request #15581:
URL: https://github.com/apache/airflow/pull/15581#issuecomment-841807585


   Hey @malthe - isn't that change now something different :) ? 
   
   From what I see, what's left is **just** adding handlers to the DB hooks, which I think is very good. We should merge it before 2.1 so that the next wave of providers, which will be 2.1+ only, can make use of it.
   
   Would you mind changing the commit title/message? We have a major outage of GitHub Actions now, I hope it will be solved soon and finally we will be able to merge it: https://www.githubstatus.com/incidents/zbpwygxwb3gw





[GitHub] [airflow] potiuk closed pull request #15581: Add optional result handler to database hooks

Posted by GitBox <gi...@apache.org>.
potiuk closed pull request #15581:
URL: https://github.com/apache/airflow/pull/15581


   





[GitHub] [airflow] github-actions[bot] commented on pull request #15581: Add bindvars Xcom result to Oracle operator

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #15581:
URL: https://github.com/apache/airflow/pull/15581#issuecomment-841033532


   [The Workflow run](https://github.com/apache/airflow/actions/runs/841250314) is cancelling this PR. It has some failed jobs matching ^Pylint$,^Static checks,^Build docs$,^Spell check docs$,^Provider packages,^Checks: Helm tests$,^Test OpenAPI*.





[GitHub] [airflow] malthe commented on a change in pull request #15581: Add bindvars Xcom result to Oracle operator

Posted by GitBox <gi...@apache.org>.
malthe commented on a change in pull request #15581:
URL: https://github.com/apache/airflow/pull/15581#discussion_r629657710



##########
File path: airflow/providers/oracle/operators/oracle.py
##########
@@ -62,4 +68,30 @@ def __init__(
     def execute(self, context) -> None:
         self.log.info('Executing: %s', self.sql)
         hook = OracleHook(oracle_conn_id=self.oracle_conn_id)
-        hook.run(self.sql, autocommit=self.autocommit, parameters=self.parameters)
+
+        def handler(cur):
+            bindvars = cur.bindvars
+
+            if isinstance(bindvars, list):
+                bindvars = [v.getvalue() for v in bindvars]
+            elif isinstance(bindvars, dict):
+                bindvars = {n: v.getvalue() for (n, v) in bindvars.items()}
+            else:
+                raise TypeError(bindvars)
+
+            return {
+                "bindvars": bindvars,
+                "rows": cur.fetchall(),
+            }
+
+        kwargs = {
+            "autocommit": self.autocommit,
+            "parameters": self.parameters,
+        }
+
+        # For backwards compatibility, if the hook implementation does not

Review comment:
       @potiuk this is how I ended up providing compatibility with the older hook interface.







[GitHub] [airflow] ashb edited a comment on pull request #15581: Add optional result handler to database hooks

Posted by GitBox <gi...@apache.org>.
ashb edited a comment on pull request #15581:
URL: https://github.com/apache/airflow/pull/15581#issuecomment-844146643


   @MatrixManAtYrService Yeah, as it stands now this hasn't been exposed to any of the Operators -- it was mostly designed as an extra tool in the db hook for _writing_ operator features.
   
   > Do we want to add the handler kwarg to the operators too? Not that the return value would be helpful there, but users could call get_current_context in the handler and push the outputs to xcom or something like that.
   
   If it's written as part of the operator, then you can make a closure to get the context vars you need.
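
The closure idea can be sketched like this. It is a self-contained illustration, not Airflow code: `FakeTaskInstance` is a hypothetical stand-in for the task instance in an Airflow context, the free function `execute` plays the role of an operator's `execute` method, and `sqlite3` replaces the hook. The point is only that a handler defined inside `execute` can see the context without calling `get_current_context`.

```python
import sqlite3
from contextlib import closing


class FakeTaskInstance:
    """Hypothetical stand-in for the task instance found in an Airflow context."""

    def __init__(self):
        self.pushed = {}

    def xcom_push(self, key, value):
        self.pushed[key] = value


def execute(context, conn, sql):
    ti = context["ti"]

    def handler(cur):
        # The closure sees `ti` from the enclosing execute() call.
        rows = cur.fetchall()
        ti.xcom_push(key="rows", value=rows)
        return rows

    with closing(conn.cursor()) as cur:
        cur.execute(sql)
        return handler(cur)


context = {"ti": FakeTaskInstance()}
rows = execute(context, sqlite3.connect(":memory:"), "SELECT 'hello', 'world'")
```

In an operator, the same handler would be passed to `hook.run(..., handler=handler)` instead of being called directly.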





[GitHub] [airflow] malthe commented on pull request #15581: Add bindvars Xcom result to Oracle operator

Posted by GitBox <gi...@apache.org>.
malthe commented on pull request #15581:
URL: https://github.com/apache/airflow/pull/15581#issuecomment-829203997


   The following is an example of an alternative implementation that works with the existing codebase, specifically addressing the requirement of calling a stored procedure that has one or more _out parameters_.
   
   Simply use a dummy parameter with the right Python type for each _out parameter_ (e.g. the integer `0` for a number) and the entire set of parameters (both in/out directions) will be pushed as Xcom (if enabled – which is the default setting).
   
   ```python
   from typing import Iterable, Mapping, Optional, Union
   
   from airflow.models import BaseOperator
   from airflow.providers.oracle.hooks.oracle import OracleHook
   from airflow.utils.decorators import apply_defaults
   
   class OracleCallOperator(BaseOperator):
       ui_color = '#ededed'
   
       @apply_defaults
       def __init__(
           self,
           *,
           function: str,
           oracle_conn_id: str = 'oracle_default',
           parameters: Optional[Union[Mapping, Iterable]] = None,
           autocommit: bool = False,
           **kwargs,
       ) -> None:
           super().__init__(**kwargs)
           self.oracle_conn_id = oracle_conn_id
           self.function = function
           self.autocommit = autocommit
           self.parameters = parameters
   
       def execute(self, context) -> Optional[Union[list, dict]]:
           self.log.info('Executing: %s', self.function)
           hook = OracleHook(oracle_conn_id=self.oracle_conn_id)
   
           # A procedure may take no arguments at all.
           parameters = self.parameters if self.parameters is not None else []
   
           # Named placeholders for a dict, positional ones (:1, :2, ...) otherwise.
           args = ", ".join(
               f":{name}" for name in (
                   parameters if isinstance(parameters, dict)
                   else range(1, len(parameters) + 1)
               )
           )
   
           sql = f"BEGIN {self.function}({args}); END;"
   
           with hook.get_conn() as conn, conn.cursor() as cursor:
               cursor.execute(sql, parameters)
               # The return value (in and out parameters) is pushed to XCom.
               if isinstance(cursor.bindvars, list):
                   return [v.getvalue() for v in cursor.bindvars]
               if isinstance(cursor.bindvars, dict):
                   return {n: v.getvalue() for (n, v) in cursor.bindvars.items()}
   ```
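
The placeholder construction in the operator above can be checked in isolation, without a database connection. The following is a sketch only; `build_call` is a hypothetical helper name, and treating a missing parameter set as an empty argument list is an assumption.

```python
from typing import Iterable, Mapping, Optional, Union


def build_call(function: str, parameters: Optional[Union[Mapping, Iterable]] = None) -> str:
    """Render an anonymous PL/SQL block that calls `function` with bind placeholders."""
    if parameters is None:
        names = []  # assumption: no parameters means a call without arguments
    elif isinstance(parameters, Mapping):
        names = list(parameters)  # named binds, e.g. :a, :b
    else:
        names = range(1, len(list(parameters)) + 1)  # positional binds :1, :2, ...
    args = ", ".join(f":{name}" for name in names)
    return f"BEGIN {function}({args}); END;"


named = build_call("my_proc", {"a": 1, "b": 0})
positional = build_call("my_proc", [1, 0])
empty = build_call("my_proc")
```

Here `named` is `BEGIN my_proc(:a, :b); END;` and `positional` is `BEGIN my_proc(:1, :2); END;`.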





[GitHub] [airflow] potiuk commented on a change in pull request #15581: Add bindvars Xcom result to Oracle operator

Posted by GitBox <gi...@apache.org>.
potiuk commented on a change in pull request #15581:
URL: https://github.com/apache/airflow/pull/15581#discussion_r624499431



##########
File path: airflow/providers/oracle/operators/oracle.py
##########
@@ -63,4 +68,25 @@ def __init__(
     def execute(self, context) -> None:
         self.log.info('Executing: %s', self.sql)
         hook = OracleHook(oracle_conn_id=self.oracle_conn_id)
-        hook.run(self.sql, autocommit=self.autocommit, parameters=self.parameters)
+
+        def handler(cur):
+            bindvars = cur.bindvars
+
+            if isinstance(bindvars, list):
+                bindvars = [v.getvalue() for v in bindvars]
+            elif isinstance(bindvars, dict):
+                bindvars = {n: v.getvalue() for (n, v) in bindvars.items()}
+            else:
+                raise TypeError(bindvars)
+
+            return {
+                "bindvars": bindvars,
+                "rows": cur.fetchall(),
+            }
+
+        return hook.run(

Review comment:
       We could do it in one of the ways described here: https://stackoverflow.com/questions/11915032/get-keyword-arguments-for-function-python
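
One such approach uses `inspect.signature` from the standard library to test whether a hook's `run` method accepts a `handler` keyword before passing it. This is a sketch under assumptions: `accepts_kwarg`, `OldHook` and `NewHook` are hypothetical names for illustration and do not reflect the code that was merged.

```python
import inspect
from typing import Callable


def accepts_kwarg(func: Callable, name: str) -> bool:
    """Return True if `func` can be called with `name` as a keyword argument."""
    params = inspect.signature(func).parameters
    param = params.get(name)
    if param is not None and param.kind in (
        inspect.Parameter.POSITIONAL_OR_KEYWORD,
        inspect.Parameter.KEYWORD_ONLY,
    ):
        return True
    # A **kwargs catch-all also accepts the keyword.
    return any(p.kind is inspect.Parameter.VAR_KEYWORD for p in params.values())


class OldHook:  # stand-in for a hook released before the change
    def run(self, sql, autocommit=False, parameters=None):
        return None


class NewHook:  # stand-in for a hook with the optional handler
    def run(self, sql, autocommit=False, parameters=None, handler=None):
        return None


old_supported = accepts_kwarg(OldHook().run, "handler")
new_supported = accepts_kwarg(NewHook().run, "handler")
```

An operator could then add `handler` to its keyword arguments only when the check succeeds, and fall back to the plain call otherwise.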







[GitHub] [airflow] potiuk commented on a change in pull request #15581: Add bindvars Xcom result to Oracle operator

Posted by GitBox <gi...@apache.org>.
potiuk commented on a change in pull request #15581:
URL: https://github.com/apache/airflow/pull/15581#discussion_r624657163



##########
File path: airflow/providers/oracle/operators/oracle.py
##########
@@ -63,4 +68,25 @@ def __init__(
     def execute(self, context) -> None:
         self.log.info('Executing: %s', self.sql)
         hook = OracleHook(oracle_conn_id=self.oracle_conn_id)
-        hook.run(self.sql, autocommit=self.autocommit, parameters=self.parameters)
+
+        def handler(cur):
+            bindvars = cur.bindvars
+
+            if isinstance(bindvars, list):
+                bindvars = [v.getvalue() for v in bindvars]
+            elif isinstance(bindvars, dict):
+                bindvars = {n: v.getvalue() for (n, v) in bindvars.items()}
+            else:
+                raise TypeError(bindvars)
+
+            return {
+                "bindvars": bindvars,
+                "rows": cur.fetchall(),
+            }
+
+        return hook.run(

Review comment:
       The problem with that is that we are releasing providers from "master", so if we make a "2.1" change, it will have to wait until the 2.1 release to be merged.
   
   IMHO a much better solution is implementing it in a backwards-compatible way, merging it now, and releasing a new provider that will simply use the functionality when Airflow 2.1 is released.







[GitHub] [airflow] potiuk commented on pull request #15581: Add optional result handler to database hooks

Posted by GitBox <gi...@apache.org>.
potiuk commented on pull request #15581:
URL: https://github.com/apache/airflow/pull/15581#issuecomment-842148521


   Closing/reopening as the CI seems more stable.





[GitHub] [airflow] potiuk commented on pull request #15581: Add bindvars Xcom result to Oracle operator

Posted by GitBox <gi...@apache.org>.
potiuk commented on pull request #15581:
URL: https://github.com/apache/airflow/pull/15581#issuecomment-841807585









[GitHub] [airflow] github-actions[bot] commented on pull request #15581: Add bindvars Xcom result to Oracle operator

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #15581:
URL: https://github.com/apache/airflow/pull/15581#issuecomment-841084629


   [The Workflow run](https://github.com/apache/airflow/actions/runs/841508404) is cancelling this PR. It has some failed jobs matching ^Pylint$,^Static checks,^Build docs$,^Spell check docs$,^Provider packages,^Checks: Helm tests$,^Test OpenAPI*.





[GitHub] [airflow] malthe commented on a change in pull request #15581: Add bindvars Xcom result to Oracle operator

Posted by GitBox <gi...@apache.org>.
malthe commented on a change in pull request #15581:
URL: https://github.com/apache/airflow/pull/15581#discussion_r624472337



##########
File path: airflow/hooks/dbapi.py
##########
@@ -166,30 +166,46 @@ def run(self, sql, autocommit=False, parameters=None):
         :type autocommit: bool
         :param parameters: The parameters to render the SQL query with.
         :type parameters: dict or iterable
+        :return: command result.
         """
-        if isinstance(sql, str):
+
+        scalar = isinstance(sql, str)
+        if scalar:
             sql = [sql]
 
         with closing(self.get_conn()) as conn:
             if self.supports_autocommit:
                 self.set_autocommit(conn, autocommit)
 
             with closing(conn.cursor()) as cur:
+                results = []
                 for sql_statement in sql:
-
-                    self.log.info("Running statement: %s, parameters: %s", sql_statement, parameters)
-                    if parameters:
-                        cur.execute(sql_statement, parameters)
-                    else:
-                        cur.execute(sql_statement)
-                    if hasattr(cur, 'rowcount'):
-                        self.log.info("Rows affected: %s", cur.rowcount)
+                    result = self._run_command(cur, sql_statement, parameters)
+                    results.append(result)
 
             # If autocommit was set to False for db that supports autocommit,
             # or if db does not supports autocommit, we do a manual commit.
             if not self.get_autocommit(conn):
                 conn.commit()
 
+        if scalar:
+            return results[0]
+
+        return results
+
+    def _run_command(self, cur, sql_statement, parameters):
+        """
+        Runs a statement using an already open cursor.
+        """
+
+        self.log.info("Running statement: %s, parameters: %s", sql_statement, parameters)
+        if parameters:
+            cur.execute(sql_statement, parameters)
+        else:
+            cur.execute(sql_statement)
+        if hasattr(cur, 'rowcount'):
+            self.log.info("Rows affected: %s", cur.rowcount)

Review comment:
       @uranusjr I have pushed changes such that the `run` method now receives an optional `handler` argument.
   
   When this argument is provided, results are gathered and returned. Right now it's only used by the Oracle operator, but I think we should attempt to extend it to other providers as well.
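
To illustrate the shape of such a `run` method, here is a minimal sketch against `sqlite3`. It is a free function rather than a hook method, and returning `None` when no handler is given is an assumption about the intended behaviour, not a copy of the PR's code.

```python
import sqlite3
from contextlib import closing


def run(conn, sql, parameters=None, handler=None):
    """Execute one statement or a list of statements, optionally collecting results."""
    scalar = isinstance(sql, str)
    if scalar:
        sql = [sql]

    results = []
    with closing(conn.cursor()) as cur:
        for statement in sql:
            cur.execute(statement, parameters or ())
            if handler is not None:
                # The handler sees the live cursor, so it can fetch rows or
                # read driver-specific attributes before the cursor closes.
                results.append(handler(cur))
    conn.commit()

    if handler is None:
        return None
    return results[0] if scalar else results


conn = sqlite3.connect(":memory:")
run(conn, "CREATE TABLE t (x INTEGER)")
run(conn, "INSERT INTO t VALUES (?)", parameters=(42,))
rows = run(conn, "SELECT x FROM t", handler=lambda cur: cur.fetchall())
```

A single SQL string yields a single result, while a list of statements yields a list of results, mirroring the `scalar` flag in the diff above.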



