You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2016/12/08 19:23:14 UTC

[2/2] incubator-beam git commit: Fix a typo in query split error handling

Fix a typo in query split error handling


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/d6afb906
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/d6afb906
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/d6afb906

Branch: refs/heads/python-sdk
Commit: d6afb90690f5be2f3cb38d68dc1d49a1b551118e
Parents: 1392f70
Author: Vikas Kedigehalli <vi...@google.com>
Authored: Wed Dec 7 14:19:26 2016 -0800
Committer: Robert Bradshaw <ro...@google.com>
Committed: Thu Dec 8 11:22:31 2016 -0800

----------------------------------------------------------------------
 .../apache_beam/io/datastore/v1/datastoreio.py  |  2 +-
 .../io/datastore/v1/datastoreio_test.py         | 29 ++++++++++++++++++++
 2 files changed, 30 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d6afb906/sdks/python/apache_beam/io/datastore/v1/datastoreio.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/datastore/v1/datastoreio.py b/sdks/python/apache_beam/io/datastore/v1/datastoreio.py
index fc3e813..a86bb0b 100644
--- a/sdks/python/apache_beam/io/datastore/v1/datastoreio.py
+++ b/sdks/python/apache_beam/io/datastore/v1/datastoreio.py
@@ -181,7 +181,7 @@ class ReadFromDatastore(PTransform):
       except Exception:
         logging.warning("Unable to parallelize the given query: %s", query,
                         exc_info=True)
-        query_splits = [(key, query)]
+        query_splits = [query]
 
       sharded_query_splits = []
       for split_query in query_splits:

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d6afb906/sdks/python/apache_beam/io/datastore/v1/datastoreio_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/datastore/v1/datastoreio_test.py b/sdks/python/apache_beam/io/datastore/v1/datastoreio_test.py
index 2ac7ffb..f80a320 100644
--- a/sdks/python/apache_beam/io/datastore/v1/datastoreio_test.py
+++ b/sdks/python/apache_beam/io/datastore/v1/datastoreio_test.py
@@ -122,6 +122,35 @@ class DatastoreioTest(unittest.TestCase):
       self.assertEqual(1, len(returned_split_queries))
       self.assertEqual(0, len(self._mock_datastore.method_calls))
 
+  def test_SplitQueryFn_with_exception(self):
+    """A test that verifies that no split is performed when failures occur."""
+    with patch.object(helper, 'get_datastore',
+                      return_value=self._mock_datastore):
+      # Force SplitQueryFn to compute the number of query splits
+      num_splits = 0
+      expected_num_splits = 1
+      entity_bytes = (expected_num_splits *
+                      ReadFromDatastore._DEFAULT_BUNDLE_SIZE_BYTES)
+      with patch.object(ReadFromDatastore, 'get_estimated_size_bytes',
+                        return_value=entity_bytes):
+
+        with patch.object(query_splitter, 'get_splits',
+                          side_effect=ValueError("Testing query split error")):
+          split_query_fn = ReadFromDatastore.SplitQueryFn(
+              self._PROJECT, self._query, None, num_splits)
+          mock_context = MagicMock()
+          mock_context.element = self._query
+          split_query_fn.start_bundle(mock_context)
+          returned_split_queries = []
+          for split_query in split_query_fn.process(mock_context):
+            returned_split_queries.append(split_query)
+
+          self.assertEqual(len(returned_split_queries), expected_num_splits)
+          self.assertEqual(returned_split_queries[0][1], self._query)
+          self.assertEqual(0,
+                           len(self._mock_datastore.run_query.call_args_list))
+          self.verify_unique_keys(returned_split_queries)
+
   def test_DatastoreWriteFn_with_emtpy_batch(self):
     self.check_DatastoreWriteFn(0)