Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/05/01 09:15:22 UTC

[GitHub] [airflow] malthe commented on a change in pull request #15581: Add bindvars Xcom result to Oracle operator

malthe commented on a change in pull request #15581:
URL: https://github.com/apache/airflow/pull/15581#discussion_r624472337



##########
File path: airflow/hooks/dbapi.py
##########
@@ -166,30 +166,46 @@ def run(self, sql, autocommit=False, parameters=None):
         :type autocommit: bool
         :param parameters: The parameters to render the SQL query with.
         :type parameters: dict or iterable
+        :return: command result.
         """
-        if isinstance(sql, str):
+
+        scalar = isinstance(sql, str)
+        if scalar:
             sql = [sql]
 
         with closing(self.get_conn()) as conn:
             if self.supports_autocommit:
                 self.set_autocommit(conn, autocommit)
 
             with closing(conn.cursor()) as cur:
+                results = []
                 for sql_statement in sql:
-
-                    self.log.info("Running statement: %s, parameters: %s", sql_statement, parameters)
-                    if parameters:
-                        cur.execute(sql_statement, parameters)
-                    else:
-                        cur.execute(sql_statement)
-                    if hasattr(cur, 'rowcount'):
-                        self.log.info("Rows affected: %s", cur.rowcount)
+                    result = self._run_command(cur, sql_statement, parameters)
+                    results.append(result)
 
             # If autocommit was set to False for db that supports autocommit,
             # or if db does not supports autocommit, we do a manual commit.
             if not self.get_autocommit(conn):
                 conn.commit()
 
+        if scalar:
+            return results[0]
+
+        return results
+
+    def _run_command(self, cur, sql_statement, parameters):
+        """
+        Runs a statement using an already open cursor.
+        """
+
+        self.log.info("Running statement: %s, parameters: %s", sql_statement, parameters)
+        if parameters:
+            cur.execute(sql_statement, parameters)
+        else:
+            cur.execute(sql_statement)
+        if hasattr(cur, 'rowcount'):
+            self.log.info("Rows affected: %s", cur.rowcount)

Review comment:
   @uranusjr I have pushed changes so that the `run` method now accepts an optional `handler` argument.

   When this argument is provided, the results are gathered and returned. Right now this is only implemented for the Oracle operator, but I think we should try to extend it to the other providers as well.
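
A rough sketch of the idea, not the code in this PR: it assumes `handler` is a callable that takes the open cursor after each statement, and it uses the standard-library `sqlite3` module in place of an Airflow hook so that it runs on its own. The free function `run` and the helper `fetch_all` are hypothetical names for illustration.

```python
import sqlite3
from contextlib import closing


def run(conn, sql, parameters=None, handler=None):
    """Execute one statement or a list of statements on ``conn``.

    Assumption: ``handler`` is called with the cursor after each
    statement, and its return values are what gets gathered.
    """
    scalar = isinstance(sql, str)
    if scalar:
        sql = [sql]

    results = []
    with closing(conn.cursor()) as cur:
        for sql_statement in sql:
            if parameters:
                cur.execute(sql_statement, parameters)
            else:
                cur.execute(sql_statement)
            # Only gather results when the caller asked for them.
            if handler is not None:
                results.append(handler(cur))
    conn.commit()

    if handler is None:
        return None
    # A single string yields a single result; a list yields a list.
    return results[0] if scalar else results


def fetch_all(cur):
    """Hypothetical handler: return every row of the last statement."""
    return cur.fetchall()


conn = sqlite3.connect(":memory:")
run(conn, ["CREATE TABLE t (x INTEGER)", "INSERT INTO t VALUES (1), (2)"])
rows = run(conn, "SELECT x FROM t ORDER BY x", handler=fetch_all)
```

Without a handler the call returns nothing, so existing callers would see no change in behaviour; with one, a single SQL string gives back a single result and a list of statements gives back a list of results.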



