You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by al...@apache.org on 2017/03/02 23:23:36 UTC

[1/2] beam git commit: [BEAM-1188] Python Bigquery Verifier For E2E Test

Repository: beam
Updated Branches:
  refs/heads/master b322a5d40 -> de9e8528c


[BEAM-1188] Python Bigquery Verifier For E2E Test


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/dd32c266
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/dd32c266
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/dd32c266

Branch: refs/heads/master
Commit: dd32c26622ef9d0aa9e8d0c3863ac6660ed336b7
Parents: b322a5d
Author: Mark Liu <ma...@google.com>
Authored: Tue Feb 21 18:48:34 2017 -0800
Committer: Ahmet Altay <al...@google.com>
Committed: Thu Mar 2 15:05:27 2017 -0800

----------------------------------------------------------------------
 .../cookbook/bigquery_tornadoes_it_test.py      |  62 +++++++++++
 .../python/apache_beam/io/gcp/tests/__init__.py |  16 +++
 .../io/gcp/tests/bigquery_matcher.py            | 108 +++++++++++++++++++
 .../io/gcp/tests/bigquery_matcher_test.py       | 108 +++++++++++++++++++
 .../apache_beam/tests/pipeline_verifiers.py     |  12 +--
 sdks/python/apache_beam/tests/test_utils.py     |  12 +++
 sdks/python/setup.py                            |   3 +
 7 files changed, 313 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/dd32c266/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes_it_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes_it_test.py b/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes_it_test.py
