You are viewing a plain-text version of this content; the canonical hyperlink to the original page is not preserved in this version.
Posted to commits@beam.apache.org by al...@apache.org on 2017/08/04 16:14:13 UTC
[1/2] beam git commit: [BEAM-1584] Add clean up in bigquery integration test
Repository: beam
Updated Branches:
refs/heads/master 72bd73ab3 -> f731de0b3
[BEAM-1584] Add clean up in bigquery integration test
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/fc00ce90
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/fc00ce90
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/fc00ce90
Branch: refs/heads/master
Commit: fc00ce90060ccd99b69ff7225e3e35d259d3c7ad
Parents: 72bd73a
Author: Mark Liu <ma...@google.com>
Authored: Tue Aug 1 17:46:05 2017 -0700
Committer: Ahmet Altay <al...@google.com>
Committed: Fri Aug 4 09:13:49 2017 -0700
----------------------------------------------------------------------
.../cookbook/bigquery_tornadoes_it_test.py | 14 +++-
sdks/python/apache_beam/io/gcp/tests/utils.py | 63 ++++++++++++++++++
.../apache_beam/io/gcp/tests/utils_test.py | 70 ++++++++++++++++++++
3 files changed, 144 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/fc00ce90/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes_it_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes_it_test.py b/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes_it_test.py
index 5d2ee7c..05ee3c5 100644
--- a/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes_it_test.py
+++ b/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes_it_test.py
@@ -26,6 +26,7 @@ from nose.plugins.attrib import attr
from apache_beam.examples.cookbook import bigquery_tornadoes
from apache_beam.io.gcp.tests.bigquery_matcher import BigqueryMatcher
+from apache_beam.io.gcp.tests import utils
from apache_beam.testing.pipeline_verifiers import PipelineStateMatcher
from apache_beam.testing.test_pipeline import TestPipeline
@@ -44,17 +45,24 @@ class BigqueryTornadoesIT(unittest.TestCase):
test_pipeline = TestPipeline(is_integration_test=True)
# Set extra options to the pipeline for test purpose
- output_table = ('BigQueryTornadoesIT'
- '.monthly_tornadoes_%s' % int(round(time.time() * 1000)))
+ project = test_pipeline.get_option('project')
+
+ dataset = 'BigQueryTornadoesIT'
+ table = 'monthly_tornadoes_%s' % int(round(time.time() * 1000))
+ output_table = '.'.join([dataset, table])
query = 'SELECT month, tornado_count FROM [%s]' % output_table
+
pipeline_verifiers = [PipelineStateMatcher(),
BigqueryMatcher(
- project=test_pipeline.get_option('project'),
+ project=project,
query=query,
checksum=self.DEFAULT_CHECKSUM)]
extra_opts = {'output': output_table,
'on_success_matcher': all_of(*pipeline_verifiers)}
+ # Register cleanup before pipeline execution.
+ self.addCleanup(utils.delete_bq_table, project, dataset, table)
+
# Get pipeline options from command argument: --test-pipeline-options,
# and start pipeline job by calling pipeline main function.
bigquery_tornadoes.run(
http://git-wip-us.apache.org/repos/asf/beam/blob/fc00ce90/sdks/python/apache_beam/io/gcp/tests/utils.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/gcp/tests/utils.py b/sdks/python/apache_beam/io/gcp/tests/utils.py
new file mode 100644
index 0000000..40eb975
--- /dev/null
+++ b/sdks/python/apache_beam/io/gcp/tests/utils.py
@@ -0,0 +1,63 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+
+"""Utility methods for testing on GCP."""
+
+import logging
+
+from apache_beam.utils import retry
+
+# Protect against environments where bigquery library is not available.
+try:
+ from google.cloud import bigquery
+except ImportError:
+ bigquery = None
+
+
+class GcpTestIOError(retry.PermanentException):
+  """GCP IO error raised by test helpers.
+
+  It is a PermanentException: functions raising it should not be retried."""
+
+
+@retry.with_exponential_backoff(
+    num_retries=3,
+    retry_filter=retry.retry_on_server_errors_filter)
+def delete_bq_table(project, dataset, table):
+  """Delete a BigQuery table and verify that it is gone.
+
+  Args:
+    project: Name of the project.
+    dataset: Name of the dataset where table is.
+    table: Name of the table.
+  """
+  logging.info('Clean up a Bigquery table with project: %s, dataset: %s, '
+               'table: %s.', project, dataset, table)
+  bq_dataset = bigquery.Client(project=project).dataset(dataset)
+  if not bq_dataset.exists():  # GcpTestIOError: should not be retried.
+    raise GcpTestIOError('Failed to cleanup. Bigquery dataset %s doesn\'t '
+                         'exist in project %s.' % (dataset, project))
+  bq_table = bq_dataset.table(table)
+  if not bq_table.exists():
+    raise GcpTestIOError('Failed to cleanup. Bigquery table %s doesn\'t '
+                         'exist in project %s, dataset %s.' %
+                         (table, project, dataset))
+  bq_table.delete()
+  if bq_table.exists():  # Verify the delete actually took effect.
+    raise RuntimeError('Failed to cleanup. Bigquery table %s still exists '
+                       'after cleanup.' % table)
http://git-wip-us.apache.org/repos/asf/beam/blob/fc00ce90/sdks/python/apache_beam/io/gcp/tests/utils_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/gcp/tests/utils_test.py b/sdks/python/apache_beam/io/gcp/tests/utils_test.py
new file mode 100644
index 0000000..270750a
--- /dev/null
+++ b/sdks/python/apache_beam/io/gcp/tests/utils_test.py
@@ -0,0 +1,70 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Unittest for GCP testing utils."""
+
+import logging
+import unittest
+from mock import Mock, patch
+
+from apache_beam.io.gcp.tests import utils
+from apache_beam.testing.test_utils import patch_retry
+
+# Protect against environments where bigquery library is not available.
+try:
+ from google.cloud import bigquery
+except ImportError:
+ bigquery = None
+
+
+@unittest.skipIf(bigquery is None, 'Bigquery dependencies are not installed.')
+class UtilsTest(unittest.TestCase):
+  """Tests utils.delete_bq_table with BigQuery Dataset/Table mocked."""
+
+  def setUp(self):
+    self._mock_result = Mock()
+    # NOTE(review): presumably stubs out retry backoff sleeps; confirm.
+    patch_retry(self, utils)
+
+  @patch('google.cloud.bigquery.Table.delete')
+  @patch('google.cloud.bigquery.Table.exists', side_effect=[True, False])
+  @patch('google.cloud.bigquery.Dataset.exists', return_value=True)
+  def test_delete_bq_table_succeeds(self, *_):
+    """The table exists before delete and is gone afterwards."""
+    utils.delete_bq_table('unused_project', 'unused_dataset', 'unused_table')
+
+  @patch('google.cloud.bigquery.Table.delete', side_effect=Exception)
+  @patch('google.cloud.bigquery.Table.exists', return_value=True)
+  @patch('google.cloud.bigquery.Dataset.exists', return_value=True)
+  def test_delete_bq_table_fails_with_server_error(self, *_):
+    """An exception raised by Table.delete propagates to the caller."""
+    with self.assertRaises(Exception):
+      utils.delete_bq_table('unused_project', 'unused_dataset', 'unused_table')
+
+  @patch('google.cloud.bigquery.Table.delete')
+  # return_value (not a finite side_effect): every exists() call is True.
+  @patch('google.cloud.bigquery.Table.exists', return_value=True)
+  @patch('google.cloud.bigquery.Dataset.exists', return_value=True)
+  def test_delete_bq_table_fails_with_delete_error(self, *_):
+    """RuntimeError is raised when the table still exists after delete."""
+    with self.assertRaises(RuntimeError):
+      utils.delete_bq_table('unused_project', 'unused_dataset', 'unused_table')
+
+
+if __name__ == '__main__':
+  logging.root.setLevel(logging.INFO)  # Show INFO logs when run directly.
+  unittest.main()
[2/2] beam git commit: This closes #3673
Posted by al...@apache.org.
This closes #3673
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/f731de0b
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/f731de0b
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/f731de0b
Branch: refs/heads/master
Commit: f731de0b357bc5eccee565271ade96e1946bf0bd
Parents: 72bd73a fc00ce9
Author: Ahmet Altay <al...@google.com>
Authored: Fri Aug 4 09:13:54 2017 -0700
Committer: Ahmet Altay <al...@google.com>
Committed: Fri Aug 4 09:13:54 2017 -0700
----------------------------------------------------------------------
.../cookbook/bigquery_tornadoes_it_test.py | 14 +++-
sdks/python/apache_beam/io/gcp/tests/utils.py | 63 ++++++++++++++++++
.../apache_beam/io/gcp/tests/utils_test.py | 70 ++++++++++++++++++++
3 files changed, 144 insertions(+), 3 deletions(-)
----------------------------------------------------------------------