Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/09/21 00:21:12 UTC

[GitHub] [airflow] pastanton opened a new pull request, #26543: Added KinesisHook to kinesis.py

pastanton opened a new pull request, #26543:
URL: https://github.com/apache/airflow/pull/26543

   Added KinesisHook to kinesis.py
   
   KinesisHook: a basic hook for AWS Kinesis Data Streams that makes it easy to put records from Airflow.
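For context, the Kinesis Data Streams `PutRecords` API takes a stream name and a list of entries. Each entry is a `Data` blob with a `PartitionKey`, and a single request accepts at most 500 entries. The sketch below shows one way to prepare such entries in Python before handing them to a hook. The helpers `to_kinesis_entries` and `chunk_records` are hypothetical, the JSON encoding is an assumption, and neither is part of this PR.

```python
from __future__ import annotations

import json
from typing import Any, Iterable, Iterator

# PutRecords accepts at most 500 entries per request.
MAX_RECORDS_PER_REQUEST = 500


def to_kinesis_entries(items: Iterable[dict[str, Any]], key_field: str) -> list[dict[str, Any]]:
    """Hypothetical helper: JSON-encode each item and use one field as its partition key."""
    return [
        {"Data": json.dumps(item).encode("utf-8"), "PartitionKey": str(item[key_field])}
        for item in items
    ]


def chunk_records(
    entries: list[dict[str, Any]], size: int = MAX_RECORDS_PER_REQUEST
) -> Iterator[list[dict[str, Any]]]:
    """Split entries into batches no longer than one PutRecords request allows.

    Only the record count is enforced here; the request's total size limit is not checked.
    """
    for start in range(0, len(entries), size):
        yield entries[start : start + size]
```

Each batch could then be passed to the client's `put_records` call along with the stream name.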
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] potiuk commented on pull request #26543: Added KinesisHook to kinesis.py

Posted by GitBox <gi...@apache.org>.
potiuk commented on PR #26543:
URL: https://github.com/apache/airflow/pull/26543#issuecomment-1254263577

   Static checks to fix.




[GitHub] [airflow] Taragolis commented on a diff in pull request #26543: Added KinesisHook to kinesis.py

Posted by GitBox <gi...@apache.org>.
Taragolis commented on code in PR #26543:
URL: https://github.com/apache/airflow/pull/26543#discussion_r977489350


##########
airflow/providers/amazon/aws/hooks/kinesis.py:
##########
@@ -44,3 +44,41 @@ def __init__(self, delivery_stream: str, *args, **kwargs) -> None:
     def put_records(self, records: Iterable):
         """Write batch records to Kinesis Firehose"""
         return self.get_conn().put_record_batch(DeliveryStreamName=self.delivery_stream, Records=records)
+
+
+class KinesisHook(AwsBaseHook):
+    """
+    Interact with AWS Kinesis Data Stream.
+
+    Additional arguments (such as ``aws_conn_id``) may be specified and
+    are passed down to the underlying AwsBaseHook.
+
+    :param delivery_stream: Name of the data stream
+
+    AWS Kinesis docs: https://docs.aws.amazon.com/streams/latest/dev/introduction.html
+
+    .. seealso::
+        :class:`~airflow.providers.amazon.aws.hooks.base_aws.AwsBaseHook`
+    """
+
+    def __init__(self, delivery_stream: str, *args, **kwargs) -> None:
+        self.delivery_stream = delivery_stream

Review Comment:
   It might be better to move `delivery_stream` to a `put_records` argument. This keeps AWS hooks as simple, thin wrappers around a specific `boto3.client`.
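The suggestion above can be sketched as follows. The hook stores no stream name, and `put_records` takes it per call and passes it straight through to boto3. For Kinesis Data Streams the boto3 parameter is `StreamName`, whereas the Firehose `put_record_batch` call in the diff uses `DeliveryStreamName`. So that the sketch runs without Airflow or AWS credentials, `KinesisStreamHookSketch` is a hypothetical stand-in for an `AwsBaseHook` subclass, and it receives its client explicitly. `RecordingClient` is likewise a made-up fake.

```python
from __future__ import annotations

from typing import Any


class KinesisStreamHookSketch:
    """Hypothetical stand-in for an ``AwsBaseHook`` subclass (not the code in this PR).

    A real Airflow hook would create the boto3 "kinesis" client itself and
    return it from ``get_conn()``; here the client is injected instead.
    """

    def __init__(self, client: Any) -> None:
        self._client = client

    def get_conn(self) -> Any:
        return self._client

    def put_records(self, stream_name: str, records: list[dict[str, Any]]) -> dict[str, Any]:
        # The stream is chosen per call, so one hook instance can write
        # to any data stream its credentials allow.
        return self.get_conn().put_records(StreamName=stream_name, Records=records)


class RecordingClient:
    """Fake client that stores call arguments instead of contacting AWS."""

    def __init__(self) -> None:
        self.calls: list[dict[str, Any]] = []

    def put_records(self, **kwargs: Any) -> dict[str, Any]:
        self.calls.append(kwargs)
        return {"FailedRecordCount": 0, "Records": []}


client = RecordingClient()
hook = KinesisStreamHookSketch(client)
response = hook.put_records("orders", [{"Data": b"x", "PartitionKey": "k1"}])
```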





[GitHub] [airflow] github-actions[bot] closed pull request #26543: Added KinesisHook to kinesis.py

Posted by GitBox <gi...@apache.org>.
github-actions[bot] closed pull request #26543: Added KinesisHook to kinesis.py
URL: https://github.com/apache/airflow/pull/26543




[GitHub] [airflow] dstandish commented on a diff in pull request #26543: Added KinesisHook to kinesis.py

Posted by GitBox <gi...@apache.org>.
dstandish commented on code in PR #26543:
URL: https://github.com/apache/airflow/pull/26543#discussion_r981381897


##########
airflow/providers/amazon/aws/hooks/kinesis.py:
##########
@@ -44,3 +44,41 @@ def __init__(self, delivery_stream: str, *args, **kwargs) -> None:
     def put_records(self, records: Iterable):
         """Write batch records to Kinesis Firehose"""
         return self.get_conn().put_record_batch(DeliveryStreamName=self.delivery_stream, Records=records)
+
+
+class KinesisHook(AwsBaseHook):
+    """
+    Interact with AWS Kinesis Data Stream.
+
+    Additional arguments (such as ``aws_conn_id``) may be specified and
+    are passed down to the underlying AwsBaseHook.
+
+    :param delivery_stream: Name of the data stream
+
+    AWS Kinesis docs: https://docs.aws.amazon.com/streams/latest/dev/introduction.html
+
+    .. seealso::
+        :class:`~airflow.providers.amazon.aws.hooks.base_aws.AwsBaseHook`
+    """
+
+    def __init__(self, delivery_stream: str, *args, **kwargs) -> None:
+        self.delivery_stream = delivery_stream
+        kwargs["client_type"] = "kinesis"
+        super().__init__(*args, **kwargs)
+
+    def put_records(self, records: list[dict[str, str| bytes]]):

Review Comment:
   Yes! Completely agree! All the autocomplete, and none of the code!
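One reading of "all the autocomplete, and none of the code" is a hook that exposes a precisely typed boto3 client and adds no wrapper methods. Editors and mypy can then still complete and check each Kinesis call. The sketch below assumes the third-party `mypy-boto3-kinesis` stubs package. Because it is imported only under `TYPE_CHECKING`, it is not needed at runtime. `TypedKinesisHookSketch` and `_EchoClient` are hypothetical. The hook takes an injected client instead of subclassing `AwsBaseHook`, so it runs without AWS credentials.

```python
from __future__ import annotations

from typing import TYPE_CHECKING, Any

if TYPE_CHECKING:
    # Assumed: stubs from the third-party ``mypy-boto3-kinesis`` package.
    # Only type checkers and IDEs import this; it is skipped at runtime.
    from mypy_boto3_kinesis import KinesisClient


class TypedKinesisHookSketch:
    """Hypothetical thin hook: exposes a typed client and adds no wrapper methods."""

    def __init__(self, client: Any) -> None:
        # A real AwsBaseHook subclass would create this client itself.
        self._client = client

    @property
    def conn(self) -> KinesisClient:
        # Callers use boto3's own methods, e.g. ``hook.conn.put_records(...)``,
        # and the annotation lets tooling complete and check those calls.
        return self._client


class _EchoClient:
    """Fake client used only so the sketch runs without AWS."""

    def put_records(self, **kwargs: Any) -> dict[str, Any]:
        return {"FailedRecordCount": 0, "Records": [], "Sent": kwargs}


hook = TypedKinesisHookSketch(_EchoClient())
result = hook.conn.put_records(StreamName="clicks", Records=[{"Data": b"1", "PartitionKey": "p"}])
```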





[GitHub] [airflow] vincbeck commented on a diff in pull request #26543: Added KinesisHook to kinesis.py

Posted by GitBox <gi...@apache.org>.
vincbeck commented on code in PR #26543:
URL: https://github.com/apache/airflow/pull/26543#discussion_r981390269


##########
airflow/providers/amazon/aws/hooks/kinesis.py:
##########
@@ -44,3 +44,41 @@ def __init__(self, delivery_stream: str, *args, **kwargs) -> None:
     def put_records(self, records: Iterable):
         """Write batch records to Kinesis Firehose"""
         return self.get_conn().put_record_batch(DeliveryStreamName=self.delivery_stream, Records=records)
+
+
+class KinesisHook(AwsBaseHook):
+    """
+    Interact with AWS Kinesis Data Stream.
+
+    Additional arguments (such as ``aws_conn_id``) may be specified and
+    are passed down to the underlying AwsBaseHook.
+
+    :param delivery_stream: Name of the data stream
+
+    AWS Kinesis docs: https://docs.aws.amazon.com/streams/latest/dev/introduction.html
+
+    .. seealso::
+        :class:`~airflow.providers.amazon.aws.hooks.base_aws.AwsBaseHook`
+    """
+
+    def __init__(self, delivery_stream: str, *args, **kwargs) -> None:
+        self.delivery_stream = delivery_stream
+        kwargs["client_type"] = "kinesis"
+        super().__init__(*args, **kwargs)
+
+    def put_records(self, records: list[dict[str, str| bytes]]):

Review Comment:
   Just realized that's exactly what you said in your previous comment (oops).





[GitHub] [airflow] vincbeck commented on a diff in pull request #26543: Added KinesisHook to kinesis.py

Posted by GitBox <gi...@apache.org>.
vincbeck commented on code in PR #26543:
URL: https://github.com/apache/airflow/pull/26543#discussion_r981352495


##########
airflow/providers/amazon/aws/hooks/kinesis.py:
##########
@@ -44,3 +44,41 @@ def __init__(self, delivery_stream: str, *args, **kwargs) -> None:
     def put_records(self, records: Iterable):
         """Write batch records to Kinesis Firehose"""
         return self.get_conn().put_record_batch(DeliveryStreamName=self.delivery_stream, Records=records)
+
+
+class KinesisHook(AwsBaseHook):
+    """
+    Interact with AWS Kinesis Data Stream.
+
+    Additional arguments (such as ``aws_conn_id``) may be specified and
+    are passed down to the underlying AwsBaseHook.
+
+    :param delivery_stream: Name of the data stream
+
+    AWS Kinesis docs: https://docs.aws.amazon.com/streams/latest/dev/introduction.html
+
+    .. seealso::
+        :class:`~airflow.providers.amazon.aws.hooks.base_aws.AwsBaseHook`
+    """
+
+    def __init__(self, delivery_stream: str, *args, **kwargs) -> None:
+        self.delivery_stream = delivery_stream
+        kwargs["client_type"] = "kinesis"
+        super().__init__(*args, **kwargs)
+
+    def put_records(self, records: list[dict[str, str| bytes]]):

Review Comment:
   This method does not seem to add any value over the [boto3.put_records](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/kinesis.html#Kinesis.Client.put_record) method; it is just a wrapper around it. We are trying to avoid such methods because they add a layer with no additional value. I know this is how it was done in `FirehoseHook`, but in my opinion we should not follow this pattern. A good example is the hook `/airflow/providers/amazon/aws/hooks/rds.py`, which provides nothing but the connection object used to query `boto3`.
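With a thin hook, a task calls the boto3 client directly (for example `hook.get_conn().put_records(StreamName=..., Records=sent)`) and receives boto3's raw `PutRecords` response. That response reports partial failures rather than raising. `FailedRecordCount` gives the number of rejected entries. Each element of `Records` lines up with the request entry at the same position and carries an `ErrorCode` when that entry failed. The helper `failed_entries` below is a hypothetical sketch of how a caller might pick out entries to resend. The sample response is invented for illustration and is not real AWS output.

```python
from __future__ import annotations

from typing import Any


def failed_entries(sent: list[dict[str, Any]], response: dict[str, Any]) -> list[dict[str, Any]]:
    """Return the request entries that PutRecords rejected (hypothetical helper)."""
    if response.get("FailedRecordCount", 0) == 0:
        return []
    # Result entries come back in the same order as the request entries.
    return [entry for entry, result in zip(sent, response["Records"]) if "ErrorCode" in result]


# Illustrative response shape only; the values are made up.
sent = [
    {"Data": b"a", "PartitionKey": "1"},
    {"Data": b"b", "PartitionKey": "2"},
]
response = {
    "FailedRecordCount": 1,
    "Records": [
        {"SequenceNumber": "1", "ShardId": "shardId-000000000000"},
        {"ErrorCode": "ProvisionedThroughputExceededException", "ErrorMessage": "Rate exceeded"},
    ],
}
retry = failed_entries(sent, response)
```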





[GitHub] [airflow] github-actions[bot] commented on pull request #26543: Added KinesisHook to kinesis.py

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #26543:
URL: https://github.com/apache/airflow/pull/26543#issuecomment-1312279265

   This pull request has been automatically marked as stale because it has not had recent activity. It will be closed in 5 days if no further activity occurs. Thank you for your contributions.