new file mode 100644
index 0000000..306a09e
--- /dev/null
+++ b/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes_it_test.py
@@ -0,0 +1,62 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""End-to-end test for Bigquery tornadoes example."""
+
+import logging
+import time
+import unittest
+
+from hamcrest.core.core.allof import all_of
+from nose.plugins.attrib import attr
+
+from apache_beam.examples.cookbook import bigquery_tornadoes
+from apache_beam.io.gcp.tests.bigquery_matcher import BigqueryMatcher
+from apache_beam.test_pipeline import TestPipeline
+from apache_beam.tests.pipeline_verifiers import PipelineStateMatcher
+
+
+class BigqueryTornadoesIT(unittest.TestCase):
+  """Integration test that runs bigquery_tornadoes and checks its output."""
+  # Expected SHA-1 checksum, computed over the sorted rows that the
+  # verification query below reads from the output Bigquery table.
+  DEFAULT_CHECKSUM = '83789a7c1bca7959dcf23d3bc37e9204e594330f'
+
+  @attr('IT')
+  def test_bigquery_tornadoes_it(self):
+    test_pipeline = TestPipeline(is_integration_test=True)
+
+    # Extra test options: timestamped output table and on-success verifiers.
+    output_table = ('BigQueryTornadoesIT'
+                    '.monthly_tornadoes_%s' % int(round(time.time() * 1000)))
+    query = 'SELECT month, tornado_count FROM [%s]' % output_table
+    pipeline_verifiers = [PipelineStateMatcher(),
+                          BigqueryMatcher(
+                              project=test_pipeline.get_option('project'),
+                              query=query,
+                              checksum=self.DEFAULT_CHECKSUM)]
+    extra_opts = {'output': output_table,
+                  'on_success_matcher': all_of(*pipeline_verifiers)}
+
+    # Build the full argument list from --test-pipeline-options plus the
+    # extras above, then start the job via the example's main function.
+    bigquery_tornadoes.run(
+        test_pipeline.get_full_options_as_args(**extra_opts))
+
+if __name__ == '__main__':
+  logging.getLogger().setLevel(logging.INFO)
+  unittest.main()

http://git-wip-us.apache.org/repos/asf/beam/blob/dd32c266/sdks/python/apache_beam/io/gcp/tests/__init__.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/gcp/tests/__init__.py b/sdks/python/apache_beam/io/gcp/tests/__init__.py
new file mode 100644
index 0000000..cce3aca
--- /dev/null
+++ b/sdks/python/apache_beam/io/gcp/tests/__init__.py
@@ -0,0 +1,16 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#

http://git-wip-us.apache.org/repos/asf/beam/blob/dd32c266/sdks/python/apache_beam/io/gcp/tests/bigquery_matcher.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/gcp/tests/bigquery_matcher.py b/sdks/python/apache_beam/io/gcp/tests/bigquery_matcher.py
new file mode 100644
index 0000000..cc26689
--- /dev/null
+++ b/sdks/python/apache_beam/io/gcp/tests/bigquery_matcher.py
@@ -0,0 +1,108 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Bigquery data verifier for end-to-end test."""
+
+import logging
+
+from hamcrest.core.base_matcher import BaseMatcher
+
+from apache_beam.tests.test_utils import compute_hash
+from apache_beam.utils import retry
+
+# Protect against environments where bigquery library is not available.
+# pylint: disable=wrong-import-order, wrong-import-position
+try:
+  from google.cloud import bigquery
+  from google.cloud.exceptions import GoogleCloudError
+except ImportError:
+  bigquery = None
+# pylint: enable=wrong-import-order, wrong-import-position
+
+MAX_RETRIES = 4
+
+
+def retry_on_http_and_value_error(exception):
+  """Retry filter accepting GoogleCloudError and ValueError exceptions."""
+  retriable_types = (GoogleCloudError, ValueError)
+  return isinstance(exception, retriable_types)
+
+
+class BigqueryMatcher(BaseMatcher):
+  """Matcher that verifies Bigquery data returned by a given query.
+
+  Runs the query in the given project, hashes the fetched rows with
+  compute_hash and matches when the hash equals the expected checksum.
+  """
+
+  def __init__(self, project, query, checksum):
+    if bigquery is None:
+      raise ImportError(
+          'Bigquery dependencies are not installed.')
+    if not query or not isinstance(query, str):
+      raise ValueError(
+          'Invalid argument: query. Please use non-empty string')
+    if not checksum or not isinstance(checksum, str):
+      raise ValueError(
+          'Invalid argument: checksum. Please use non-empty string')
+    self.project = project
+    self.query = query
+    self.expected_checksum = checksum
+
+  def _matches(self, _):
+    logging.info('Start verify Bigquery data.')
+    # Run the query (with retries) and collect every returned row.
+    bigquery_client = bigquery.Client(project=self.project)
+    response = self._query_with_retry(bigquery_client)
+    logging.info('Read from given query (%s), total rows %d',
+                 self.query, len(response))
+
+    # Hash the rows; kept on self so describe_mismatch can report it.
+    self.checksum = compute_hash(response)
+    logging.info('Generate checksum: %s', self.checksum)
+
+    # The match succeeds only when both checksums are identical.
+    return self.checksum == self.expected_checksum
+
+  @retry.with_exponential_backoff(
+      num_retries=MAX_RETRIES,
+      retry_filter=retry_on_http_and_value_error)
+  def _query_with_retry(self, bigquery_client):
+    """Run the query and return all rows; retried on Bigquery/value errors."""
+    query = bigquery_client.run_sync_query(self.query)
+    query.run()
+
+    # Fetch query data one page at a time until no page token is returned.
+    page_token = None
+    results = []
+    while True:
+      rows, _, page_token = query.fetch_data(page_token=page_token)
+      results.extend(rows)
+      if not page_token:
+        break
+
+    return results
+
+  def describe_to(self, description):
+    description \
+      .append_text("Expected checksum is ") \
+      .append_text(self.expected_checksum)
+
+  def describe_mismatch(self, pipeline_result, mismatch_description):
+    mismatch_description \
+      .append_text("Actual checksum is ") \
+      .append_text(self.checksum)

