You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2016/12/08 19:23:13 UTC
[1/2] incubator-beam git commit: Closes #1542
Repository: incubator-beam
Updated Branches:
refs/heads/python-sdk 1392f70b6 -> 44c1586f3
Closes #1542
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/44c1586f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/44c1586f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/44c1586f
Branch: refs/heads/python-sdk
Commit: 44c1586f3815f957a43037309f2a46a1766c6516
Parents: 1392f70 d6afb90
Author: Robert Bradshaw <ro...@google.com>
Authored: Thu Dec 8 11:22:31 2016 -0800
Committer: Robert Bradshaw <ro...@google.com>
Committed: Thu Dec 8 11:22:31 2016 -0800
----------------------------------------------------------------------
.../apache_beam/io/datastore/v1/datastoreio.py | 2 +-
.../io/datastore/v1/datastoreio_test.py | 29 ++++++++++++++++++++
2 files changed, 30 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
[2/2] incubator-beam git commit: Fix a typo in query split error
handling
Posted by ro...@apache.org.
Fix a typo in query split error handling
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/d6afb906
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/d6afb906
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/d6afb906
Branch: refs/heads/python-sdk
Commit: d6afb90690f5be2f3cb38d68dc1d49a1b551118e
Parents: 1392f70
Author: Vikas Kedigehalli <vi...@google.com>
Authored: Wed Dec 7 14:19:26 2016 -0800
Committer: Robert Bradshaw <ro...@google.com>
Committed: Thu Dec 8 11:22:31 2016 -0800
----------------------------------------------------------------------
.../apache_beam/io/datastore/v1/datastoreio.py | 2 +-
.../io/datastore/v1/datastoreio_test.py | 29 ++++++++++++++++++++
2 files changed, 30 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d6afb906/sdks/python/apache_beam/io/datastore/v1/datastoreio.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/datastore/v1/datastoreio.py b/sdks/python/apache_beam/io/datastore/v1/datastoreio.py
index fc3e813..a86bb0b 100644
--- a/sdks/python/apache_beam/io/datastore/v1/datastoreio.py
+++ b/sdks/python/apache_beam/io/datastore/v1/datastoreio.py
@@ -181,7 +181,7 @@ class ReadFromDatastore(PTransform):
except Exception:
logging.warning("Unable to parallelize the given query: %s", query,
exc_info=True)
- query_splits = [(key, query)]
+ query_splits = [query]
sharded_query_splits = []
for split_query in query_splits:
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d6afb906/sdks/python/apache_beam/io/datastore/v1/datastoreio_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/datastore/v1/datastoreio_test.py b/sdks/python/apache_beam/io/datastore/v1/datastoreio_test.py
index 2ac7ffb..f80a320 100644
--- a/sdks/python/apache_beam/io/datastore/v1/datastoreio_test.py
+++ b/sdks/python/apache_beam/io/datastore/v1/datastoreio_test.py
@@ -122,6 +122,35 @@ class DatastoreioTest(unittest.TestCase):
self.assertEqual(1, len(returned_split_queries))
self.assertEqual(0, len(self._mock_datastore.method_calls))
+ def test_SplitQueryFn_with_exception(self):
+ """A test that verifies that no split is performed when failures occur."""
+ with patch.object(helper, 'get_datastore',
+ return_value=self._mock_datastore):
+ # Force SplitQueryFn to compute the number of query splits
+ num_splits = 0
+ expected_num_splits = 1
+ entity_bytes = (expected_num_splits *
+ ReadFromDatastore._DEFAULT_BUNDLE_SIZE_BYTES)
+ with patch.object(ReadFromDatastore, 'get_estimated_size_bytes',
+ return_value=entity_bytes):
+
+ with patch.object(query_splitter, 'get_splits',
+ side_effect=ValueError("Testing query split error")):
+ split_query_fn = ReadFromDatastore.SplitQueryFn(
+ self._PROJECT, self._query, None, num_splits)
+ mock_context = MagicMock()
+ mock_context.element = self._query
+ split_query_fn.start_bundle(mock_context)
+ returned_split_queries = []
+ for split_query in split_query_fn.process(mock_context):
+ returned_split_queries.append(split_query)
+
+ self.assertEqual(len(returned_split_queries), expected_num_splits)
+ self.assertEqual(returned_split_queries[0][1], self._query)
+ self.assertEqual(0,
+ len(self._mock_datastore.run_query.call_args_list))
+ self.verify_unique_keys(returned_split_queries)
+
def test_DatastoreWriteFn_with_emtpy_batch(self):
self.check_DatastoreWriteFn(0)