Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/01/01 12:39:15 UTC

[GitHub] [airflow] potiuk opened a new pull request #20618: Refactor vertica_to_mysql to make it more 'mypy' friendly

potiuk opened a new pull request #20618:
URL: https://github.com/apache/airflow/pull/20618


   Part of #19891
   
   MyPy was confused by the logic in this method (and humans could
   be too) because there were some implicit relations between bulk_load
   and tmpfile. This refactor separates the bulk-load and non-bulk-load
   paths (extracting the common parts) and makes them more obvious.
   
   Thanks MyPy for flagging this one.
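
For readers following the thread, here is a minimal sketch of the shape of refactor described above. It is not the operator code from this PR: the function names (`transfer`, `transfer_via_file`, `transfer_in_memory`) and the `Row` alias are hypothetical, and plain row sequences stand in for the Vertica and MySQL hooks. The point is only that each path owns its own variables, so nothing has to start as `None` and be conditionally assigned later.

```python
import csv
from tempfile import NamedTemporaryFile
from typing import Iterable, List, Sequence, Tuple

# Hypothetical row type used only for this illustration.
Row = Tuple[object, ...]


def transfer_in_memory(rows: Iterable[Row]) -> List[Row]:
    """Non-bulk path: materialize every row; no temp file exists here."""
    return list(rows)


def transfer_via_file(rows: Iterable[Row]) -> int:
    """Bulk path: stream rows to a tab-separated temp file and count them."""
    count = 0
    with NamedTemporaryFile("w", newline="", encoding="utf-8") as tmpfile:
        writer = csv.writer(tmpfile, delimiter="\t")
        for row in rows:
            writer.writerow(row)
            count += 1
        tmpfile.flush()
        # A real bulk load would pass tmpfile.name to the database here,
        # while the file still exists.
    return count


def transfer(rows: Sequence[Row], bulk_load: bool) -> int:
    """Dispatch once, up front, instead of branching inside shared code."""
    if bulk_load:
        return transfer_via_file(rows)
    return len(transfer_in_memory(rows))
```

With the two paths split like this, a type checker never sees a `tmpfile` or `result` variable that might still be `None` on the other branch.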
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] potiuk commented on pull request #20618: Refactor vertica_to_mysql to make it more 'mypy' friendly

Posted by GitBox <gi...@apache.org>.
potiuk commented on pull request #20618:
URL: https://github.com/apache/airflow/pull/20618#issuecomment-1003553314


   cc: @subkanthi 





[GitHub] [airflow] potiuk commented on a change in pull request #20618: Refactor vertica_to_mysql to make it more 'mypy' friendly

Posted by GitBox <gi...@apache.org>.
potiuk commented on a change in pull request #20618:
URL: https://github.com/apache/airflow/pull/20618#discussion_r778816203



##########
File path: airflow/providers/mysql/transfers/vertica_to_mysql.py
##########
@@ -94,63 +94,72 @@ def execute(self, context: 'Context'):
         vertica = VerticaHook(vertica_conn_id=self.vertica_conn_id)
         mysql = MySqlHook(mysql_conn_id=self.mysql_conn_id)
 
-        tmpfile = None
-        result = None
+        if self.bulk_load:
+            self._bulk_load_transfer(mysql, vertica)
+        else:
+            self._non_bulk_load_transfer(mysql, vertica)
 
-        selected_columns = []
+        if self.mysql_postoperator:
+            self.log.info("Running MySQL postoperator...")
+            mysql.run(self.mysql_postoperator)
 
-        count = 0
+        self.log.info("Done")
+
+    def _non_bulk_load_transfer(self, mysql, vertica):
         with closing(vertica.get_conn()) as conn:
             with closing(conn.cursor()) as cursor:
                 cursor.execute(self.sql)
                 selected_columns = [d.name for d in cursor.description]
+                self.log.info("Selecting rows from Vertica...")
+                self.log.info(self.sql)
 
-                if self.bulk_load:
-                    with NamedTemporaryFile("w") as tmpfile:
-                        self.log.info("Selecting rows from Vertica to local file %s...", tmpfile.name)
-                        self.log.info(self.sql)
+                result = cursor.fetchall()
+                count = len(result)
 
-                        csv_writer = csv.writer(tmpfile, delimiter='\t', encoding='utf-8')
-                        for row in cursor.iterate():
-                            csv_writer.writerow(row)
-                            count += 1
+                self.log.info("Selected rows from Vertica %s", count)
+        self._run_preoperator(mysql)
+        try:
+            self.log.info("Inserting rows into MySQL...")
+            mysql.insert_rows(table=self.mysql_table, rows=result, target_fields=selected_columns)
+            self.log.info("Inserted rows into MySQL %s", count)
+        except (MySQLdb.Error, MySQLdb.Warning):
+            self.log.info("Inserted rows into MySQL 0")

Review comment:
       Ah no - it's good actually. It logs an info message that no rows were inserted and re-raises the exception, so things are good here.







[GitHub] [airflow] potiuk merged pull request #20618: Refactor vertica_to_mysql to make it more 'mypy' friendly

Posted by GitBox <gi...@apache.org>.
potiuk merged pull request #20618:
URL: https://github.com/apache/airflow/pull/20618


   





[GitHub] [airflow] potiuk commented on pull request #20618: Refactor vertica_to_mysql to make it more 'mypy' friendly

Posted by GitBox <gi...@apache.org>.
potiuk commented on pull request #20618:
URL: https://github.com/apache/airflow/pull/20618#issuecomment-1004664855


   Anyone :) ? 





[GitHub] [airflow] github-actions[bot] commented on pull request #20618: Refactor vertica_to_mysql to make it more 'mypy' friendly

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #20618:
URL: https://github.com/apache/airflow/pull/20618#issuecomment-1005693527


   The PR is likely OK to be merged with just a subset of tests for default Python and Database versions without running the full matrix of tests, because it does not modify the core of Airflow. If the committers decide that the full test matrix is needed, they will add the label 'full tests needed'. Then you should rebase to the latest main or amend the last commit of the PR, and push it with --force-with-lease.





[GitHub] [airflow] potiuk commented on a change in pull request #20618: Refactor vertica_to_mysql to make it more 'mypy' friendly

Posted by GitBox <gi...@apache.org>.
potiuk commented on a change in pull request #20618:
URL: https://github.com/apache/airflow/pull/20618#discussion_r778175841



##########
File path: airflow/providers/mysql/transfers/vertica_to_mysql.py
##########

Review comment:
       Ah, yeah. I had not noticed that :) 







[GitHub] [airflow] potiuk closed pull request #20618: Refactor vertica_to_mysql to make it more 'mypy' friendly

Posted by GitBox <gi...@apache.org>.
potiuk closed pull request #20618:
URL: https://github.com/apache/airflow/pull/20618


   





[GitHub] [airflow] subkanthi commented on a change in pull request #20618: Refactor vertica_to_mysql to make it more 'mypy' friendly

Posted by GitBox <gi...@apache.org>.
subkanthi commented on a change in pull request #20618:
URL: https://github.com/apache/airflow/pull/20618#discussion_r778088139



##########
File path: airflow/providers/mysql/transfers/vertica_to_mysql.py
##########

Review comment:
       Sorry, I know it's not related to the change, but maybe we can change this error and print the exception.
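
One way the suggestion could look, as a rough sketch only: `FakeDbError`, `insert_rows`, and `load` are hypothetical stand-ins (for `MySQLdb.Error`, the hook's insert call, and the transfer method), not the provider's actual code. It uses the standard library's `Logger.exception`, which logs at ERROR level and attaches the traceback, and then re-raises so the task still fails.

```python
import logging

log = logging.getLogger("vertica_to_mysql_sketch")


class FakeDbError(Exception):
    """Hypothetical stand-in for MySQLdb.Error."""


def insert_rows(rows, fail=False):
    """Hypothetical insert helper; raises on demand to simulate a DB failure."""
    if fail:
        raise FakeDbError("simulated insert failure")
    return len(rows)


def load(rows, fail=False):
    """Insert rows, logging the exception details before re-raising."""
    try:
        count = insert_rows(rows, fail=fail)
    except FakeDbError:
        # Logger.exception records the message at ERROR level together with
        # the active exception's traceback.
        log.exception("Inserted rows into MySQL 0")
        raise
    log.info("Inserted rows into MySQL %s", count)
    return count
```

The bare `raise` keeps the original exception and traceback intact, so callers see the same failure they would have seen without the logging.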



