Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/05/01 06:04:46 UTC

[GitHub] [airflow] uranusjr commented on a change in pull request #15581: Add bindvars Xcom result to Oracle operator

uranusjr commented on a change in pull request #15581:
URL: https://github.com/apache/airflow/pull/15581#discussion_r624424105



##########
File path: airflow/hooks/dbapi.py
##########
@@ -166,30 +166,46 @@ def run(self, sql, autocommit=False, parameters=None):
         :type autocommit: bool
         :param parameters: The parameters to render the SQL query with.
         :type parameters: dict or iterable
+        :return: command result.
         """
-        if isinstance(sql, str):
+
+        scalar = isinstance(sql, str)
+        if scalar:
             sql = [sql]
 
         with closing(self.get_conn()) as conn:
             if self.supports_autocommit:
                 self.set_autocommit(conn, autocommit)
 
             with closing(conn.cursor()) as cur:
+                results = []
                 for sql_statement in sql:
-
-                    self.log.info("Running statement: %s, parameters: %s", sql_statement, parameters)
-                    if parameters:
-                        cur.execute(sql_statement, parameters)
-                    else:
-                        cur.execute(sql_statement)
-                    if hasattr(cur, 'rowcount'):
-                        self.log.info("Rows affected: %s", cur.rowcount)
+                    result = self._run_command(cur, sql_statement, parameters)
+                    results.append(result)
 
             # If autocommit was set to False for db that supports autocommit,
             # or if db does not supports autocommit, we do a manual commit.
             if not self.get_autocommit(conn):
                 conn.commit()
 
+        if scalar:
+            return results[0]
+
+        return results
+
+    def _run_command(self, cur, sql_statement, parameters):
+        """
+        Runs a statement using an already open cursor.
+        """
+
+        self.log.info("Running statement: %s, parameters: %s", sql_statement, parameters)
+        if parameters:
+            cur.execute(sql_statement, parameters)
+        else:
+            cur.execute(sql_statement)
+        if hasattr(cur, 'rowcount'):
+            self.log.info("Rows affected: %s", cur.rowcount)

Review comment:
       I know this is carried over from the previous implementation, but the `hasattr` call looks unnecessary (and wrong?) to me. According to PEP 249, `rowcount` is always set, and is set to `-1` when not applicable, so the check is always true and does not seem to do what’s actually intended.
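
       To illustrate the point, here is a minimal sketch using the standard-library `sqlite3` driver as a stand-in (the PR itself targets other drivers, and `rowcount_message` is a hypothetical helper, not part of Airflow). It shows that `hasattr(cur, 'rowcount')` holds even when the value is `-1`, and that comparing against `-1` is one way to separate the two cases:

```python
import sqlite3
from contextlib import closing


def rowcount_message(cur):
    """Hypothetical helper: treat a negative rowcount as "not applicable" (PEP 249)."""
    if cur.rowcount >= 0:
        return f"Rows affected: {cur.rowcount}"
    return "Rows affected: not reported for this statement"


with closing(sqlite3.connect(":memory:")) as conn, closing(conn.cursor()) as cur:
    cur.execute("CREATE TABLE t (x INTEGER)")

    # A DML statement: the driver reports a real count.
    cur.execute("INSERT INTO t VALUES (1)")
    insert_has_attr = hasattr(cur, "rowcount")
    insert_message = rowcount_message(cur)

    # A SELECT: the attribute still exists, but sqlite3 sets it to -1.
    cur.execute("SELECT x FROM t")
    select_has_attr = hasattr(cur, "rowcount")
    select_rowcount = cur.rowcount
    select_message = rowcount_message(cur)
```

       With this driver the `hasattr` check passes in both cases, so it cannot be what distinguishes "count available" from "count not applicable".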

##########
File path: airflow/hooks/dbapi.py
##########
@@ -166,30 +166,46 @@ def run(self, sql, autocommit=False, parameters=None):
         :type autocommit: bool
         :param parameters: The parameters to render the SQL query with.
         :type parameters: dict or iterable
+        :return: command result.
         """
-        if isinstance(sql, str):
+
+        scalar = isinstance(sql, str)
+        if scalar:
             sql = [sql]
 
         with closing(self.get_conn()) as conn:
             if self.supports_autocommit:
                 self.set_autocommit(conn, autocommit)
 
             with closing(conn.cursor()) as cur:
+                results = []
                 for sql_statement in sql:
-
-                    self.log.info("Running statement: %s, parameters: %s", sql_statement, parameters)
-                    if parameters:
-                        cur.execute(sql_statement, parameters)
-                    else:
-                        cur.execute(sql_statement)
-                    if hasattr(cur, 'rowcount'):
-                        self.log.info("Rows affected: %s", cur.rowcount)
+                    result = self._run_command(cur, sql_statement, parameters)
+                    results.append(result)

Review comment:
       ```python
       results = [self._run_command(cur, sql_statement, parameters) for sql_statement in sql]
       ```

##########
File path: airflow/hooks/dbapi.py
##########
@@ -166,30 +166,46 @@ def run(self, sql, autocommit=False, parameters=None):
         :type autocommit: bool
         :param parameters: The parameters to render the SQL query with.
         :type parameters: dict or iterable
+        :return: command result.
         """
-        if isinstance(sql, str):
+
+        scalar = isinstance(sql, str)
+        if scalar:
             sql = [sql]
 
         with closing(self.get_conn()) as conn:
             if self.supports_autocommit:
                 self.set_autocommit(conn, autocommit)
 
             with closing(conn.cursor()) as cur:
+                results = []
                 for sql_statement in sql:
-
-                    self.log.info("Running statement: %s, parameters: %s", sql_statement, parameters)
-                    if parameters:
-                        cur.execute(sql_statement, parameters)
-                    else:
-                        cur.execute(sql_statement)
-                    if hasattr(cur, 'rowcount'):
-                        self.log.info("Rows affected: %s", cur.rowcount)
+                    result = self._run_command(cur, sql_statement, parameters)
+                    results.append(result)
 
             # If autocommit was set to False for db that supports autocommit,
             # or if db does not supports autocommit, we do a manual commit.
             if not self.get_autocommit(conn):
                 conn.commit()
 
+        if scalar:
+            return results[0]
+
+        return results
+
+    def _run_command(self, cur, sql_statement, parameters):
+        """
+        Runs a statement using an already open cursor.
+        """
+
+        self.log.info("Running statement: %s, parameters: %s", sql_statement, parameters)
+        if parameters:
+            cur.execute(sql_statement, parameters)
+        else:
+            cur.execute(sql_statement)
+        if hasattr(cur, 'rowcount'):
+            self.log.info("Rows affected: %s", cur.rowcount)

Review comment:
       Also, maybe `_run_command()` should always have a `return`? It feels weird to me that only `OracleHook` returns a value, while the other hooks return `None` implicitly. `return cur.fetchall()` feels like a reasonable default to me, although even explicitly returning `None` would be better.
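
       A rough sketch of what an always-returning helper could look like, written as a free function against `sqlite3` so it runs on its own (the function name `run_command` and the `cur.description` guard are assumptions for illustration, not what the PR implements). PEP 249 specifies that `description` is `None` for operations that do not return rows, which gives a way to avoid calling `fetchall()` when there is no result set:

```python
import logging
import sqlite3
from contextlib import closing

log = logging.getLogger(__name__)


def run_command(cur, sql_statement, parameters=None):
    """Run one statement on an open cursor and always return explicitly.

    Returns the fetched rows when the statement produced a result set,
    and None otherwise.
    """
    log.info("Running statement: %s, parameters: %s", sql_statement, parameters)
    if parameters:
        cur.execute(sql_statement, parameters)
    else:
        cur.execute(sql_statement)

    # PEP 249: description is None when the operation returned no rows.
    if cur.description is None:
        return None
    return cur.fetchall()


with closing(sqlite3.connect(":memory:")) as conn, closing(conn.cursor()) as cur:
    create_result = run_command(cur, "CREATE TABLE t (x INTEGER)")
    insert_result = run_command(cur, "INSERT INTO t VALUES (?)", (5,))
    select_result = run_command(cur, "SELECT x FROM t")
```

       Here the statements without a result set yield an explicit `None`, and the `SELECT` yields its rows, so callers see a consistent contract regardless of the statement type.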




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org