http://git-wip-us.apache.org/repos/asf/beam/blob/dd32c266/sdks/python/apache_beam/io/gcp/tests/bigquery_matcher_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/gcp/tests/bigquery_matcher_test.py b/sdks/python/apache_beam/io/gcp/tests/bigquery_matcher_test.py
new file mode 100644
index 0000000..d8aa148
--- /dev/null
+++ b/sdks/python/apache_beam/io/gcp/tests/bigquery_matcher_test.py
@@ -0,0 +1,108 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Unit test for Bigquery verifier"""
+
+import logging
+import unittest
+
+from hamcrest import assert_that as hc_assert_that
+from mock import Mock, patch
+
+from apache_beam.io.gcp.tests import bigquery_matcher as bq_verifier
+from apache_beam.tests.test_utils import patch_retry
+
+# Protect against environments where bigquery library is not available.
+# pylint: disable=wrong-import-order, wrong-import-position
+try:
+  from google.cloud import bigquery
+  from google.cloud.exceptions import NotFound
+except ImportError:
+  bigquery = None
+# pylint: enable=wrong-import-order, wrong-import-position
+
+
+@unittest.skipIf(bigquery is None, 'Bigquery dependencies are not installed.')
+class BigqueryMatcherTest(unittest.TestCase):
+  """Unit tests for BigqueryMatcher using a mocked bigquery.Client."""
+  def setUp(self):
+    self._mock_result = Mock()
+    patch_retry(self, bq_verifier)
+
+  @patch.object(bigquery, 'Client')
+  def test_bigquery_matcher_success(self, mock_bigquery):
+    mock_query = Mock()
+    mock_client = mock_bigquery.return_value
+    mock_client.run_sync_query.return_value = mock_query
+    mock_query.fetch_data.return_value = ([], None, None)
+    # The mocked query returns no rows, so expect the SHA-1 of empty input.
+    matcher = bq_verifier.BigqueryMatcher(
+        'mock_project',
+        'mock_query',
+        'da39a3ee5e6b4b0d3255bfef95601890afd80709')
+    hc_assert_that(self._mock_result, matcher)
+
+  @patch.object(bigquery, 'Client')
+  def test_bigquery_matcher_query_run_error(self, mock_bigquery):
+    mock_query = Mock()
+    mock_client = mock_bigquery.return_value
+    mock_client.run_sync_query.return_value = mock_query
+    mock_query.run.side_effect = ValueError('job is already running')
+
+    matcher = bq_verifier.BigqueryMatcher('mock_project',
+                                          'mock_query',
+                                          'mock_checksum')
+    with self.assertRaises(ValueError):
+      hc_assert_that(self._mock_result, matcher)
+    self.assertTrue(mock_query.run.called)
+    self.assertEqual(bq_verifier.MAX_RETRIES + 1, mock_query.run.call_count)
+
+  @patch.object(bigquery, 'Client')
+  def test_bigquery_matcher_fetch_data_error(self, mock_bigquery):
+    mock_query = Mock()
+    mock_client = mock_bigquery.return_value
+    mock_client.run_sync_query.return_value = mock_query
+    mock_query.fetch_data.side_effect = ValueError('query job not executed')
+
+    matcher = bq_verifier.BigqueryMatcher('mock_project',
+                                          'mock_query',
+                                          'mock_checksum')
+    with self.assertRaises(ValueError):
+      hc_assert_that(self._mock_result, matcher)
+    self.assertTrue(mock_query.fetch_data.called)
+    self.assertEqual(bq_verifier.MAX_RETRIES + 1,
+                     mock_query.fetch_data.call_count)
+
+  @patch.object(bigquery, 'Client')
+  def test_bigquery_matcher_query_responds_error_code(self, mock_bigquery):
+    mock_query = Mock()
+    mock_client = mock_bigquery.return_value
+    mock_client.run_sync_query.return_value = mock_query
+    mock_query.run.side_effect = NotFound('table is not found')
+
+    matcher = bq_verifier.BigqueryMatcher('mock_project',
+                                          'mock_query',
+                                          'mock_checksum')
+    with self.assertRaises(NotFound):
+      hc_assert_that(self._mock_result, matcher)
+    self.assertTrue(mock_query.run.called)
+    self.assertEqual(bq_verifier.MAX_RETRIES + 1, mock_query.run.call_count)
+
+
+if __name__ == '__main__':
+  logging.getLogger().setLevel(logging.INFO)
+  unittest.main()

http://git-wip-us.apache.org/repos/asf/beam/blob/dd32c266/sdks/python/apache_beam/tests/pipeline_verifiers.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/tests/pipeline_verifiers.py b/sdks/python/apache_beam/tests/pipeline_verifiers.py
index 41dfc07..379a96f 100644
--- a/sdks/python/apache_beam/tests/pipeline_verifiers.py
+++ b/sdks/python/apache_beam/tests/pipeline_verifiers.py
@@ -22,13 +22,13 @@ of test pipeline job. Customized verifier should extend
 `hamcrest.core.base_matcher.BaseMatcher` and override _matches.
 """
 
-import hashlib
 import logging
 
 from hamcrest.core.base_matcher import BaseMatcher
 
 from apache_beam.io.fileio import ChannelFactory
 from apache_beam.runners.runner import PipelineState
+from apache_beam.tests import test_utils as utils
 from apache_beam.utils import retry
 
 try:
