Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/04/01 10:43:28 UTC

[GitHub] [airflow] uranusjr commented on a change in pull request #15128: Allow multiple statements executions and return info of executions SnowflakeHook

uranusjr commented on a change in pull request #15128:
URL: https://github.com/apache/airflow/pull/15128#discussion_r605556063



##########
File path: airflow/providers/snowflake/hooks/snowflake.py
##########
@@ -242,3 +246,31 @@ def set_autocommit(self, conn, autocommit: Any) -> None:
 
     def get_autocommit(self, conn):
         return getattr(conn, 'autocommit_mode', False)
+
+    def run(self, sql, autocommit=True, parameters=None):
+        """The snowflake python connector library doesn't allow to execute multiple statements from a single string or
+        file and doesn't return any information about the execution. This method will override DBApiHook run method to
+        enable this features using snowflake connector methods"""
+        if isinstance(sql, str):
+            sql = [item[0] for item in split_statements(io.StringIO(sql))]
+
+        with closing(self.get_conn()) as conn:
+            self.set_autocommit(conn, autocommit)
+
+            with closing(conn.cursor(DictCursor)) as cur:
+                for query in sql:
+                    if parameters is not None:
+                        self.log.info(f"{query} with parameters {parameters}")
+                        cur.execute(query, parameters)
+                        for row in cur.fetchall():
+                            self.log.info(f'Statement Execution Info - {row}')
+                    else:
+                        self.log.info(query)
+                        cur.execute(query)
+                        for row in cur.fetchall():

Review comment:
   ```suggestion
                           for row in cur:
   ```
   
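   More memory efficient: iterating over the cursor yields rows as they are fetched, whereas `fetchall()` builds a list of every row before the loop starts.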
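
A minimal sketch of the difference, assuming the standard-library `sqlite3` driver as a stand-in for the Snowflake connector (both expose DB-API style cursors that can be iterated). The helper name `log_rows` and the table `t` are hypothetical and used only for illustration.

```python
import sqlite3
from contextlib import closing


def log_rows(conn, query, log):
    """Execute one statement and hand each result row to `log` as it is read."""
    with closing(conn.cursor()) as cur:
        cur.execute(query)
        count = 0
        # Iterating the cursor yields one row at a time; cur.fetchall()
        # would first build a list holding every row of the result.
        for row in cur:
            log(f"Statement Execution Info - {row}")
            count += 1
        return count


# sqlite3 is only a stand-in driver here (assumption, not the Snowflake API).
with closing(sqlite3.connect(":memory:")) as conn:
    conn.execute("CREATE TABLE t (x INTEGER)")
    conn.executemany("INSERT INTO t VALUES (?)", [(i,) for i in range(3)])
    messages = []
    n = log_rows(conn, "SELECT x FROM t ORDER BY x", messages.append)
```

The logged output is the same either way; only the peak memory used for large result sets differs.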

##########
File path: airflow/providers/snowflake/hooks/snowflake.py
##########
@@ -242,3 +246,31 @@ def set_autocommit(self, conn, autocommit: Any) -> None:
 
     def get_autocommit(self, conn):
         return getattr(conn, 'autocommit_mode', False)
+
+    def run(self, sql, autocommit=True, parameters=None):
+        """The snowflake python connector library doesn't allow to execute multiple statements from a single string or
+        file and doesn't return any information about the execution. This method will override DBApiHook run method to
+        enable this features using snowflake connector methods"""
+        if isinstance(sql, str):
+            sql = [item[0] for item in split_statements(io.StringIO(sql))]
+
+        with closing(self.get_conn()) as conn:
+            self.set_autocommit(conn, autocommit)
+
+            with closing(conn.cursor(DictCursor)) as cur:
+                for query in sql:
+                    if parameters is not None:
+                        self.log.info(f"{query} with parameters {parameters}")
+                        cur.execute(query, parameters)
+                        for row in cur.fetchall():
+                            self.log.info(f'Statement Execution Info - {row}')
+                    else:
+                        self.log.info(query)
+                        cur.execute(query)
+                        for row in cur.fetchall():
+                            self.log.info(f'Statement Execution Info - {row}')
+
+            # If autocommit was set to False for db that supports autocommit,
+            # or if db does not supports autocommit, we do a manual commit.
+            if not self.get_autocommit(conn):
+                conn.commit()

Review comment:
   I believe the standard way to do this is to commit while the cursor is still open, before closing it. The `get_autocommit()` check is also not needed, since [`commit()` is a no-op in autocommit mode](https://www.python.org/dev/peps/pep-0249/#commit).
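
A rough sketch of that shape, again assuming the standard-library `sqlite3` driver in place of the Snowflake connector. The helper `run_statements` is hypothetical, and whether the Snowflake connector treats `commit()` the same way in autocommit mode is an assumption that should be checked against its documentation.

```python
import sqlite3
from contextlib import closing


def run_statements(conn, statements):
    """Run each statement, then commit unconditionally before the cursor closes."""
    with closing(conn.cursor()) as cur:
        for statement in statements:
            cur.execute(statement)
        # No autocommit check: when nothing is pending, commit() has nothing to do.
        conn.commit()


statements = ["CREATE TABLE t (x INTEGER)", "INSERT INTO t VALUES (1)"]
results = {}
# isolation_level=None puts sqlite3 in autocommit mode; "DEFERRED" opens
# implicit transactions that need an explicit commit.
for label, isolation_level in (("autocommit", None), ("manual", "DEFERRED")):
    with closing(sqlite3.connect(":memory:", isolation_level=isolation_level)) as conn:
        run_statements(conn, statements)
        row_count = conn.execute("SELECT COUNT(*) FROM t").fetchone()[0]
        results[label] = (row_count, conn.in_transaction)
```

In both modes the insert is visible and no transaction is left open afterwards, so the same code path serves either setting.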



