You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by al...@apache.org on 2017/03/02 23:23:36 UTC
[1/2] beam git commit: [BEAM-1188] Python Bigquery Verifier For E2E Test
Repository: beam
Updated Branches:
refs/heads/master b322a5d40 -> de9e8528c
[BEAM-1188] Python Bigquery Verifier For E2E Test
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/dd32c266
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/dd32c266
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/dd32c266
Branch: refs/heads/master
Commit: dd32c26622ef9d0aa9e8d0c3863ac6660ed336b7
Parents: b322a5d
Author: Mark Liu <ma...@google.com>
Authored: Tue Feb 21 18:48:34 2017 -0800
Committer: Ahmet Altay <al...@google.com>
Committed: Thu Mar 2 15:05:27 2017 -0800
----------------------------------------------------------------------
.../cookbook/bigquery_tornadoes_it_test.py | 62 +++++++++++
.../python/apache_beam/io/gcp/tests/__init__.py | 16 +++
.../io/gcp/tests/bigquery_matcher.py | 108 +++++++++++++++++++
.../io/gcp/tests/bigquery_matcher_test.py | 108 +++++++++++++++++++
.../apache_beam/tests/pipeline_verifiers.py | 12 +--
sdks/python/apache_beam/tests/test_utils.py | 12 +++
sdks/python/setup.py | 3 +
7 files changed, 313 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/dd32c266/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes_it_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes_it_test.py b/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes_it_test.py
new file mode 100644
index 0000000..306a09e
--- /dev/null
+++ b/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes_it_test.py
@@ -0,0 +1,62 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""End-to-end test for Bigquery tornadoes example."""
+
+import logging
+import time
+import unittest
+
+from hamcrest.core.core.allof import all_of
+from nose.plugins.attrib import attr
+
+from apache_beam.examples.cookbook import bigquery_tornadoes
+from apache_beam.io.gcp.tests.bigquery_matcher import BigqueryMatcher
+from apache_beam.test_pipeline import TestPipeline
+from apache_beam.tests.pipeline_verifiers import PipelineStateMatcher
+
+
+class BigqueryTornadoesIT(unittest.TestCase):
+  """Runs the bigquery_tornadoes example and verifies its output table."""
+
+  # SHA-1 hex digest (compute_hash default) of the sorted rows that the
+  # verification query is expected to read back from the output table.
+  DEFAULT_CHECKSUM = '83789a7c1bca7959dcf23d3bc37e9204e594330f'
+
+  @attr('IT')
+  def test_bigquery_tornadoes_it(self):
+    test_pipeline = TestPipeline(is_integration_test=True)
+
+    # Suffix the table name with a millisecond timestamp so each run writes
+    # to its own table.
+    timestamp_ms = int(round(time.time() * 1000))
+    output_table = 'BigQueryTornadoesIT.monthly_tornadoes_%s' % timestamp_ms
+    query = 'SELECT month, tornado_count FROM [%s]' % output_table
+
+    state_verifier = PipelineStateMatcher()
+    bigquery_verifier = BigqueryMatcher(
+        project=test_pipeline.get_option('project'),
+        query=query,
+        checksum=self.DEFAULT_CHECKSUM)
+    extra_opts = {
+        'output': output_table,
+        'on_success_matcher': all_of(state_verifier, bigquery_verifier),
+    }
+
+    # Combine the extra options with those given via --test-pipeline-options
+    # and launch the job through the example's run() entry point.
+    bigquery_tornadoes.run(
+        test_pipeline.get_full_options_as_args(**extra_opts))
+
+
+if __name__ == '__main__':
+  # Enable INFO-level logging (e.g. the verifiers' progress messages) when
+  # this module is run directly.
+  logging.getLogger().setLevel(logging.INFO)
+  unittest.main()
http://git-wip-us.apache.org/repos/asf/beam/blob/dd32c266/sdks/python/apache_beam/io/gcp/tests/__init__.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/gcp/tests/__init__.py b/sdks/python/apache_beam/io/gcp/tests/__init__.py
new file mode 100644
index 0000000..cce3aca
--- /dev/null
+++ b/sdks/python/apache_beam/io/gcp/tests/__init__.py
@@ -0,0 +1,16 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
http://git-wip-us.apache.org/repos/asf/beam/blob/dd32c266/sdks/python/apache_beam/io/gcp/tests/bigquery_matcher.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/gcp/tests/bigquery_matcher.py b/sdks/python/apache_beam/io/gcp/tests/bigquery_matcher.py
new file mode 100644
index 0000000..cc26689
--- /dev/null
+++ b/sdks/python/apache_beam/io/gcp/tests/bigquery_matcher.py
@@ -0,0 +1,108 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Bigquery data verifier for end-to-end test."""
+
+import logging
+
+from hamcrest.core.base_matcher import BaseMatcher
+
+from apache_beam.tests.test_utils import compute_hash
+from apache_beam.utils import retry
+
+# Protect against environments where bigquery library is not available.
+# pylint: disable=wrong-import-order, wrong-import-position
+try:
+ from google.cloud import bigquery
+ from google.cloud.exceptions import GoogleCloudError
+except ImportError:
+ bigquery = None
+# pylint: enable=wrong-import-order, wrong-import-position
+
+# Retries after the first attempt; a persistently failing query is therefore
+# attempted MAX_RETRIES + 1 times.
+MAX_RETRIES = 4
+
+
+def retry_on_http_and_value_error(exception):
+  """Return True when ``exception`` should trigger another query attempt.
+
+  Google Cloud API errors and ValueError are both treated as retryable.
+  """
+  return isinstance(exception, (GoogleCloudError, ValueError))
+
+
+class BigqueryMatcher(BaseMatcher):
+  """Hamcrest matcher that checks Bigquery data against a known checksum.
+
+  On matching, the configured query is run, the returned rows are hashed
+  with compute_hash, and the digest is compared with the expected checksum.
+  The item handed to the matcher (e.g. a pipeline result) is ignored.
+  """
+
+  def __init__(self, project, query, checksum):
+    """Validate and store the verification parameters.
+
+    Args:
+      project: project passed to the Bigquery client.
+      query: non-empty query string whose results are verified.
+      checksum: non-empty expected hash string.
+
+    Raises:
+      ImportError: if the Bigquery client library could not be imported.
+      ValueError: if query or checksum is empty or not a str.
+    """
+    if bigquery is None:
+      raise ImportError('Bigquery dependencies are not installed.')
+
+    def _is_non_empty_str(arg):
+      return bool(arg) and isinstance(arg, str)
+
+    if not _is_non_empty_str(query):
+      raise ValueError(
+          'Invalid argument: query. Please use non-empty string')
+    if not _is_non_empty_str(checksum):
+      raise ValueError(
+          'Invalid argument: checksum. Please use non-empty string')
+    self.project = project
+    self.query = query
+    self.expected_checksum = checksum
+
+  def _matches(self, _):
+    logging.info('Start verify Bigquery data.')
+    rows = self._query_with_retry(bigquery.Client(project=self.project))
+    logging.info('Read from given query (%s), total rows %d',
+                 self.query, len(rows))
+
+    # Kept on the instance so describe_mismatch can report it.
+    self.checksum = compute_hash(rows)
+    logging.info('Generate checksum: %s', self.checksum)
+    return self.expected_checksum == self.checksum
+
+  @retry.with_exponential_backoff(
+      num_retries=MAX_RETRIES,
+      retry_filter=retry_on_http_and_value_error)
+  def _query_with_retry(self, bigquery_client):
+    """Run the query and collect all result rows, retrying on failure.
+
+    Which errors are retried is decided by retry_on_http_and_value_error;
+    a retry re-runs the whole query, not just the failed page fetch.
+    """
+    query_job = bigquery_client.run_sync_query(self.query)
+    query_job.run()
+
+    # Request the first page without a token, then keep fetching while a
+    # continuation page token is returned.
+    page, _, token = query_job.fetch_data(page_token=None)
+    rows = list(page)
+    while token:
+      page, _, token = query_job.fetch_data(page_token=token)
+      rows.extend(page)
+    return rows
+
+  def describe_to(self, description):
+    description.append_text("Expected checksum is ").append_text(
+        self.expected_checksum)
+
+  def describe_mismatch(self, pipeline_result, mismatch_description):
+    mismatch_description.append_text("Actual checksum is ").append_text(
+        self.checksum)
http://git-wip-us.apache.org/repos/asf/beam/blob/dd32c266/sdks/python/apache_beam/io/gcp/tests/bigquery_matcher_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/gcp/tests/bigquery_matcher_test.py b/sdks/python/apache_beam/io/gcp/tests/bigquery_matcher_test.py
new file mode 100644
index 0000000..d8aa148
--- /dev/null
+++ b/sdks/python/apache_beam/io/gcp/tests/bigquery_matcher_test.py
@@ -0,0 +1,108 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Unit test for Bigquery verifier"""
+
+import logging
+import unittest
+
+from hamcrest import assert_that as hc_assert_that
+from mock import Mock, patch
+
+from apache_beam.io.gcp.tests import bigquery_matcher as bq_verifier
+from apache_beam.tests.test_utils import patch_retry
+
+# Protect against environments where bigquery library is not available.
+# pylint: disable=wrong-import-order, wrong-import-position
+try:
+ from google.cloud import bigquery
+ from google.cloud.exceptions import NotFound
+except ImportError:
+ bigquery = None
+# pylint: enable=wrong-import-order, wrong-import-position
+
+
+@unittest.skipIf(bigquery is None, 'Bigquery dependencies are not installed.')
+class BigqueryMatcherTest(unittest.TestCase):
+  """Tests BigqueryMatcher against a mocked Bigquery client."""
+
+  def setUp(self):
+    self._mock_result = Mock()
+    # Make the retry module use a mock clock and logger (see patch_retry).
+    patch_retry(self, bq_verifier)
+
+  @staticmethod
+  def _setup_mock_query(mock_bigquery):
+    """Return the mock query job that the mocked client hands out."""
+    mock_query = Mock()
+    mock_bigquery.return_value.run_sync_query.return_value = mock_query
+    return mock_query
+
+  @staticmethod
+  def _create_matcher(checksum='mock_checksum'):
+    """Build a matcher with placeholder project and query strings."""
+    return bq_verifier.BigqueryMatcher('mock_project', 'mock_query', checksum)
+
+  def _assert_called_with_retries(self, mock_method):
+    """Assert mock_method ran once plus MAX_RETRIES retries."""
+    self.assertTrue(mock_method.called)
+    self.assertEqual(bq_verifier.MAX_RETRIES + 1, mock_method.call_count)
+
+  @patch.object(bigquery, 'Client')
+  def test_bigquery_matcher_success(self, mock_bigquery):
+    mock_query = self._setup_mock_query(mock_bigquery)
+    # A single, empty page of results.
+    mock_query.fetch_data.return_value = ([], None, None)
+
+    # The expected checksum is the SHA-1 digest of empty input.
+    matcher = self._create_matcher('da39a3ee5e6b4b0d3255bfef95601890afd80709')
+    hc_assert_that(self._mock_result, matcher)
+
+  @patch.object(bigquery, 'Client')
+  def test_bigquery_matcher_query_run_error(self, mock_bigquery):
+    mock_query = self._setup_mock_query(mock_bigquery)
+    mock_query.run.side_effect = ValueError('job is already running')
+
+    matcher = self._create_matcher()
+    with self.assertRaises(ValueError):
+      hc_assert_that(self._mock_result, matcher)
+    # Checked after the context exits; inside it these would never run.
+    self._assert_called_with_retries(mock_query.run)
+
+  @patch.object(bigquery, 'Client')
+  def test_bigquery_matcher_fetch_data_error(self, mock_bigquery):
+    mock_query = self._setup_mock_query(mock_bigquery)
+    mock_query.fetch_data.side_effect = ValueError('query job not executed')
+
+    matcher = self._create_matcher()
+    with self.assertRaises(ValueError):
+      hc_assert_that(self._mock_result, matcher)
+    self._assert_called_with_retries(mock_query.fetch_data)
+
+  @patch.object(bigquery, 'Client')
+  def test_bigquery_matcher_query_responds_error_code(self, mock_bigquery):
+    mock_query = self._setup_mock_query(mock_bigquery)
+    mock_query.run.side_effect = NotFound('table is not found')
+
+    matcher = self._create_matcher()
+    with self.assertRaises(NotFound):
+      hc_assert_that(self._mock_result, matcher)
+    self._assert_called_with_retries(mock_query.run)
+
+
+if __name__ == '__main__':
+  # Show the matcher's INFO-level log messages when run directly.
+  root_logger = logging.getLogger()
+  root_logger.setLevel(logging.INFO)
+  unittest.main()
http://git-wip-us.apache.org/repos/asf/beam/blob/dd32c266/sdks/python/apache_beam/tests/pipeline_verifiers.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/tests/pipeline_verifiers.py b/sdks/python/apache_beam/tests/pipeline_verifiers.py
index 41dfc07..379a96f 100644
--- a/sdks/python/apache_beam/tests/pipeline_verifiers.py
+++ b/sdks/python/apache_beam/tests/pipeline_verifiers.py
@@ -22,13 +22,13 @@ of test pipeline job. Customized verifier should extend
`hamcrest.core.base_matcher.BaseMatcher` and override _matches.
"""
-import hashlib
import logging
from hamcrest.core.base_matcher import BaseMatcher
from apache_beam.io.fileio import ChannelFactory
from apache_beam.runners.runner import PipelineState
+from apache_beam.tests import test_utils as utils
from apache_beam.utils import retry
try:
@@ -76,7 +76,7 @@ class FileChecksumMatcher(BaseMatcher):
"""Matcher that verifies file(s) content by comparing file checksum.
Use apache_beam.io.fileio to fetch file(s) from given path. File checksum
- is a SHA-1 hash computed from content of file(s).
+ is a hash string computed from content of file(s).
"""
def __init__(self, file_path, expected_checksum):
@@ -103,13 +103,9 @@ class FileChecksumMatcher(BaseMatcher):
read_lines = self._read_with_retry()
# Compute checksum
- read_lines.sort()
- m = hashlib.new('sha1')
- for line in read_lines:
- m.update(line)
- self.checksum, num_lines = (m.hexdigest(), len(read_lines))
+ self.checksum = utils.compute_hash(read_lines)
logging.info('Read from given path %s, %d lines, checksum: %s.',
- self.file_path, num_lines, self.checksum)
+ self.file_path, len(read_lines), self.checksum)
return self.checksum == self.expected_checksum
def describe_to(self, description):
http://git-wip-us.apache.org/repos/asf/beam/blob/dd32c266/sdks/python/apache_beam/tests/test_utils.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/tests/test_utils.py b/sdks/python/apache_beam/tests/test_utils.py
index 3fdfe88..666207e 100644
--- a/sdks/python/apache_beam/tests/test_utils.py
+++ b/sdks/python/apache_beam/tests/test_utils.py
@@ -17,11 +17,23 @@
"""Utility methods for testing"""
+import hashlib
import imp
from mock import Mock, patch
from apache_beam.utils import retry
+DEFAULT_HASHING_ALG = 'sha1'
+
+
+def compute_hash(content, hashing_alg=DEFAULT_HASHING_ALG):
+  """Compute a hash value from a collection of strings.
+
+  Elements are hashed in sorted order, so the digest does not depend on the
+  order in which they were produced. ``content`` itself is not modified.
+
+  Args:
+    content: an iterable of elements; each is hashed via its ``str()`` form.
+    hashing_alg: name of a ``hashlib`` algorithm, SHA-1 by default.
+
+  Returns:
+    The hex digest of the sorted elements fed to the hash in sequence.
+  """
+  m = hashlib.new(hashing_alg)
+  # sorted() copies, so the caller's list keeps its original order.
+  for elem in sorted(content):
+    data = str(elem)
+    if not isinstance(data, bytes):
+      # Python 3 str is text; hashlib only accepts bytes.
+      data = data.encode('utf-8')
+    m.update(data)
+  return m.hexdigest()
+
def patch_retry(testcase, module):
"""A function to patch retry module to use mock clock and logger.
http://git-wip-us.apache.org/repos/asf/beam/blob/dd32c266/sdks/python/setup.py
----------------------------------------------------------------------
diff --git a/sdks/python/setup.py b/sdks/python/setup.py
index 022d69d..cf210d9 100644
--- a/sdks/python/setup.py
+++ b/sdks/python/setup.py
@@ -87,6 +87,7 @@ REQUIRED_PACKAGES = [
'avro>=1.7.7,<2.0.0',
'crcmod>=1.7,<2.0',
'dill==0.2.6',
+ 'google-cloud-bigquery>=0.22.1,<1.0.0',
'httplib2>=0.8,<0.10',
'mock>=1.0.1,<3.0.0',
'oauth2client>=2.0.1,<4.0.0',
@@ -102,6 +103,8 @@ GCP_REQUIREMENTS = [
'google-apitools>=0.5.6,<1.0.0',
'proto-google-cloud-datastore-v1==0.90.0',
'googledatastore==7.0.0',
+ # GCP packages required by tests
+ 'google-cloud-bigquery>=0.22.1,<0.23',
]
[2/2] beam git commit: This closes #2064
Posted by al...@apache.org.
This closes #2064
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/de9e8528
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/de9e8528
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/de9e8528
Branch: refs/heads/master
Commit: de9e8528c0ed893a575f810e697b90e57b34a2cb
Parents: b322a5d dd32c26
Author: Ahmet Altay <al...@google.com>
Authored: Thu Mar 2 15:05:32 2017 -0800
Committer: Ahmet Altay <al...@google.com>
Committed: Thu Mar 2 15:05:32 2017 -0800
----------------------------------------------------------------------
.../cookbook/bigquery_tornadoes_it_test.py | 62 +++++++++++
.../python/apache_beam/io/gcp/tests/__init__.py | 16 +++
.../io/gcp/tests/bigquery_matcher.py | 108 +++++++++++++++++++
.../io/gcp/tests/bigquery_matcher_test.py | 108 +++++++++++++++++++
.../apache_beam/tests/pipeline_verifiers.py | 12 +--
sdks/python/apache_beam/tests/test_utils.py | 12 +++
sdks/python/setup.py | 3 +
7 files changed, 313 insertions(+), 8 deletions(-)
----------------------------------------------------------------------