You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2019/01/25 22:39:36 UTC

[GitHub] mik-laj commented on a change in pull request #2206: [AIRFLOW-922] Update PrestoHook to enable synchronous execution

mik-laj commented on a change in pull request #2206: [AIRFLOW-922] Update PrestoHook to enable synchronous execution
URL: https://github.com/apache/airflow/pull/2206#discussion_r251158791
 
 

 ##########
 File path: airflow/hooks/presto_hook.py
 ##########
 @@ -117,11 +118,49 @@ def get_pandas_df(self, hql, parameters=None):
             df = pandas.DataFrame()
         return df
 
-    def run(self, hql, parameters=None):
+    def run(self, sql, parameters=None, poll_interval=None):
         """
-        Execute the statement against Presto. Can be used to create views.
+        Execute statement(s) against Presto. By default, statements are
+        executed asynchronously. To execute each synchronously, pass a non-None
+        poll_interval.
+
+        :param sql: the statement(s) to be executed
+        :type sql: str or iterable
+        :param parameters: the parameters to render the statement(s) with
+        :type parameters: mapping or iterable
+        :param poll_interval: how often, in seconds, to check the execution
+            status of each statement; set to None
+        :type poll_interval: int or float
         """
-        return super(PrestoHook, self).run(self._strip_sql(hql), parameters)
+        if isinstance(sql, str):
+            sql = [sql]
+
+        cursor = self.get_conn().cursor()
+
+        for stmt in sql:
+            stmt = self._strip_sql(stmt)
+            self.log.info("{} with parameters {}".format(stmt, parameters))
+            cursor.execute(stmt, parameters)
+
+            if poll_interval is not None:
+                while not self.execution_finished(cursor):
+                    time.sleep(poll_interval)
+
+    def execution_finished(self, cursor):
+        """
+        Return a bool indicating whether the latest statement executed by
+        cursor has finished executing. If the execution status can't be
+        determined, e.g. because of a network problem, returns None.
+
+        :param cursor: a cursor
+        :type cursor: presto.Cursor
+        """
+        try:
+            return cursor.poll() is None
+        except Exception as ex:
+            msg = "Couldn't determine statement execution status: ".format(ex)
+            self.log.error(msg)
 
 Review comment:
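  You should pass the unformatted message and its arguments to the logger and let it do the formatting (lazy `%s` style). Also note that, as written, `.format(ex)` does nothing because the string has no `{}` placeholder, so the exception is never actually logged.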
   ```suggestion
               self.log.error("Couldn't determine statement execution status: %s", ex)
   ```

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services