You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by fo...@apache.org on 2018/05/04 07:02:08 UTC
[12/50] incubator-airflow git commit: [AIRFLOW-1575] Add AWS Kinesis
Firehose Hook for inserting batch records
[AIRFLOW-1575] Add AWS Kinesis Firehose Hook for inserting batch records
Closes #3275 from sid88in/feature/kinesis_hookv2
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/2d588e94
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/2d588e94
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/2d588e94
Branch: refs/heads/v1-10-test
Commit: 2d588e9433cd9a1a1381cf939f579f7d7e53330f
Parents: e691acc
Author: sid.gupta <si...@glassdoor.com>
Authored: Sat Apr 28 23:11:36 2018 -0700
Committer: r39132 <si...@yahoo.com>
Committed: Sat Apr 28 23:11:36 2018 -0700
----------------------------------------------------------------------
airflow/contrib/hooks/aws_firehose_hook.py | 52 ++++++++++++++++
tests/contrib/hooks/test_aws_firehose_hook.py | 70 ++++++++++++++++++++++
2 files changed, 122 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/2d588e94/airflow/contrib/hooks/aws_firehose_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/aws_firehose_hook.py b/airflow/contrib/hooks/aws_firehose_hook.py
new file mode 100644
index 0000000..cf7b2fc
--- /dev/null
+++ b/airflow/contrib/hooks/aws_firehose_hook.py
@@ -0,0 +1,52 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from airflow.contrib.hooks.aws_hook import AwsHook
+
+
+class AwsFirehoseHook(AwsHook):
+ """
+ Interact with AWS Kinesis Firehose.
+ :param delivery_stream: Name of the delivery stream
+ :type delivery_stream: str
+ :param region_name: AWS region name (example: us-east-1)
+ :type region_name: str
+ """
+
+ def __init__(self, delivery_stream, region_name=None, *args, **kwargs):
+ self.delivery_stream = delivery_stream
+ self.region_name = region_name
+ super(AwsFirehoseHook, self).__init__(*args, **kwargs)
+
+ def get_conn(self):
+ """
+ Returns AwsHook connection object.
+ """
+
+ self.conn = self.get_client_type('firehose', self.region_name)
+ return self.conn
+
+ def put_records(self, records):
+ """
+ Write batch records to Kinesis Firehose
+ """
+
+ firehose_conn = self.get_conn()
+
+ response = firehose_conn.put_record_batch(
+ DeliveryStreamName=self.delivery_stream,
+ Records=records
+ )
+
+ return response
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/2d588e94/tests/contrib/hooks/test_aws_firehose_hook.py
----------------------------------------------------------------------
diff --git a/tests/contrib/hooks/test_aws_firehose_hook.py b/tests/contrib/hooks/test_aws_firehose_hook.py
new file mode 100644
index 0000000..0a2c809
--- /dev/null
+++ b/tests/contrib/hooks/test_aws_firehose_hook.py
@@ -0,0 +1,70 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import unittest
+import uuid
+
+from airflow.contrib.hooks.aws_firehose_hook import AwsFirehoseHook
+
+try:
+ from moto import mock_kinesis
+except ImportError:
+ mock_kinesis = None
+
+
+class TestAwsFirehoseHook(unittest.TestCase):
+
+ @unittest.skipIf(mock_kinesis is None, 'mock_kinesis package not present')
+ @mock_kinesis
+ def test_get_conn_returns_a_boto3_connection(self):
+ hook = AwsFirehoseHook(aws_conn_id='aws_default',
+ delivery_stream="test_airflow", region_name="us-east-1")
+ self.assertIsNotNone(hook.get_conn())
+
+ @unittest.skipIf(mock_kinesis is None, 'mock_kinesis package not present')
+ @mock_kinesis
+ def test_insert_batch_records_kinesis_firehose(self):
+ hook = AwsFirehoseHook(aws_conn_id='aws_default',
+ delivery_stream="test_airflow", region_name="us-east-1")
+
+ response = hook.get_conn().create_delivery_stream(
+ DeliveryStreamName="test_airflow",
+ S3DestinationConfiguration={
+ 'RoleARN': 'arn:aws:iam::123456789012:role/firehose_delivery_role',
+ 'BucketARN': 'arn:aws:s3:::kinesis-test',
+ 'Prefix': 'airflow/',
+ 'BufferingHints': {
+ 'SizeInMBs': 123,
+ 'IntervalInSeconds': 124
+ },
+ 'CompressionFormat': 'UNCOMPRESSED',
+ }
+ )
+
+ stream_arn = response['DeliveryStreamARN']
+ self.assertEquals(
+ stream_arn, "arn:aws:firehose:us-east-1:123456789012:deliverystream/test_airflow")
+
+ records = [{"Data": str(uuid.uuid4())}
+ for _ in range(100)]
+
+ response = hook.put_records(records)
+
+ self.assertEquals(response['FailedPutCount'], 0)
+ self.assertEquals(response['ResponseMetadata']['HTTPStatusCode'], 200)
+
+
+# Run the tests in this module when the file is executed as a script.
+if __name__ == '__main__':
+ unittest.main()