Posted to commits@beam.apache.org by ro...@apache.org on 2016/11/15 22:24:31 UTC
[1/2] incubator-beam git commit: Query Splitter for Datastore v1
Repository: incubator-beam
Updated Branches:
refs/heads/python-sdk d1fccbf5e -> 21b7844bb
Query Splitter for Datastore v1
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/c1126b70
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/c1126b70
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/c1126b70
Branch: refs/heads/python-sdk
Commit: c1126b708469fc63bd8ab8e54026700408ec34da
Parents: d1fccbf
Author: Vikas Kedigehalli <vi...@google.com>
Authored: Mon Oct 24 18:29:29 2016 -0700
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Tue Nov 15 14:24:02 2016 -0800
----------------------------------------------------------------------
.../python/apache_beam/io/datastore/__init__.py | 16 ++
.../apache_beam/io/datastore/v1/__init__.py | 16 ++
.../apache_beam/io/datastore/v1/helper.py | 84 ++++++
.../apache_beam/io/datastore/v1/helper_test.py | 124 +++++++++
.../io/datastore/v1/query_splitter.py | 270 +++++++++++++++++++
.../io/datastore/v1/query_splitter_test.py | 257 ++++++++++++++++++
sdks/python/setup.py | 1 +
7 files changed, 768 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c1126b70/sdks/python/apache_beam/io/datastore/__init__.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/datastore/__init__.py b/sdks/python/apache_beam/io/datastore/__init__.py
new file mode 100644
index 0000000..cce3aca
--- /dev/null
+++ b/sdks/python/apache_beam/io/datastore/__init__.py
@@ -0,0 +1,16 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c1126b70/sdks/python/apache_beam/io/datastore/v1/__init__.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/datastore/v1/__init__.py b/sdks/python/apache_beam/io/datastore/v1/__init__.py
new file mode 100644
index 0000000..cce3aca
--- /dev/null
+++ b/sdks/python/apache_beam/io/datastore/v1/__init__.py
@@ -0,0 +1,16 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c1126b70/sdks/python/apache_beam/io/datastore/v1/helper.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/datastore/v1/helper.py b/sdks/python/apache_beam/io/datastore/v1/helper.py
new file mode 100644
index 0000000..626ab35
--- /dev/null
+++ b/sdks/python/apache_beam/io/datastore/v1/helper.py
@@ -0,0 +1,84 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Cloud Datastore helper functions."""
+
+
+def key_comparator(k1, k2):
+ """A comparator for Datastore keys.
+
+ Comparison is only valid for keys in the same partition. The comparison is
+ between the path elements of each key.
+ """
+
+ if k1.partition_id != k2.partition_id:
+ raise ValueError('Cannot compare keys with different partition ids.')
+
+ k2_iter = iter(k2.path)
+
+ for k1_path in k1.path:
+ k2_path = next(k2_iter, None)
+ if not k2_path:
+ return 1
+
+ result = compare_path(k1_path, k2_path)
+
+ if result != 0:
+ return result
+
+ k2_path = next(k2_iter, None)
+ if k2_path:
+ return -1
+ else:
+ return 0
+
+
+def compare_path(p1, p2):
+ """A comparator for key path.
+
+ A path has either an `id` or a `name` field defined. The
+ comparison works with the following rules:
+
+ 1. If one path has `id` defined while the other doesn't, then the
+ one with `id` defined is considered smaller.
+ 2. If both paths have `id` defined, then their ids are compared.
+ 3. If no `id` is defined for both paths, then their `names` are compared.
+ """
+
+ result = str_compare(p1.kind, p2.kind)
+ if result != 0:
+ return result
+
+ if p1.HasField('id'):
+ if not p2.HasField('id'):
+ return -1
+
+ return p1.id - p2.id
+
+ if p2.HasField('id'):
+ return 1
+
+ return str_compare(p1.name, p2.name)
+
+
+def str_compare(s1, s2):
+ if s1 == s2:
+ return 0
+ elif s1 < s2:
+ return -1
+ else:
+ return 1
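
[Editorial note: the helpers above are plain cmp-style comparators (negative, zero, or positive), so they can be passed directly to a Python 2 sort, which is exactly how _get_scatter_keys uses them in query_splitter.py below. A minimal illustrative sketch follows; make_key is a hypothetical convenience helper, not part of this change:

    from google.datastore.v1.entity_pb2 import Key

    from apache_beam.io.datastore.v1 import helper

    def make_key(kind, name):
      # Hypothetical helper: build a key with a single path element.
      key = Key()
      path = key.path.add()
      path.kind = kind
      path.name = name
      return key

    keys = [make_key('Demo', 'b'), make_key('Demo', 'a')]
    # Python 2 list.sort accepts a comparison function directly;
    # this mirrors key_splits.sort(helper.key_comparator) in the splitter.
    keys.sort(helper.key_comparator)
]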
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c1126b70/sdks/python/apache_beam/io/datastore/v1/helper_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/datastore/v1/helper_test.py b/sdks/python/apache_beam/io/datastore/v1/helper_test.py
new file mode 100644
index 0000000..50f8e4c
--- /dev/null
+++ b/sdks/python/apache_beam/io/datastore/v1/helper_test.py
@@ -0,0 +1,124 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Tests for datastore helper."""
+
+import unittest
+from apache_beam.io.datastore.v1 import helper
+from google.datastore.v1.entity_pb2 import Key
+
+
+class HelperTest(unittest.TestCase):
+
+ def test_compare_path_with_different_kind(self):
+ p1 = Key.PathElement()
+ p1.kind = 'dummy1'
+
+ p2 = Key.PathElement()
+ p2.kind = 'dummy2'
+
+ self.assertLess(helper.compare_path(p1, p2), 0)
+
+ def test_compare_path_with_different_id(self):
+ p1 = Key.PathElement()
+ p1.kind = 'dummy'
+ p1.id = 10
+
+ p2 = Key.PathElement()
+ p2.kind = 'dummy'
+ p2.id = 15
+
+ self.assertLess(helper.compare_path(p1, p2), 0)
+
+ def test_compare_path_with_different_name(self):
+ p1 = Key.PathElement()
+ p1.kind = 'dummy'
+ p1.name = "dummy1"
+
+ p2 = Key.PathElement()
+ p2.kind = 'dummy'
+ p2.name = 'dummy2'
+
+ self.assertLess(helper.compare_path(p1, p2), 0)
+
+ def test_compare_path_of_different_type(self):
+ p1 = Key.PathElement()
+ p1.kind = 'dummy'
+ p1.id = 10
+
+ p2 = Key.PathElement()
+ p2.kind = 'dummy'
+ p2.name = 'dummy'
+
+ self.assertLess(helper.compare_path(p1, p2), 0)
+
+ def test_key_comparator_with_different_partition(self):
+ k1 = Key()
+ k1.partition_id.namespace_id = 'dummy1'
+ k2 = Key()
+ k2.partition_id.namespace_id = 'dummy2'
+ self.assertRaises(ValueError, helper.key_comparator, k1, k2)
+
+ def test_key_comparator_with_single_path(self):
+ k1 = Key()
+ k2 = Key()
+ p1 = k1.path.add()
+ p2 = k2.path.add()
+ p1.kind = p2.kind = 'dummy'
+ self.assertEqual(helper.key_comparator(k1, k2), 0)
+
+ def test_key_comparator_with_multiple_paths_1(self):
+ k1 = Key()
+ k2 = Key()
+ p11 = k1.path.add()
+ p12 = k1.path.add()
+ p21 = k2.path.add()
+ p11.kind = p12.kind = p21.kind = 'dummy'
+ self.assertGreater(helper.key_comparator(k1, k2), 0)
+
+ def test_key_comparator_with_multiple_paths_2(self):
+ k1 = Key()
+ k2 = Key()
+ p11 = k1.path.add()
+ p21 = k2.path.add()
+ p22 = k2.path.add()
+ p11.kind = p21.kind = p22.kind = 'dummy'
+ self.assertLess(helper.key_comparator(k1, k2), 0)
+
+ def test_key_comparator_with_multiple_paths_3(self):
+ k1 = Key()
+ k2 = Key()
+ p11 = k1.path.add()
+ p12 = k1.path.add()
+ p21 = k2.path.add()
+ p22 = k2.path.add()
+ p11.kind = p12.kind = p21.kind = p22.kind = 'dummy'
+ self.assertEqual(helper.key_comparator(k1, k2), 0)
+
+ def test_key_comparator_with_multiple_paths_4(self):
+ k1 = Key()
+ k2 = Key()
+ p11 = k1.path.add()
+ p12 = k2.path.add()
+ p21 = k2.path.add()
+ p11.kind = p12.kind = 'dummy'
+ # make path2 greater than path1
+ p21.kind = 'dummy1'
+ self.assertLess(helper.key_comparator(k1, k2), 0)
+
+if __name__ == '__main__':
+ unittest.main()
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c1126b70/sdks/python/apache_beam/io/datastore/v1/query_splitter.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/datastore/v1/query_splitter.py b/sdks/python/apache_beam/io/datastore/v1/query_splitter.py
new file mode 100644
index 0000000..82aa972
--- /dev/null
+++ b/sdks/python/apache_beam/io/datastore/v1/query_splitter.py
@@ -0,0 +1,270 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Implements a Cloud Datastore query splitter."""
+
+from apache_beam.io.datastore.v1 import helper
+from google.datastore.v1 import datastore_pb2
+from google.datastore.v1 import query_pb2
+from google.datastore.v1.query_pb2 import PropertyFilter
+from google.datastore.v1.query_pb2 import CompositeFilter
+import googledatastore
+
+
+__all__ = [
+ 'get_splits',
+]
+
+SCATTER_PROPERTY_NAME = '__scatter__'
+KEY_PROPERTY_NAME = '__key__'
+# The number of keys to sample for each split.
+KEYS_PER_SPLIT = 32
+
+UNSUPPORTED_OPERATORS = [PropertyFilter.LESS_THAN,
+ PropertyFilter.LESS_THAN_OR_EQUAL,
+ PropertyFilter.GREATER_THAN,
+ PropertyFilter.GREATER_THAN_OR_EQUAL]
+
+
+def get_splits(datastore, query, num_splits, partition=None):
+ """Returns a list of sharded queries for the given Cloud Datastore query.
+
+ This will create up to the desired number of splits; however, it may return
+ fewer splits if the desired number is unavailable. This happens when the
+ number of split points provided by the underlying Datastore is less than
+ the desired number, which occurs when the number of results for the query
+ is too small.
+
+ This implementation of the QuerySplitter uses the __scatter__ property to
+ gather random split points for a query.
+
+ Note: This implementation is derived from the Java query splitter in
+ https://github.com/GoogleCloudPlatform/google-cloud-datastore/blob/master/java/datastore/src/main/java/com/google/datastore/v1/client/QuerySplitterImpl.java
+
+ Args:
+ datastore: the datastore client.
+ query: the query to split.
+ num_splits: the desired number of splits.
+ partition: the partition the query is running in.
+
+ Returns:
+ A list of split queries, with at most `num_splits` elements.
+ """
+
+ # Validate that the number of splits is not out of bounds.
+ if num_splits < 1:
+ raise ValueError('The number of splits must be greater than 0.')
+
+ if num_splits == 1:
+ return [query]
+
+ _validate_query(query)
+
+ splits = []
+ scatter_keys = _get_scatter_keys(datastore, query, num_splits, partition)
+ last_key = None
+ for next_key in _get_split_key(scatter_keys, num_splits):
+ splits.append(_create_split(last_key, next_key, query))
+ last_key = next_key
+
+ splits.append(_create_split(last_key, None, query))
+ return splits
+
+
+def _validate_query(query):
+ """ Verifies that the given query can be properly scattered."""
+
+ if len(query.kind) != 1:
+ raise ValueError('Query must have exactly one kind.')
+
+ if len(query.order) != 0:
+ raise ValueError('Query cannot have any sort orders.')
+
+ if query.HasField('limit'):
+ raise ValueError('Query cannot have a limit set.')
+
+ if query.offset > 0:
+ raise ValueError('Query cannot have an offset set.')
+
+ _validate_filter(query.filter)
+
+
+def _validate_filter(filter):
+ """Validates that we only have allowable filters.
+
+ Note that equality and ancestor filters are allowed; however, they may
+ result in inefficient sharding.
+ """
+
+ if filter.HasField('composite_filter'):
+ for sub_filter in filter.composite_filter.filters:
+ _validate_filter(sub_filter)
+ elif filter.HasField('property_filter'):
+ if filter.property_filter.op in UNSUPPORTED_OPERATORS:
+ raise ValueError('Query cannot have any inequality filters.')
+ else:
+ pass
+
+
+def _create_scatter_query(query, num_splits):
+ """Creates a scatter query from the given user query."""
+
+ scatter_query = query_pb2.Query()
+ for kind in query.kind:
+ scatter_kind = scatter_query.kind.add()
+ scatter_kind.CopyFrom(kind)
+
+ # ascending order
+ googledatastore.helper.add_property_orders(scatter_query,
+ SCATTER_PROPERTY_NAME)
+
+ # There is a split containing entities before and after each scatter entity:
+ # ||---*------*------*------*------*------*------*---|| * = scatter entity
+ # If we represent each split as a region before a scatter entity, there is an
+ # extra region following the last scatter point. Thus, we do not need the
+ # scatter entity for the last region.
+ scatter_query.limit.value = (num_splits - 1) * KEYS_PER_SPLIT
+ googledatastore.helper.add_projection(scatter_query, KEY_PROPERTY_NAME)
+
+ return scatter_query
+
+
+def _get_scatter_keys(datastore, query, num_splits, partition):
+ """Gets a list of split keys given a desired number of splits.
+
+ This list will contain multiple split keys for each split. Only a single
+ split key will be chosen as the split point; however, providing multiple
+ keys allows for more uniform sharding.
+
+ Args:
+ datastore: the Datastore client.
+ query: the user query.
+ num_splits: the desired number of splits.
+ partition: the partition to run the query in.
+
+ Returns:
+ A list of scatter keys returned by Datastore.
+ """
+ scatter_point_query = _create_scatter_query(query, num_splits)
+
+ key_splits = []
+ while True:
+ req = datastore_pb2.RunQueryRequest()
+ if partition:
+ req.partition_id.CopyFrom(partition)
+
+ req.query.CopyFrom(scatter_point_query)
+
+ resp = datastore.run_query(req)
+ for entity_result in resp.batch.entity_results:
+ key_splits.append(entity_result.entity.key)
+
+ if resp.batch.more_results != query_pb2.QueryResultBatch.NOT_FINISHED:
+ break
+
+ scatter_point_query.start_cursor = resp.batch.end_cursor
+ scatter_point_query.limit.value -= len(resp.batch.entity_results)
+
+ key_splits.sort(helper.key_comparator)
+ return key_splits
+
+
+def _get_split_key(keys, num_splits):
+ """Given a list of keys and a number of splits find the keys to split on.
+
+ Args:
+ keys: the list of keys.
+ num_splits: the number of splits.
+
+ Returns:
+ A list of keys to split on.
+
+ """
+
+ # If the number of keys is less than the number of splits, we are limited
+ # in the number of splits we can make.
+ if not keys or (len(keys) < (num_splits - 1)):
+ return keys
+
+ # Calculate the number of keys per split. This should be KEYS_PER_SPLIT,
+ # but may be less if there are not KEYS_PER_SPLIT * (num_splits - 1) scatter
+ # entities.
+ #
+ # Consider the following dataset, where - represents an entity and
+ # * represents an entity that is returned as a scatter entity:
+ # ||---*-----*----*-----*-----*------*----*----||
+ # If we want 4 splits in this data, the optimal split would look like:
+ # ||---*-----*----*-----*-----*------*----*----||
+ # | | |
+ # The scatter keys in the last region are not useful to us, so we never
+ # request them:
+ # ||---*-----*----*-----*-----*------*---------||
+ # | | |
+ # With 6 scatter keys we want to set scatter points at indexes: 1, 3, 5.
+ #
+ # We keep this as a float so that any "fractional" keys per split get
+ # distributed throughout the splits and don't make the last split
+ # significantly larger than the rest.
+
+ num_keys_per_split = max(1.0, float(len(keys)) / (num_splits - 1))
+
+ split_keys = []
+
+ # Grab the last sample for each split, otherwise the first split will be too
+ # small.
+ for i in range(1, num_splits):
+ split_index = int(round(i * num_keys_per_split) - 1)
+ split_keys.append(keys[split_index])
+
+ return split_keys
+
+
+def _create_split(last_key, next_key, query):
+ """Create a new {@link Query} given the query and range..
+
+ Args:
+ last_key: the previous key. If null then assumed to be the beginning.
+ next_key: the next key. If null then assumed to be the end.
+ query: the desired query.
+
+ Returns:
+ A split query with fetches entities in the range [last_key, next_key)
+ """
+ if not (last_key or next_key):
+ return query
+
+ split_query = query_pb2.Query()
+ split_query.CopyFrom(query)
+ composite_filter = split_query.filter.composite_filter
+ composite_filter.op = CompositeFilter.AND
+
+ if query.HasField('filter'):
+ composite_filter.filters.add().CopyFrom(query.filter)
+
+ if last_key:
+ lower_bound = composite_filter.filters.add()
+ lower_bound.property_filter.property.name = KEY_PROPERTY_NAME
+ lower_bound.property_filter.op = PropertyFilter.GREATER_THAN_OR_EQUAL
+ lower_bound.property_filter.value.key_value.CopyFrom(last_key)
+
+ if next_key:
+ upper_bound = composite_filter.filters.add()
+ upper_bound.property_filter.property.name = KEY_PROPERTY_NAME
+ upper_bound.property_filter.op = PropertyFilter.LESS_THAN
+ upper_bound.property_filter.value.key_value.CopyFrom(next_key)
+
+ return split_query
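
[Editorial note: the entry point above is get_splits. A rough usage sketch follows; it assumes `datastore` is any v1 Datastore client object exposing run_query(RunQueryRequest) -> RunQueryResponse (the tests below stub one out with a MagicMock), and 'MyKind' is a placeholder kind name:

    from google.datastore.v1 import query_pb2

    from apache_beam.io.datastore.v1 import query_splitter

    def split_for_workers(datastore, num_workers):
      # Build a simple single-kind query and ask the splitter to shard it.
      query = query_pb2.Query()
      query.kind.add().name = 'MyKind'
      return query_splitter.get_splits(datastore, query, num_workers)

Each returned sub-query carries __key__ range filters over a disjoint key range, so the sub-queries can be issued independently, e.g. one per bundle in a Datastore read.]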
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c1126b70/sdks/python/apache_beam/io/datastore/v1/query_splitter_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/datastore/v1/query_splitter_test.py b/sdks/python/apache_beam/io/datastore/v1/query_splitter_test.py
new file mode 100644
index 0000000..979a69f
--- /dev/null
+++ b/sdks/python/apache_beam/io/datastore/v1/query_splitter_test.py
@@ -0,0 +1,257 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Cloud Datastore query splitter test."""
+
+import unittest
+import uuid
+
+from mock import MagicMock
+from mock import call
+
+from apache_beam.io.datastore.v1 import query_splitter
+
+from google.datastore.v1 import datastore_pb2
+from google.datastore.v1 import query_pb2
+from google.datastore.v1.query_pb2 import PropertyFilter
+
+
+class QuerySplitterTest(unittest.TestCase):
+
+ def test_get_splits_query_with_multiple_kinds(self):
+ query = query_pb2.Query()
+ query.kind.add()
+ query.kind.add()
+ self.assertRaises(ValueError, query_splitter.get_splits, None, query, 4)
+
+ def test_get_splits_query_with_order(self):
+ query = query_pb2.Query()
+ query.kind.add()
+ query.order.add()
+
+ self.assertRaises(ValueError, query_splitter.get_splits, None, query, 3)
+
+ def test_get_splits_query_with_unsupported_filter(self):
+ query = query_pb2.Query()
+ query.kind.add()
+ test_filter = query.filter.composite_filter.filters.add()
+ test_filter.property_filter.op = PropertyFilter.GREATER_THAN
+ self.assertRaises(ValueError, query_splitter.get_splits, None, query, 2)
+
+ def test_get_splits_query_with_limit(self):
+ query = query_pb2.Query()
+ query.kind.add()
+ query.limit.value = 10
+ self.assertRaises(ValueError, query_splitter.get_splits, None, query, 2)
+
+ def test_get_splits_query_with_offset(self):
+ query = query_pb2.Query()
+ query.kind.add()
+ query.offset = 10
+ self.assertRaises(ValueError, query_splitter.get_splits, None, query, 2)
+
+ def test_create_scatter_query(self):
+ query = query_pb2.Query()
+ kind = query.kind.add()
+ kind.name = 'shakespeare-demo'
+ num_splits = 10
+ scatter_query = query_splitter._create_scatter_query(query, num_splits)
+ self.assertEqual(scatter_query.kind[0], kind)
+ self.assertEqual(scatter_query.limit.value,
+ (num_splits - 1) * query_splitter.KEYS_PER_SPLIT)
+ self.assertEqual(scatter_query.order[0].direction,
+ query_pb2.PropertyOrder.ASCENDING)
+ self.assertEqual(scatter_query.projection[0].property.name,
+ query_splitter.KEY_PROPERTY_NAME)
+
+ def test_get_splits_with_two_splits(self):
+ query = query_pb2.Query()
+ kind = query.kind.add()
+ kind.name = 'shakespeare-demo'
+ num_splits = 2
+ num_entities = 97
+ batch_size = 9
+
+ self.check_get_splits(query, num_splits, num_entities, batch_size)
+
+ def test_get_splits_with_multiple_splits(self):
+ query = query_pb2.Query()
+ kind = query.kind.add()
+ kind.name = 'shakespeare-demo'
+ num_splits = 4
+ num_entities = 369
+ batch_size = 12
+
+ self.check_get_splits(query, num_splits, num_entities, batch_size)
+
+ def test_get_splits_with_large_num_splits(self):
+ query = query_pb2.Query()
+ kind = query.kind.add()
+ kind.name = 'shakespeare-demo'
+ num_splits = 10
+ num_entities = 4
+ batch_size = 10
+
+ self.check_get_splits(query, num_splits, num_entities, batch_size)
+
+ def test_get_splits_with_small_num_entities(self):
+ query = query_pb2.Query()
+ kind = query.kind.add()
+ kind.name = 'shakespeare-demo'
+ num_splits = 4
+ num_entities = 50
+ batch_size = 10
+
+ self.check_get_splits(query, num_splits, num_entities, batch_size)
+
+ def test_get_splits_with_batch_size_exact_multiple(self):
+ """Test get_splits when num scatter keys is a multiple of batch size."""
+ query = query_pb2.Query()
+ kind = query.kind.add()
+ kind.name = 'shakespeare-demo'
+ num_splits = 4
+ num_entities = 400
+ batch_size = 32
+
+ self.check_get_splits(query, num_splits, num_entities, batch_size)
+
+ def test_get_splits_with_large_batch_size(self):
+ """Test get_splits when all scatter keys are retured in a single req."""
+ query = query_pb2.Query()
+ kind = query.kind.add()
+ kind.name = 'shakespeare-demo'
+ num_splits = 4
+ num_entities = 400
+ batch_size = 500
+
+ self.check_get_splits(query, num_splits, num_entities, batch_size)
+
+ def check_get_splits(self, query, num_splits, num_entities, batch_size):
+ """A helper method to test the query_splitter get_splits method.
+
+ Args:
+ query: the query to be split
+ num_splits: number of splits
+ num_entities: number of scatter entities contained in the fake datastore.
+ batch_size: the number of entities returned by the fake datastore in one request.
+ """
+
+ entities = QuerySplitterTest.create_entities(num_entities)
+ mock_datastore = MagicMock()
+ # Assign a fake run_query method as a side_effect to the mock.
+ mock_datastore.run_query.side_effect = \
+ QuerySplitterTest.create_run_query(entities, batch_size)
+
+ split_queries = query_splitter.get_splits(mock_datastore, query, num_splits)
+
+ # If the requested num_splits is greater than num_entities, the best the
+ # splitter can do is one split per scatter entity, plus a final one.
+ expected_num_splits = min(num_splits, num_entities + 1)
+ self.assertEqual(len(split_queries), expected_num_splits)
+
+ expected_requests = QuerySplitterTest.create_scatter_requests(
+ query, num_splits, batch_size, num_entities)
+
+ expected_calls = []
+ for req in expected_requests:
+ expected_calls.append(call(req))
+
+ self.assertEqual(expected_calls, mock_datastore.run_query.call_args_list)
+
+ @staticmethod
+ def create_run_query(entities, batch_size):
+ """A fake datastore run_query method that returns entities in batches.
+
+ Note: the outer method is needed to make the `entities` and `batch_size`
+ available in the scope of the fake_run_query method.
+
+ Args:
+ entities: list of entities supposed to be contained in the datastore.
+ batch_size: the number of entities that run_query method returns in one
+ request.
+ """
+ def fake_run_query(req):
+ start = int(req.query.start_cursor) if req.query.start_cursor else 0
+ # If the query limit is less than batch_size, only return that many.
+ count = min(batch_size, req.query.limit.value)
+ # cannot go more than the number of entities contained in datastore.
+ end = min(len(entities), start + count)
+ finish = False
+ # Finish reading when there are no more entities to return,
+ # or the query limit in the request has been satisfied.
+ if end == len(entities) or count == req.query.limit.value:
+ finish = True
+ return QuerySplitterTest.create_scatter_response(entities[start:end],
+ str(end), finish)
+ return fake_run_query
+
+ @staticmethod
+ def create_scatter_requests(query, num_splits, batch_size, num_entities):
+ """Creates a list of expected scatter requests from the query splitter.
+
+ This list of requests returned is used to verify that the query splitter
+ made the same number of requests in the same order to datastore.
+ """
+
+ requests = []
+ count = (num_splits - 1) * query_splitter.KEYS_PER_SPLIT
+ start_cursor = ''
+ i = 0
+ scatter_query = query_splitter._create_scatter_query(query, count)
+ while i < count and i < num_entities:
+ request = datastore_pb2.RunQueryRequest()
+ request.query.CopyFrom(scatter_query)
+ request.query.start_cursor = start_cursor
+ request.query.limit.value = count - i
+ requests.append(request)
+ i += batch_size
+ start_cursor = str(i)
+
+ return requests
+
+ @staticmethod
+ def create_scatter_response(entities, end_cursor, finish):
+ """Creates a query response for a given batch of scatter entities."""
+
+ resp = datastore_pb2.RunQueryResponse()
+ if finish:
+ resp.batch.more_results = query_pb2.QueryResultBatch.NO_MORE_RESULTS
+ else:
+ resp.batch.more_results = query_pb2.QueryResultBatch.NOT_FINISHED
+
+ resp.batch.end_cursor = end_cursor
+ for entity_result in entities:
+ resp.batch.entity_results.add().CopyFrom(entity_result)
+
+ return resp
+
+ @staticmethod
+ def create_entities(count):
+ """Creates a list of entities with random keys."""
+
+ entities = []
+
+ for _ in range(0, count):
+ entity_result = query_pb2.EntityResult()
+ entity_result.entity.key.path.add().name = str(uuid.uuid4())
+ entities.append(entity_result)
+
+ return entities
+
+
+if __name__ == '__main__':
+ unittest.main()
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c1126b70/sdks/python/setup.py
----------------------------------------------------------------------
diff --git a/sdks/python/setup.py b/sdks/python/setup.py
index c7b940d..af59069 100644
--- a/sdks/python/setup.py
+++ b/sdks/python/setup.py
@@ -87,6 +87,7 @@ REQUIRED_PACKAGES = [
'avro>=1.7.7,<2.0.0',
'dill>=0.2.5,<0.3',
'google-apitools>=0.5.2,<1.0.0',
+ 'googledatastore==6.3.0',
'httplib2>=0.8,<0.10',
'mock>=1.0.1,<3.0.0',
'oauth2client>=2.0.1,<4.0.0',
[2/2] incubator-beam git commit: Closes #1310
Posted by ro...@apache.org.
Closes #1310
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/21b7844b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/21b7844b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/21b7844b
Branch: refs/heads/python-sdk
Commit: 21b7844bb05e9a86531876cffe8ee776bfaaa1cc
Parents: d1fccbf c1126b7
Author: Robert Bradshaw <ro...@gmail.com>
Authored: Tue Nov 15 14:24:03 2016 -0800
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Tue Nov 15 14:24:03 2016 -0800
----------------------------------------------------------------------
.../python/apache_beam/io/datastore/__init__.py | 16 ++
.../apache_beam/io/datastore/v1/__init__.py | 16 ++
.../apache_beam/io/datastore/v1/helper.py | 84 ++++++
.../apache_beam/io/datastore/v1/helper_test.py | 124 +++++++++
.../io/datastore/v1/query_splitter.py | 270 +++++++++++++++++++
.../io/datastore/v1/query_splitter_test.py | 257 ++++++++++++++++++
sdks/python/setup.py | 1 +
7 files changed, 768 insertions(+)
----------------------------------------------------------------------