Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2023/01/12 05:43:00 UTC

[GitHub] [airflow] phanikumv opened a new pull request, #28874: Add async hive sensor

phanikumv opened a new pull request, #28874:
URL: https://github.com/apache/airflow/pull/28874

   This PR donates the async hive sensor from [astronomer-providers](https://github.com/astronomer/astronomer-providers) repo to Airflow
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] phanikumv commented on a diff in pull request #28874: Add `HivePartitionAsyncSensor`

Posted by GitBox <gi...@apache.org>.
phanikumv commented on code in PR #28874:
URL: https://github.com/apache/airflow/pull/28874#discussion_r1067780016


##########
airflow/providers/apache/hive/provider.yaml:
##########
@@ -56,6 +56,7 @@ dependencies:
   # the sasl library anyway (and there sasl library version is not relevant)
   - sasl>=0.3.1; python_version>="3.9"
   - thrift>=0.9.2
+  - impyla

Review Comment:
   We have written our own asyncio method, shown below, because impyla gives us a handle immediately after submitting the query.
    
   ```
   import asyncio


   async def partition_exists(self, table: str, schema: str, partition: str, polling_interval: float) -> str:
       """
       Check for the existence of a partition in the given Hive table.

       :param table: Hive table in which to look for the partition.
       :param schema: database in which the Hive table exists.
       :param partition: partition spec to check for in the given table.
       :param polling_interval: interval in seconds to sleep between status checks.
       """
       client = self.get_hive_client()
       cursor = client.cursor()
       query = f"show partitions {schema}.{table} partition({partition})"
       # Submit the query; impyla returns as soon as the operation handle exists.
       cursor.execute_async(query)
       # Poll the operation status, sleeping on the event loop between checks.
       while cursor.is_executing():
           await asyncio.sleep(polling_interval)
       results = cursor.fetchall()
       if len(results) == 0:
           return "failure"
       return "success"
   ```





[GitHub] [airflow] phanikumv commented on a diff in pull request #28874: Add `HivePartitionAsyncSensor`

Posted by GitBox <gi...@apache.org>.
phanikumv commented on code in PR #28874:
URL: https://github.com/apache/airflow/pull/28874#discussion_r1067887383


##########
airflow/providers/apache/hive/provider.yaml:
##########
@@ -56,6 +56,7 @@ dependencies:
   # the sasl library anyway (and there sasl library version is not relevant)
   - sasl>=0.3.1; python_version>="3.9"
   - thrift>=0.9.2
+  - impyla

Review Comment:
   Most sync methods wait for the operation to complete, which is not what impyla does here.
   
   @kaxil any thoughts ?





[GitHub] [airflow] github-actions[bot] closed pull request #28874: Add `HivePartitionAsyncSensor`

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] closed pull request #28874: Add `HivePartitionAsyncSensor`
URL: https://github.com/apache/airflow/pull/28874




[GitHub] [airflow] phanikumv commented on a diff in pull request #28874: Add `HivePartitionAsyncSensor`

Posted by GitBox <gi...@apache.org>.
phanikumv commented on code in PR #28874:
URL: https://github.com/apache/airflow/pull/28874#discussion_r1067780016


##########
airflow/providers/apache/hive/provider.yaml:
##########
@@ -56,6 +56,7 @@ dependencies:
   # the sasl library anyway (and there sasl library version is not relevant)
   - sasl>=0.3.1; python_version>="3.9"
   - thrift>=0.9.2
+  - impyla

Review Comment:
   We have written our own asyncio method, shown below, because impyla returns immediately after submitting the query.
   
   See https://github.com/cloudera/impyla/blob/v0.16a2/impala/hiveserver2.py#L334-L338
    
   ```
   import asyncio


   async def partition_exists(self, table: str, schema: str, partition: str, polling_interval: float) -> str:
       """
       Check for the existence of a partition in the given Hive table.

       :param table: Hive table in which to look for the partition.
       :param schema: database in which the Hive table exists.
       :param partition: partition spec to check for in the given table.
       :param polling_interval: interval in seconds to sleep between status checks.
       """
       client = self.get_hive_client()
       cursor = client.cursor()
       query = f"show partitions {schema}.{table} partition({partition})"
       # Submit the query; impyla returns as soon as the operation handle exists.
       cursor.execute_async(query)
       # Poll the operation status, sleeping on the event loop between checks.
       while cursor.is_executing():
           await asyncio.sleep(polling_interval)
       results = cursor.fetchall()
       if len(results) == 0:
           return "failure"
       return "success"
   ```





[GitHub] [airflow] Taragolis commented on a diff in pull request #28874: Add `HivePartitionAsyncSensor`

Posted by GitBox <gi...@apache.org>.
Taragolis commented on code in PR #28874:
URL: https://github.com/apache/airflow/pull/28874#discussion_r1067864413


##########
airflow/providers/apache/hive/provider.yaml:
##########
@@ -56,6 +56,7 @@ dependencies:
   # the sasl library anyway (and there sasl library version is not relevant)
   - sasl>=0.3.1; python_version>="3.9"
   - thrift>=0.9.2
+  - impyla

Review Comment:
   Unfortunately, this does not transform sync code into asyncio. If that kind of transformation were so easy, we could turn any sync method into an asyncio implementation.
   
   Offhand, most `impyla` methods make blocking I/O requests: `cursor.execute_async()`, `cursor.is_executing()` and `cursor.fetchall()` all block.
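A minimal sketch of one common workaround, assuming Python 3.9+ for `asyncio.to_thread`: each blocking cursor call is pushed onto a worker thread so the event loop keeps running while it waits. This is not what the PR implements. `FakeCursor` is a hypothetical stand-in for an impyla cursor that only mimics `execute_async`, `is_executing` and `fetchall`, with `time.sleep` simulating the blocking RPCs.

```python
import asyncio
import time
from typing import Any, List


class FakeCursor:
    """Hypothetical stand-in for an impyla cursor; every method blocks briefly."""

    def __init__(self, rows: List[Any], polls_until_done: int = 3) -> None:
        self._rows = rows
        self._remaining = polls_until_done

    def execute_async(self, query: str) -> None:
        time.sleep(0.01)  # simulate the blocking RPC that submits the query

    def is_executing(self) -> bool:
        time.sleep(0.01)  # simulate a blocking status RPC
        self._remaining -= 1
        return self._remaining > 0

    def fetchall(self) -> List[Any]:
        time.sleep(0.01)  # simulate the blocking fetch RPC
        return self._rows


async def partition_exists(cursor: Any, query: str, polling_interval: float) -> bool:
    # Each blocking call runs in the default thread pool, so the loop is not blocked.
    await asyncio.to_thread(cursor.execute_async, query)
    while await asyncio.to_thread(cursor.is_executing):
        await asyncio.sleep(polling_interval)
    rows = await asyncio.to_thread(cursor.fetchall)
    return len(rows) > 0
```

For example, `asyncio.run(partition_exists(FakeCursor([("ds=2023-01-01",)]), "show partitions ...", 0.01))` evaluates to `True`. The trade-off is that every in-flight call occupies a thread from the default executor, whose size is bounded, so many concurrent checks would queue rather than run in parallel.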





[GitHub] [airflow] github-actions[bot] commented on pull request #28874: Add `HivePartitionAsyncSensor`

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on PR #28874:
URL: https://github.com/apache/airflow/pull/28874#issuecomment-1445512573

   This pull request has been automatically marked as stale because it has not had recent activity. It will be closed in 5 days if no further activity occurs. Thank you for your contributions.




[GitHub] [airflow] Taragolis commented on a diff in pull request #28874: Add `HivePartitionAsyncSensor`

Posted by GitBox <gi...@apache.org>.
Taragolis commented on code in PR #28874:
URL: https://github.com/apache/airflow/pull/28874#discussion_r1067755312


##########
airflow/providers/apache/hive/provider.yaml:
##########
@@ -56,6 +56,7 @@ dependencies:
   # the sasl library anyway (and there sasl library version is not relevant)
   - sasl>=0.3.1; python_version>="3.9"
   - thrift>=0.9.2
+  - impyla

Review Comment:
   [impyla](https://github.com/cloudera/impyla) doesn't make any asyncio calls in execute_async.
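To make the point concrete, here is a small self-contained sketch (plain `asyncio`, no impyla, Python 3.9+) showing that a blocking call inside a coroutine freezes every other task on the loop, while the same wait offloaded with `asyncio.to_thread` does not. `time.sleep` stands in for a blocking HiveServer2 RPC; `ticker`, `run` and `ticks_during_wait` are hypothetical helper names.

```python
import asyncio
import time
from typing import List


async def ticker(log: List[str], stop: asyncio.Event) -> None:
    # Another task sharing the loop, e.g. a different trigger in the triggerer.
    while not stop.is_set():
        log.append("tick")
        await asyncio.sleep(0.01)


async def run(blocking: bool) -> List[str]:
    log: List[str] = []
    stop = asyncio.Event()
    task = asyncio.create_task(ticker(log, stop))
    await asyncio.sleep(0)  # let the ticker start
    log.append("start")
    if blocking:
        time.sleep(0.1)  # blocks the whole event loop
    else:
        await asyncio.to_thread(time.sleep, 0.1)  # waits in a worker thread
    log.append("end")
    stop.set()
    await task
    return log


def ticks_during_wait(log: List[str]) -> int:
    # Count how many times the other task ran while we were waiting.
    return log[log.index("start") + 1 : log.index("end")].count("tick")
```

`ticks_during_wait(asyncio.run(run(blocking=True)))` is `0`, because nothing else can run until `time.sleep` returns; with `blocking=False` the ticker keeps running during the wait and the count is greater than zero.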
   



##########
airflow/providers/apache/hive/hooks/hive.py:
##########
@@ -1036,3 +1040,92 @@ def get_pandas_df(  # type: ignore
         res = self.get_results(sql, schema=schema, hive_conf=hive_conf)
         df = pandas.DataFrame(res["data"], columns=[c[0] for c in res["header"]], **kwargs)
         return df
+
+
+class HiveCliAsyncHook(BaseHook):
+    """
+    HiveCliAsyncHook to interact with the Hive using impyla library
+
+    :param metastore_conn_id: connection string for the hive
+    :param auth_mechanism: auth mechanism to use for authentication
+    """
+
+    def __init__(self, metastore_conn_id: str) -> None:
+        """Get the connection parameters separated from connection string"""
+        super().__init__()
+        self.conn = self.get_connection(conn_id=metastore_conn_id)
+        self.auth_mechanism = self.conn.extra_dejson.get("authMechanism", "PLAIN")

Review Comment:
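   This blocks the async thread: `get_connection()` performs a synchronous lookup (environment variables, secrets backends, metadata database).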
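A minimal sketch of one way to keep that lookup off the event loop, assuming Python 3.9+: do nothing blocking in `__init__`, and resolve the connection lazily in an async method that runs the synchronous lookup in a worker thread and caches the result. `lookup_connection` and `LazyAsyncHook` are hypothetical; `lookup_connection` is only a placeholder for the real synchronous call (for example `BaseHook.get_connection`) and returns a plain dict here.

```python
import asyncio
import time
from typing import Any, Dict, Optional


def lookup_connection(conn_id: str) -> Dict[str, Any]:
    """Hypothetical placeholder for a synchronous, blocking connection lookup."""
    time.sleep(0.01)  # simulate a metadata-DB / secrets-backend round trip
    return {"conn_id": conn_id, "extra": {"authMechanism": "PLAIN"}}


class LazyAsyncHook:
    """Hypothetical hook: __init__ stores only the id; the lookup happens later."""

    def __init__(self, metastore_conn_id: str) -> None:
        self.metastore_conn_id = metastore_conn_id
        self._conn: Optional[Dict[str, Any]] = None
        self.lookups = 0  # counts real lookups, to show the caching

    async def get_conn(self) -> Dict[str, Any]:
        if self._conn is None:
            # Run the blocking lookup in the default thread pool.
            self._conn = await asyncio.to_thread(lookup_connection, self.metastore_conn_id)
            self.lookups += 1
        return self._conn

    async def auth_mechanism(self) -> str:
        conn = await self.get_conn()
        return conn["extra"].get("authMechanism", "PLAIN")
```

Constructing the hook is now free of I/O, and only the first `await hook.get_conn()` pays for the lookup. There is no lock, so two concurrent first calls could both perform the lookup; that is harmless here but worth noting.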





[GitHub] [airflow] Taragolis commented on a diff in pull request #28874: Add `HivePartitionAsyncSensor`

Posted by GitBox <gi...@apache.org>.
Taragolis commented on code in PR #28874:
URL: https://github.com/apache/airflow/pull/28874#discussion_r1067900216


##########
airflow/providers/apache/hive/provider.yaml:
##########
@@ -56,6 +56,7 @@ dependencies:
   # the sasl library anyway (and there sasl library version is not relevant)
   - sasl>=0.3.1; python_version>="3.9"
   - thrift>=0.9.2
+  - impyla

Review Comment:
   Not every asynchronous implementation is asyncio-compatible.
   
   `asyncio` stands for asynchronous I/O, whereas `impyla` provides asynchronous execution with blocking I/O.
   
   The `impyla` implementation would be a good fit for a regular sensor rather than for triggers and deferrable operators.
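Following that suggestion, a minimal sketch of the same check written as plain synchronous code, the kind of body a classic (non-deferrable) sensor's `poke` could call, where blocking is acceptable because the worker owns the thread. `partition_exists` here and `FakeCursor` are hypothetical; the fake cursor only mimics impyla's `execute_async`, `is_executing` and `fetchall`, and identifiers are interpolated unquoted, as in the snippet under review.

```python
import time
from typing import Any, List


class FakeCursor:
    """Hypothetical stand-in for an impyla cursor (only the three methods used below)."""

    def __init__(self, rows: List[Any], polls_until_done: int = 2) -> None:
        self._rows = rows
        self._remaining = polls_until_done
        self.queries: List[str] = []

    def execute_async(self, query: str) -> None:
        self.queries.append(query)

    def is_executing(self) -> bool:
        self._remaining -= 1
        return self._remaining > 0

    def fetchall(self) -> List[Any]:
        return self._rows


def partition_exists(cursor: Any, schema: str, table: str, partition: str, polling_interval: float) -> bool:
    """Blocking check, suitable for a classic sensor's poke(); not for a trigger."""
    query = f"show partitions {schema}.{table} partition({partition})"
    cursor.execute_async(query)
    while cursor.is_executing():
        time.sleep(polling_interval)  # a blocking sleep is fine on a worker thread
    return len(cursor.fetchall()) > 0
```

The logic is the same submit-then-poll loop as the proposed trigger code; the only change is that it no longer pretends to be non-blocking.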