@@ -76,7 +76,7 @@ class FileChecksumMatcher(BaseMatcher):
   """Matcher that verifies file(s) content by comparing file checksum.
 
   Use apache_beam.io.fileio to fetch file(s) from given path. File checksum
-  is a SHA-1 hash computed from content of file(s).
+  is a hash string computed from content of file(s).
   """
 
   def __init__(self, file_path, expected_checksum):
@@ -103,13 +103,9 @@ class FileChecksumMatcher(BaseMatcher):
     read_lines = self._read_with_retry()
 
     # Compute checksum
-    read_lines.sort()
-    m = hashlib.new('sha1')
-    for line in read_lines:
-      m.update(line)
-    self.checksum, num_lines = (m.hexdigest(), len(read_lines))
+    self.checksum = utils.compute_hash(read_lines)
     logging.info('Read from given path %s, %d lines, checksum: %s.',
-                 self.file_path, num_lines, self.checksum)
+                 self.file_path, len(read_lines), self.checksum)
     return self.checksum == self.expected_checksum
 
   def describe_to(self, description):

http://git-wip-us.apache.org/repos/asf/beam/blob/dd32c266/sdks/python/apache_beam/tests/test_utils.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/tests/test_utils.py b/sdks/python/apache_beam/tests/test_utils.py
index 3fdfe88..666207e 100644
--- a/sdks/python/apache_beam/tests/test_utils.py
+++ b/sdks/python/apache_beam/tests/test_utils.py
@@ -17,11 +17,23 @@
 
 """Utility methods for testing"""
 
+import hashlib
 import imp
 from mock import Mock, patch
 
 from apache_beam.utils import retry
 
+DEFAULT_HASHING_ALG = 'sha1'
+
+
+def compute_hash(content, hashing_alg=DEFAULT_HASHING_ALG):
+  """Compute a hex digest over the sorted elements of content."""
+  m = hashlib.new(hashing_alg)
+  # Hash a sorted copy so the caller's list is not reordered as a side effect.
+  for elem in sorted(content):
+    m.update(str(elem))
+  return m.hexdigest()
+
 
 def patch_retry(testcase, module):
   """A function to patch retry module to use mock clock and logger.

http://git-wip-us.apache.org/repos/asf/beam/blob/dd32c266/sdks/python/setup.py
----------------------------------------------------------------------
diff --git a/sdks/python/setup.py b/sdks/python/setup.py
index 022d69d..cf210d9 100644
--- a/sdks/python/setup.py
+++ b/sdks/python/setup.py
@@ -87,6 +87,7 @@ REQUIRED_PACKAGES = [
     'avro>=1.7.7,<2.0.0',
     'crcmod>=1.7,<2.0',
     'dill==0.2.6',
+    'google-cloud-bigquery>=0.22.1,<1.0.0',
     'httplib2>=0.8,<0.10',
     'mock>=1.0.1,<3.0.0',
     'oauth2client>=2.0.1,<4.0.0',
@@ -102,6 +103,8 @@ GCP_REQUIREMENTS = [
   'google-apitools>=0.5.6,<1.0.0',
   'proto-google-cloud-datastore-v1==0.90.0',
   'googledatastore==7.0.0',
+  # GCP packages required by tests
+  'google-cloud-bigquery>=0.22.1,<0.23',
 ]
 
 


[2/2] beam git commit: This closes #2064

Posted by al...@apache.org.
This closes #2064


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/de9e8528
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/de9e8528
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/de9e8528

Branch: refs/heads/master
Commit: de9e8528c0ed893a575f810e697b90e57b34a2cb
Parents: b322a5d dd32c26
Author: Ahmet Altay <al...@google.com>
Authored: Thu Mar 2 15:05:32 2017 -0800
Committer: Ahmet Altay <al...@google.com>
Committed: Thu Mar 2 15:05:32 2017 -0800

----------------------------------------------------------------------
 .../cookbook/bigquery_tornadoes_it_test.py      |  62 +++++++++++
 .../python/apache_beam/io/gcp/tests/__init__.py |  16 +++
 .../io/gcp/tests/bigquery_matcher.py            | 108 +++++++++++++++++++
 .../io/gcp/tests/bigquery_matcher_test.py       | 108 +++++++++++++++++++
 .../apache_beam/tests/pipeline_verifiers.py     |  12 +--
 sdks/python/apache_beam/tests/test_utils.py     |  12 +++
 sdks/python/setup.py                            |   3 +
 7 files changed, 313 insertions(+), 8 deletions(-)
----------------------------------------------------------------------