You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@libcloud.apache.org by to...@apache.org on 2015/03/28 15:51:19 UTC
[02/16] libcloud git commit: Refactor AWS v4 signing into smaller
tested functions
Refactor AWS v4 signing into smaller tested functions
Signed-off-by: Tomaz Muraus <to...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/libcloud/repo
Commit: http://git-wip-us.apache.org/repos/asf/libcloud/commit/7bb56899
Tree: http://git-wip-us.apache.org/repos/asf/libcloud/tree/7bb56899
Diff: http://git-wip-us.apache.org/repos/asf/libcloud/diff/7bb56899
Branch: refs/heads/trunk
Commit: 7bb56899ed76cbae162c6264eddfde3602756f0c
Parents: bf5e60e
Author: Gertjan Oude Lohuis <ge...@byte.nl>
Authored: Thu Mar 5 15:08:08 2015 +0100
Committer: Tomaz Muraus <to...@apache.org>
Committed: Fri Mar 6 15:58:12 2015 +0100
----------------------------------------------------------------------
libcloud/common/aws.py | 106 ++++++++++-------
libcloud/test/common/test_aws.py | 211 ++++++++++++++++++++++++++++++++++
2 files changed, 277 insertions(+), 40 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/libcloud/blob/7bb56899/libcloud/common/aws.py
----------------------------------------------------------------------
diff --git a/libcloud/common/aws.py b/libcloud/common/aws.py
index 4bee406..9cc269d 100644
--- a/libcloud/common/aws.py
+++ b/libcloud/common/aws.py
@@ -194,52 +194,78 @@ class V4SignedAWSConnection(AWSTokenConnection):
return params, headers
def _get_authorization_v4_header(self, params, headers, dt):
- # TODO: according to AWS spec (and RFC 2616 Section 4.2.) excess whitespace
- # from inside non-quoted strings should be stripped. Now we only strip the
- # start and end of the string. See http://docs.aws.amazon.com/general/latest/gr/sigv4-create-canonical-request.html
+ assert self.method == 'GET', 'AWS Signature V4 not implemented for other methods than GET'
+
+ return 'AWS4-HMAC-SHA256 Credential=%(u)s/%(c)s, SignedHeaders=%(sh)s, Signature=%(s)s' % {
+ 'u': self.user_id,
+ 'c': self._get_credential_scope(dt),
+ 'sh': self._get_signed_headers(headers),
+ 's': self._get_signature(params, headers, dt)
+ }
+
+ def _get_signature(self, params, headers, dt):
+ return _sign(
+ self._get_key_to_sign_with(dt),
+ self._get_string_to_sign(params, headers, dt),
+ hex=True)
+
+ def _get_key_to_sign_with(self, dt):
+ return _sign(
+ _sign(
+ _sign(
+ _sign(('AWS4' + self.key), dt.strftime('%Y%m%d')),
+ self.driver.region_name),
+ self.service_name),
+ 'aws4_request')
+
+ def _get_string_to_sign(self, params, headers, dt):
+ credential_scope = self._get_credential_scope(dt)
+ canonical_request = self._get_canonical_request(params, headers)
+
+ return '\n'.join(['AWS4-HMAC-SHA256',
+ dt.strftime('%Y%m%dT%H%M%SZ'),
+ credential_scope,
+ hashlib.sha256(canonical_request).hexdigest()])
+
+ def _get_credential_scope(self, dt):
+ return '/'.join([dt.strftime('%Y%m%d'),
+ self.driver.region_name,
+ self.service_name,
+ 'aws4_request'])
+
+ def _get_signed_headers(self, headers):
+ return ';'.join([k.lower() for k in sorted(headers.keys())])
+
+ def _get_canonical_headers(self, headers):
canonical_headers = '\n'.join([':'.join([k.lower(), v.strip()])
for k, v in sorted(headers.items())])
canonical_headers += '\n'
+ return canonical_headers
- signed_headers = ';'.join([k.lower() for k in sorted(headers.keys())])
+ def _get_payload_hash(self):
+ return hashlib.sha256('').hexdigest()
+ def _get_request_params(self, params):
# For self.method == GET
- request_params = '&'.join(["%s=%s" % (urlquote(str(k), safe=''), urlquote(str(v), safe='-_~'))
- for k, v in sorted(params.items())])
- payload_hash = hashlib.sha256('').hexdigest()
-
- canonical_request = '\n'.join([self.method,
- self.action,
- request_params,
- canonical_headers,
- signed_headers,
- payload_hash])
-
- credential_scope = '/'.join([dt.strftime('%Y%m%d'),
- self.driver.region_name,
- self.service_name,
- 'aws4_request'])
- string_to_sign = '\n'.join(['AWS4-HMAC-SHA256',
- dt.strftime('%Y%m%dT%H%M%SZ'),
- credential_scope,
- hashlib.sha256(canonical_request).hexdigest()])
-
- # Key derivation functions. See:
- # http://docs.aws.amazon.com/general/latest/gr/signature-v4-examples.html#signature-v4-examples-python
- def getSignatureKey(key, date_stamp, regionName, serviceName):
- def sign(key, msg):
- return hmac.new(key, msg.encode('utf-8'), hashlib.sha256).digest()
-
- signed_date = sign(('AWS4' + key).encode('utf-8'), date_stamp)
- signed_region = sign(signed_date, regionName)
- signed_service = sign(signed_region, serviceName)
- return sign(signed_service, 'aws4_request')
-
- signing_key = getSignatureKey(self.key, dt.strftime('%Y%m%d'), self.driver.region_name, self.service_name)
- signature = hmac.new(signing_key, string_to_sign.encode('utf-8'), hashlib.sha256).hexdigest()
-
- return 'AWS4-HMAC-SHA256 Credential=%s/%s, SignedHeaders=%s, Signature=%s' % \
- (self.user_id, credential_scope, signed_headers, signature)
+ return '&'.join(["%s=%s" % (urlquote(k, safe=''), urlquote(str(v), safe='~'))
+ for k, v in sorted(params.items())])
+
+ def _get_canonical_request(self, params, headers):
+ return '\n'.join([
+ self.method,
+ self.action,
+ self._get_request_params(params),
+ self._get_canonical_headers(headers),
+ self._get_signed_headers(headers),
+ self._get_payload_hash()
+ ])
+
+
+def _sign(key, msg, hex=False):
+ if hex:
+ return hmac.new(key, b(msg), hashlib.sha256).hexdigest()
+ else:
+ return hmac.new(key, b(msg), hashlib.sha256).digest()
class AWSDriver(BaseDriver):
http://git-wip-us.apache.org/repos/asf/libcloud/blob/7bb56899/libcloud/test/common/test_aws.py
----------------------------------------------------------------------
diff --git a/libcloud/test/common/test_aws.py b/libcloud/test/common/test_aws.py
new file mode 100644
index 0000000..1f207df
--- /dev/null
+++ b/libcloud/test/common/test_aws.py
@@ -0,0 +1,211 @@
+from datetime import datetime
+import unittest
+import mock
+from libcloud.common.aws import V4SignedAWSConnection
+
+
+class EC2MockDriver(object):
+ region_name = 'my_region'
+
+
+class V4SignedAWSConnectionTest(unittest.TestCase):
+
+ def setUp(self):
+ V4SignedAWSConnection.service_name = 'my_service'
+ V4SignedAWSConnection.method = 'GET'
+ V4SignedAWSConnection.action = '/my_action/'
+ V4SignedAWSConnection.driver = EC2MockDriver()
+
+ self.conn = V4SignedAWSConnection('my_key', 'my_secret')
+ self.now = datetime(2015, 3, 4, hour=17, minute=34, second=52)
+
+ def test_v4_signature(self):
+ sig = self.conn._get_authorization_v4_header({
+ 'Action': 'DescribeInstances',
+ 'Version': '2013-10-15'
+ }, {
+ 'Host': 'ec2.eu-west-1.amazonaws.com',
+ 'Accept-Encoding': 'gzip,deflate',
+ 'X-AMZ-Date': '20150304T173452Z',
+ 'User-Agent': 'libcloud/0.17.0 (Amazon EC2 (eu-central-1)) '
+ }, self.now)
+ self.assertEqual(sig, 'AWS4-HMAC-SHA256 '
+ 'Credential=my_key/20150304/my_region/my_service/aws4_request, '
+ 'SignedHeaders=accept-encoding;host;user-agent;x-amz-date, '
+ 'Signature=f9868f8414b3c3f856c7955019cc1691265541f5162b9b772d26044280d39bd3')
+
+ def test_v4_signature_raises_error_if_request_method_not_GET(self):
+ V4SignedAWSConnection.method = 'POST'
+
+ with self.assertRaises(Exception):
+ self.conn._get_authorization_v4_header({}, {}, self.now)
+
+ def test_v4_signature_contains_user_id(self):
+ sig = self.conn._get_authorization_v4_header({}, {}, self.now)
+ self.assertIn('Credential=my_key/', sig)
+
+ def test_v4_signature_contains_credential_scope(self):
+ with mock.patch('libcloud.common.aws.V4SignedAWSConnection._get_credential_scope') as mock_get_creds:
+ mock_get_creds.return_value = 'my_credential_scope'
+ sig = self.conn._get_authorization_v4_header({}, {}, self.now)
+
+ self.assertIn('Credential=my_key/my_credential_scope, ', sig)
+
+ def test_v4_signature_contains_signed_headers(self):
+ with mock.patch('libcloud.common.aws.V4SignedAWSConnection._get_signed_headers') as mock_get_headers:
+ mock_get_headers.return_value = 'my_signed_headers'
+ sig = self.conn._get_authorization_v4_header({}, {}, self.now)
+ self.assertIn('SignedHeaders=my_signed_headers, ', sig)
+
+ def test_v4_signature_contains_signature(self):
+ with mock.patch('libcloud.common.aws.V4SignedAWSConnection._get_signature') as mock_get_signature:
+ mock_get_signature.return_value = 'my_signature'
+ sig = self.conn._get_authorization_v4_header({}, {}, self.now)
+ self.assertIn('Signature=my_signature', sig)
+
+ def test_get_signature_(self):
+ def _sign(key, msg, hex=False):
+ if hex:
+ return 'H|%s|%s' % (key, msg)
+ else:
+ return '%s|%s' % (key, msg)
+
+ with mock.patch('libcloud.common.aws.V4SignedAWSConnection._get_key_to_sign_with') as mock_get_key:
+ with mock.patch('libcloud.common.aws.V4SignedAWSConnection._get_string_to_sign') as mock_get_string:
+ with mock.patch('libcloud.common.aws._sign', new=_sign):
+ mock_get_key.return_value = 'my_signing_key'
+ mock_get_string.return_value = 'my_string_to_sign'
+ sig = self.conn._get_signature({}, {}, self.now)
+
+ self.assertEqual(sig, 'H|my_signing_key|my_string_to_sign')
+
+ def test_get_string_to_sign(self):
+ with mock.patch('hashlib.sha256') as mock_sha256:
+ mock_sha256.return_value.hexdigest.return_value = 'chksum_of_canonical_request'
+ to_sign = self.conn._get_string_to_sign({}, {}, self.now)
+
+ self.assertEqual(to_sign,
+ 'AWS4-HMAC-SHA256\n'
+ '20150304T173452Z\n'
+ '20150304/my_region/my_service/aws4_request\n'
+ 'chksum_of_canonical_request')
+
+ def test_get_key_to_sign_with(self):
+ def _sign(key, msg, hex=False):
+ return '%s|%s' % (key, msg)
+
+ with mock.patch('libcloud.common.aws._sign', new=_sign):
+ key = self.conn._get_key_to_sign_with(self.now)
+
+ self.assertEqual(key, 'AWS4my_secret|20150304|my_region|my_service|aws4_request')
+
+ def test_get_signed_headers_contains_all_headers_lowercased(self):
+ headers = {'Content-Type': 'text/plain', 'Host': 'my_host', 'X-Special-Header': ''}
+ signed_headers = self.conn._get_signed_headers(headers)
+
+ self.assertIn('content-type', signed_headers)
+ self.assertIn('host', signed_headers)
+ self.assertIn('x-special-header', signed_headers)
+
+ def test_get_signed_headers_concats_headers_sorted_lexically(self):
+ headers = {'Host': 'my_host', 'X-Special-Header': '', '1St-Header': '2', 'Content-Type': 'text/plain'}
+ signed_headers = self.conn._get_signed_headers(headers)
+
+ self.assertEqual(signed_headers, '1st-header;content-type;host;x-special-header')
+
+ def test_get_credential_scope(self):
+ scope = self.conn._get_credential_scope(self.now)
+ self.assertEqual(scope, '20150304/my_region/my_service/aws4_request')
+
+ def test_get_canonical_headers_joins_all_headers(self):
+ headers = {
+ 'accept-encoding': 'gzip,deflate',
+ 'host': 'my_host',
+ }
+ self.assertEqual(self.conn._get_canonical_headers(headers),
+ 'accept-encoding:gzip,deflate\n'
+ 'host:my_host\n')
+
+ def test_get_canonical_headers_sorts_headers_lexically(self):
+ headers = {
+ 'accept-encoding': 'gzip,deflate',
+ 'host': 'my_host',
+ '1st-header': '2',
+ 'x-amz-date': '20150304T173452Z',
+ 'user-agent': 'my-ua'
+ }
+ self.assertEqual(self.conn._get_canonical_headers(headers),
+ '1st-header:2\n'
+ 'accept-encoding:gzip,deflate\n'
+ 'host:my_host\n'
+ 'user-agent:my-ua\n'
+ 'x-amz-date:20150304T173452Z\n')
+
+ def test_get_canonical_headers_lowercases_headers_names(self):
+ headers = {
+ 'Accept-Encoding': 'GZIP,DEFLATE',
+ 'User-Agent': 'My-UA'
+ }
+ self.assertEqual(self.conn._get_canonical_headers(headers),
+ 'accept-encoding:GZIP,DEFLATE\n'
+ 'user-agent:My-UA\n')
+
+ def test_get_canonical_headers_trims_header_values(self):
+ # TODO: according to AWS spec (and RFC 2616 Section 4.2.) excess whitespace
+ # from inside non-quoted strings should be stripped. Now we only strip the
+ # start and end of the string. See
+ # http://docs.aws.amazon.com/general/latest/gr/sigv4-create-canonical-request.html
+ headers = {
+ 'accept-encoding': ' gzip,deflate',
+ 'user-agent': 'libcloud/0.17.0 '
+ }
+ self.assertEqual(self.conn._get_canonical_headers(headers),
+ 'accept-encoding:gzip,deflate\n'
+ 'user-agent:libcloud/0.17.0\n')
+
+ def test_get_request_params_joins_params_sorted_lexically(self):
+ self.assertEqual(self.conn._get_request_params({
+ 'Action': 'DescribeInstances',
+ 'Filter.1.Name': 'state',
+ 'Version': '2013-10-15'
+ }),
+ 'Action=DescribeInstances&Filter.1.Name=state&Version=2013-10-15')
+
+ def test_get_request_params_allows_integers_as_value(self):
+ self.assertEqual(self.conn._get_request_params({'Action': 'DescribeInstances', 'Port': 22}),
+ 'Action=DescribeInstances&Port=22')
+
+ def test_get_request_params_urlquotes_params_keys(self):
+ self.assertEqual(self.conn._get_request_params({'Action+Reaction': 'DescribeInstances'}),
+ 'Action%2BReaction=DescribeInstances')
+
+ def test_get_request_params_urlquotes_params_values(self):
+ self.assertEqual(self.conn._get_request_params({
+ 'Action': 'DescribeInstances&Addresses',
+ 'Port-Range': '2000 3000'
+ }),
+ 'Action=DescribeInstances%26Addresses&Port-Range=2000%203000')
+
+ def test_get_request_params_urlquotes_params_values_allows_safe_chars_in_value(self):
+ # http://docs.aws.amazon.com/general/latest/gr/sigv4-create-canonical-request.html
+ self.assertEqual('Action=a~b.c_d-e',
+ self.conn._get_request_params({'Action': 'a~b.c_d-e'}))
+
+ def test_get_payload_hash_returns_digest_of_empty_string_for_GET_requests(self):
+ V4SignedAWSConnection.method = 'GET'
+ self.assertEqual(self.conn._get_payload_hash(),
+ 'e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855')
+
+ def test_get_canonical_request(self):
+ req = self.conn._get_canonical_request(
+ {'Action': 'DescribeInstances', 'Version': '2013-10-15'},
+ {'Accept-Encoding': 'gzip,deflate', 'User-Agent': 'My-UA'}
+ )
+ self.assertEqual(req, 'GET\n'
+ '/my_action/\n'
+ 'Action=DescribeInstances&Version=2013-10-15\n'
+ 'accept-encoding:gzip,deflate\n'
+ 'user-agent:My-UA\n'
+ '\n'
+ 'accept-encoding;user-agent\n'
+ 'e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855')