You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by fo...@apache.org on 2018/05/04 07:02:08 UTC

[12/50] incubator-airflow git commit: [AIRFLOW-1575] Add AWS Kinesis Firehose Hook for inserting batch records

[AIRFLOW-1575] Add AWS Kinesis Firehose Hook for inserting batch records

Closes #3275 from sid88in/feature/kinesis_hookv2


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/2d588e94
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/2d588e94
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/2d588e94

Branch: refs/heads/v1-10-test
Commit: 2d588e9433cd9a1a1381cf939f579f7d7e53330f
Parents: e691acc
Author: sid.gupta <si...@glassdoor.com>
Authored: Sat Apr 28 23:11:36 2018 -0700
Committer: r39132 <si...@yahoo.com>
Committed: Sat Apr 28 23:11:36 2018 -0700

----------------------------------------------------------------------
 airflow/contrib/hooks/aws_firehose_hook.py    | 52 ++++++++++++++++
 tests/contrib/hooks/test_aws_firehose_hook.py | 70 ++++++++++++++++++++++
 2 files changed, 122 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/2d588e94/airflow/contrib/hooks/aws_firehose_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/aws_firehose_hook.py b/airflow/contrib/hooks/aws_firehose_hook.py
new file mode 100644
index 0000000..cf7b2fc
--- /dev/null
+++ b/airflow/contrib/hooks/aws_firehose_hook.py
@@ -0,0 +1,52 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from airflow.contrib.hooks.aws_hook import AwsHook
+
+
+class AwsFirehoseHook(AwsHook):
+    """Interact with AWS Kinesis Firehose.
+
+    :param delivery_stream: Name of the delivery stream
+    :type delivery_stream: str
+    :param region_name: AWS region name (example: us-east-1)
+    :type region_name: str
+    """
+
+    def __init__(self, delivery_stream, region_name=None, *args, **kwargs):
+        self.delivery_stream = delivery_stream
+        self.region_name = region_name
+        super(AwsFirehoseHook, self).__init__(*args, **kwargs)
+
+    def get_conn(self):
+        """
+        Returns the 'firehose' client obtained from get_client_type for
+        self.region_name; the same client is also stored on self.conn.
+        """
+        self.conn = self.get_client_type('firehose', self.region_name)
+        return self.conn
+
+    def put_records(self, records):
+        """
+        Write batch records (e.g. [{"Data": ...}, ...]) to self.delivery_stream
+        with put_record_batch and return that call's response unchanged.
+        """
+        firehose_conn = self.get_conn()
+
+        response = firehose_conn.put_record_batch(
+            DeliveryStreamName=self.delivery_stream,
+            Records=records
+        )
+
+        return response

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/2d588e94/tests/contrib/hooks/test_aws_firehose_hook.py
----------------------------------------------------------------------
diff --git a/tests/contrib/hooks/test_aws_firehose_hook.py b/tests/contrib/hooks/test_aws_firehose_hook.py
new file mode 100644
index 0000000..0a2c809
--- /dev/null
+++ b/tests/contrib/hooks/test_aws_firehose_hook.py
@@ -0,0 +1,70 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import unittest
+import uuid
+
+from airflow.contrib.hooks.aws_firehose_hook import AwsFirehoseHook
+
+try:
+    from moto import mock_kinesis
+except ImportError:
+    mock_kinesis = None
+
+
+class TestAwsFirehoseHook(unittest.TestCase):
+    """Tests for AwsFirehoseHook; each is skipped when moto is not installed."""
+    # Decorators are applied while the class body runs, so a bare `@mock_kinesis`
+    # raises TypeError when mock_kinesis is None; use a pass-through in that case.
+    _mock_kinesis = mock_kinesis if mock_kinesis is not None else (lambda f: f)
+
+    @unittest.skipIf(mock_kinesis is None, 'mock_kinesis package not present')
+    @_mock_kinesis
+    def test_get_conn_returns_a_boto3_connection(self):
+        """get_conn() yields a client object (not None)."""
+        hook = AwsFirehoseHook(aws_conn_id='aws_default',
+                               delivery_stream="test_airflow", region_name="us-east-1")
+        self.assertIsNotNone(hook.get_conn())
+
+    @unittest.skipIf(mock_kinesis is None, 'mock_kinesis package not present')
+    @_mock_kinesis
+    def test_insert_batch_records_kinesis_firehose(self):
+        """put_records() on a newly created stream reports no failed records."""
+        hook = AwsFirehoseHook(aws_conn_id='aws_default',
+                               delivery_stream="test_airflow", region_name="us-east-1")
+
+        response = hook.get_conn().create_delivery_stream(
+            DeliveryStreamName="test_airflow",
+            S3DestinationConfiguration={
+                'RoleARN': 'arn:aws:iam::123456789012:role/firehose_delivery_role',
+                'BucketARN': 'arn:aws:s3:::kinesis-test',
+                'Prefix': 'airflow/',
+                'BufferingHints': {'SizeInMBs': 123, 'IntervalInSeconds': 124},
+                'CompressionFormat': 'UNCOMPRESSED',
+            }
+        )
+        stream_arn = response['DeliveryStreamARN']
+        self.assertEqual(
+            stream_arn, "arn:aws:firehose:us-east-1:123456789012:deliverystream/test_airflow")
+
+        records = [{"Data": str(uuid.uuid4())} for _ in range(100)]
+        response = hook.put_records(records)
+
+        self.assertEqual(response['FailedPutCount'], 0)
+        self.assertEqual(response['ResponseMetadata']['HTTPStatusCode'], 200)
+
+
+if __name__ == '__main__':
+    unittest.main()