You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2016/11/15 22:24:31 UTC

[1/2] incubator-beam git commit: Query Splitter for Datastore v1

Repository: incubator-beam
Updated Branches:
  refs/heads/python-sdk d1fccbf5e -> 21b7844bb


Query Splitter for Datastore v1


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/c1126b70
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/c1126b70
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/c1126b70

Branch: refs/heads/python-sdk
Commit: c1126b708469fc63bd8ab8e54026700408ec34da
Parents: d1fccbf
Author: Vikas Kedigehalli <vi...@google.com>
Authored: Mon Oct 24 18:29:29 2016 -0700
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Tue Nov 15 14:24:02 2016 -0800

----------------------------------------------------------------------
 .../python/apache_beam/io/datastore/__init__.py |  16 ++
 .../apache_beam/io/datastore/v1/__init__.py     |  16 ++
 .../apache_beam/io/datastore/v1/helper.py       |  84 ++++++
 .../apache_beam/io/datastore/v1/helper_test.py  | 124 +++++++++
 .../io/datastore/v1/query_splitter.py           | 270 +++++++++++++++++++
 .../io/datastore/v1/query_splitter_test.py      | 257 ++++++++++++++++++
 sdks/python/setup.py                            |   1 +
 7 files changed, 768 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c1126b70/sdks/python/apache_beam/io/datastore/__init__.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/datastore/__init__.py b/sdks/python/apache_beam/io/datastore/__init__.py
new file mode 100644
index 0000000..cce3aca
--- /dev/null
+++ b/sdks/python/apache_beam/io/datastore/__init__.py
@@ -0,0 +1,16 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c1126b70/sdks/python/apache_beam/io/datastore/v1/__init__.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/datastore/v1/__init__.py b/sdks/python/apache_beam/io/datastore/v1/__init__.py
new file mode 100644
index 0000000..cce3aca
--- /dev/null
+++ b/sdks/python/apache_beam/io/datastore/v1/__init__.py
@@ -0,0 +1,16 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c1126b70/sdks/python/apache_beam/io/datastore/v1/helper.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/datastore/v1/helper.py b/sdks/python/apache_beam/io/datastore/v1/helper.py
new file mode 100644
index 0000000..626ab35
--- /dev/null
+++ b/sdks/python/apache_beam/io/datastore/v1/helper.py
@@ -0,0 +1,84 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Cloud Datastore helper functions."""
+
+
+def key_comparator(k1, k2):
+  """A comparator for Datastore keys.
+
+  Comparison is only valid for keys in the same partition. The comparison here
+  is between the list of paths for each key.
+  """
+
+  if k1.partition_id != k2.partition_id:
+    raise ValueError('Cannot compare keys with different partition ids.')
+
+  k2_iter = iter(k2.path)
+
+  for k1_path in k1.path:
+    k2_path = next(k2_iter, None)
+    if not k2_path:
+      return 1
+
+    result = compare_path(k1_path, k2_path)
+
+    if result != 0:
+      return result
+
+  k2_path = next(k2_iter, None)
+  if k2_path:
+    return -1
+  else:
+    return 0
+
+
+def compare_path(p1, p2):
+  """A comparator for key path.
+
+  A path has either an `id` or a `name` field defined. The
+  comparison works with the following rules:
+
+  1. If one path has `id` defined while the other doesn't, then the
+  one with `id` defined is considered smaller.
+  2. If both paths have `id` defined, then their ids are compared.
+  3. If no `id` is defined for both paths, then their `names` are compared.
+  """
+
+  result = str_compare(p1.kind, p2.kind)
+  if result != 0:
+    return result
+
+  if p1.HasField('id'):
+    if not p2.HasField('id'):
+      return -1
+
+    return p1.id - p2.id
+
+  if p2.HasField('id'):
+    return 1
+
+  return str_compare(p1.name, p2.name)
+
+
+def str_compare(s1, s2):
+  if s1 == s2:
+    return 0
+  elif s1 < s2:
+    return -1
+  else:
+    return 1

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c1126b70/sdks/python/apache_beam/io/datastore/v1/helper_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/datastore/v1/helper_test.py b/sdks/python/apache_beam/io/datastore/v1/helper_test.py
new file mode 100644
index 0000000..50f8e4c
--- /dev/null
+++ b/sdks/python/apache_beam/io/datastore/v1/helper_test.py
@@ -0,0 +1,124 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Tests for datastore helper."""
+
+import unittest
+from apache_beam.io.datastore.v1 import helper
+from google.datastore.v1.entity_pb2 import Key
+
+
+class HelperTest(unittest.TestCase):
+
+  def test_compare_path_with_different_kind(self):
+    p1 = Key.PathElement()
+    p1.kind = 'dummy1'
+
+    p2 = Key.PathElement()
+    p2.kind = 'dummy2'
+
+    self.assertLess(helper.compare_path(p1, p2), 0)
+
+  def test_compare_path_with_different_id(self):
+    p1 = Key.PathElement()
+    p1.kind = 'dummy'
+    p1.id = 10
+
+    p2 = Key.PathElement()
+    p2.kind = 'dummy'
+    p2.id = 15
+
+    self.assertLess(helper.compare_path(p1, p2), 0)
+
+  def test_compare_path_with_different_name(self):
+    p1 = Key.PathElement()
+    p1.kind = 'dummy'
+    p1.name = "dummy1"
+
+    p2 = Key.PathElement()
+    p2.kind = 'dummy'
+    p2.name = 'dummy2'
+
+    self.assertLess(helper.compare_path(p1, p2), 0)
+
+  def test_compare_path_of_different_type(self):
+    p1 = Key.PathElement()
+    p1.kind = 'dummy'
+    p1.id = 10
+
+    p2 = Key.PathElement()
+    p2.kind = 'dummy'
+    p2.name = 'dummy'
+
+    self.assertLess(helper.compare_path(p1, p2), 0)
+
+  def test_key_comparator_with_different_partition(self):
+    k1 = Key()
+    k1.partition_id.namespace_id = 'dummy1'
+    k2 = Key()
+    k2.partition_id.namespace_id = 'dummy2'
+    self.assertRaises(ValueError, helper.key_comparator, k1, k2)
+
+  def test_key_comparator_with_single_path(self):
+    k1 = Key()
+    k2 = Key()
+    p1 = k1.path.add()
+    p2 = k2.path.add()
+    p1.kind = p2.kind = 'dummy'
+    self.assertEqual(helper.key_comparator(k1, k2), 0)
+
+  def test_key_comparator_with_multiple_paths_1(self):
+    k1 = Key()
+    k2 = Key()
+    p11 = k1.path.add()
+    p12 = k1.path.add()
+    p21 = k2.path.add()
+    p11.kind = p12.kind = p21.kind = 'dummy'
+    self.assertGreater(helper.key_comparator(k1, k2), 0)
+
+  def test_key_comparator_with_multiple_paths_2(self):
+    k1 = Key()
+    k2 = Key()
+    p11 = k1.path.add()
+    p21 = k2.path.add()
+    p22 = k2.path.add()
+    p11.kind = p21.kind = p22.kind = 'dummy'
+    self.assertLess(helper.key_comparator(k1, k2), 0)
+
+  def test_key_comparator_with_multiple_paths_3(self):
+    k1 = Key()
+    k2 = Key()
+    p11 = k1.path.add()
+    p12 = k1.path.add()
+    p21 = k2.path.add()
+    p22 = k2.path.add()
+    p11.kind = p12.kind = p21.kind = p22.kind = 'dummy'
+    self.assertEqual(helper.key_comparator(k1, k2), 0)
+
+  def test_key_comparator_with_multiple_paths_4(self):
+    k1 = Key()
+    k2 = Key()
+    p11 = k1.path.add()
+    p12 = k2.path.add()
+    p21 = k2.path.add()
+    p11.kind = p12.kind = 'dummy'
+    # make path2 greater than path1
+    p21.kind = 'dummy1'
+    self.assertLess(helper.key_comparator(k1, k2), 0)
+
+if __name__ == '__main__':
+  unittest.main()

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c1126b70/sdks/python/apache_beam/io/datastore/v1/query_splitter.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/datastore/v1/query_splitter.py b/sdks/python/apache_beam/io/datastore/v1/query_splitter.py
new file mode 100644
index 0000000..82aa972
--- /dev/null
+++ b/sdks/python/apache_beam/io/datastore/v1/query_splitter.py
@@ -0,0 +1,270 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Implements a Cloud Datastore query splitter."""
+
+from apache_beam.io.datastore.v1 import helper
+from google.datastore.v1 import datastore_pb2
+from google.datastore.v1 import query_pb2
+from google.datastore.v1.query_pb2 import PropertyFilter
+from google.datastore.v1.query_pb2 import CompositeFilter
+import googledatastore
+
+
+__all__ = [
+    'get_splits',
+]
+
+SCATTER_PROPERTY_NAME = '__scatter__'
+KEY_PROPERTY_NAME = '__key__'
+# The number of keys to sample for each split.
+KEYS_PER_SPLIT = 32
+
+UNSUPPORTED_OPERATORS = [PropertyFilter.LESS_THAN,
+                         PropertyFilter.LESS_THAN_OR_EQUAL,
+                         PropertyFilter.GREATER_THAN,
+                         PropertyFilter.GREATER_THAN_OR_EQUAL]
+
+
+def get_splits(datastore, query, num_splits, partition=None):
+  """Returns a list of sharded queries for the given Cloud Datastore query.
+
+  This will create up to the desired number of splits, however it may return
+  less splits if the desired number of splits is unavailable. This will happen
+  if the number of split points provided by the underlying Datastore is less
+  than the desired number, which will occur if the number of results for the
+  query is too small.
+
+  This implementation of the QuerySplitter uses the __scatter__ property to
+  gather random split points for a query.
+
+  Note: This implementation is derived from the java query splitter in
+  https://github.com/GoogleCloudPlatform/google-cloud-datastore/blob/master/java/datastore/src/main/java/com/google/datastore/v1/client/QuerySplitterImpl.java
+
+  Args:
+    datastore: the datastore client.
+    query: the query to split.
+    num_splits: the desired number of splits.
+    partition: the partition the query is running in.
+
+  Returns:
+    A list of split queries, of a max length of `num_splits`
+  """
+
+  # Validate that the number of splits is not out of bounds.
+  if num_splits < 1:
+    raise ValueError('The number of splits must be greater than 0.')
+
+  if num_splits == 1:
+    return [query]
+
+  _validate_query(query)
+
+  splits = []
+  scatter_keys = _get_scatter_keys(datastore, query, num_splits, partition)
+  last_key = None
+  for next_key in _get_split_key(scatter_keys, num_splits):
+    splits.append(_create_split(last_key, next_key, query))
+    last_key = next_key
+
+  splits.append(_create_split(last_key, None, query))
+  return splits
+
+
+def _validate_query(query):
+  """Verifies that the given query can be properly scattered."""
+
+  if len(query.kind) != 1:
+    raise ValueError('Query must have exactly one kind.')
+
+  if len(query.order) != 0:
+    raise ValueError('Query cannot have any sort orders.')
+
+  if query.HasField('limit'):
+    raise ValueError('Query cannot have a limit set.')
+
+  if query.offset > 0:
+    raise ValueError('Query cannot have an offset set.')
+
+  _validate_filter(query.filter)
+
+
+def _validate_filter(filter):
+  """Validates that we only have allowable filters.
+
+  Note that equality and ancestor filters are allowed, however they may result
+  in inefficient sharding.
+  """
+
+  if filter.HasField('composite_filter'):
+    for sub_filter in filter.composite_filter.filters:
+      _validate_filter(sub_filter)
+  elif filter.HasField('property_filter'):
+    if filter.property_filter.op in UNSUPPORTED_OPERATORS:
+      raise ValueError('Query cannot have any inequality filters.')
+  else:
+    pass
+
+
+def _create_scatter_query(query, num_splits):
+  """Creates a scatter query from the given user query."""
+
+  scatter_query = query_pb2.Query()
+  for kind in query.kind:
+    scatter_kind = scatter_query.kind.add()
+    scatter_kind.CopyFrom(kind)
+
+  # ascending order
+  googledatastore.helper.add_property_orders(scatter_query,
+                                             SCATTER_PROPERTY_NAME)
+
+  # There is a split containing entities before and after each scatter entity:
+  # ||---*------*------*------*------*------*------*---||  * = scatter entity
+  # If we represent each split as a region before a scatter entity, there is an
+  # extra region following the last scatter point. Thus, we do not need the
+  # scatter entity for the last region.
+  scatter_query.limit.value = (num_splits - 1) * KEYS_PER_SPLIT
+  googledatastore.helper.add_projection(scatter_query, KEY_PROPERTY_NAME)
+
+  return scatter_query
+
+
+def _get_scatter_keys(datastore, query, num_splits, partition):
+  """Gets a list of split keys given a desired number of splits.
+
+  This list will contain multiple split keys for each split. Only a single split
+  key will be chosen as the split point, however providing multiple keys allows
+  for more uniform sharding.
+
+  Args:
+    num_splits: the number of desired splits.
+    query: the user query.
+    partition: the partition to run the query in.
+    datastore: the client to datastore containing the data.
+
+  Returns:
+    A list of scatter keys returned by Datastore.
+  """
+  scatter_point_query = _create_scatter_query(query, num_splits)
+
+  key_splits = []
+  while True:
+    req = datastore_pb2.RunQueryRequest()
+    if partition:
+      req.partition_id.CopyFrom(partition)
+
+    req.query.CopyFrom(scatter_point_query)
+
+    resp = datastore.run_query(req)
+    for entity_result in resp.batch.entity_results:
+      key_splits.append(entity_result.entity.key)
+
+    if resp.batch.more_results != query_pb2.QueryResultBatch.NOT_FINISHED:
+      break
+
+    scatter_point_query.start_cursor = resp.batch.end_cursor
+    scatter_point_query.limit.value -= len(resp.batch.entity_results)
+
+  key_splits.sort(helper.key_comparator)
+  return key_splits
+
+
+def _get_split_key(keys, num_splits):
+  """Given a list of keys and a number of splits find the keys to split on.
+
+  Args:
+    keys: the list of keys.
+    num_splits: the number of splits.
+
+  Returns:
+    A list of keys to split on.
+
+  """
+
+  # If the number of keys is less than the number of splits, we are limited
+  # in the number of splits we can make.
+  if not keys or (len(keys) < (num_splits - 1)):
+    return keys
+
+  # Calculate the number of keys per split. This should be KEYS_PER_SPLIT,
+  # but may be less if there are not KEYS_PER_SPLIT * (num_splits - 1) scatter
+  # entities.
+  #
+  # Consider the following dataset, where - represents an entity and
+  # * represents an entity that is returned as a scatter entity:
+  # ||---*-----*----*-----*-----*------*----*----||
+  # If we want 4 splits in this data, the optimal split would look like:
+  # ||---*-----*----*-----*-----*------*----*----||
+  #            |          |            |
+  # The scatter keys in the last region are not useful to us, so we never
+  # request them:
+  # ||---*-----*----*-----*-----*------*---------||
+  #            |          |            |
+  # With 6 scatter keys we want to set scatter points at indexes: 1, 3, 5.
+  #
+  # We keep this as a float so that any "fractional" keys per split get
+  # distributed throughout the splits and don't make the last split
+  # significantly larger than the rest.
+
+  num_keys_per_split = max(1.0, float(len(keys)) / (num_splits - 1))
+
+  split_keys = []
+
+  # Grab the last sample for each split, otherwise the first split will be too
+  # small.
+  for i in range(1, num_splits):
+    split_index = int(round(i * num_keys_per_split) - 1)
+    split_keys.append(keys[split_index])
+
+  return split_keys
+
+
+def _create_split(last_key, next_key, query):
+  """Create a new Query given the query and range.
+
+  Args:
+    last_key: the previous key. If None then assumed to be the beginning.
+    next_key: the next key. If None then assumed to be the end.
+    query: the desired query.
+
+  Returns:
+    A split query that fetches entities in the range [last_key, next_key)
+  """
+  if not (last_key or next_key):
+    return query
+
+  split_query = query_pb2.Query()
+  split_query.CopyFrom(query)
+  composite_filter = split_query.filter.composite_filter
+  composite_filter.op = CompositeFilter.AND
+
+  if query.HasField('filter'):
+    composite_filter.filters.add().CopyFrom(query.filter)
+
+  if last_key:
+    lower_bound = composite_filter.filters.add()
+    lower_bound.property_filter.property.name = KEY_PROPERTY_NAME
+    lower_bound.property_filter.op = PropertyFilter.GREATER_THAN_OR_EQUAL
+    lower_bound.property_filter.value.key_value.CopyFrom(last_key)
+
+  if next_key:
+    upper_bound = composite_filter.filters.add()
+    upper_bound.property_filter.property.name = KEY_PROPERTY_NAME
+    upper_bound.property_filter.op = PropertyFilter.LESS_THAN
+    upper_bound.property_filter.value.key_value.CopyFrom(next_key)
+
+  return split_query

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c1126b70/sdks/python/apache_beam/io/datastore/v1/query_splitter_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/datastore/v1/query_splitter_test.py b/sdks/python/apache_beam/io/datastore/v1/query_splitter_test.py
new file mode 100644
index 0000000..979a69f
--- /dev/null
+++ b/sdks/python/apache_beam/io/datastore/v1/query_splitter_test.py
@@ -0,0 +1,257 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Cloud Datastore query splitter test."""
+
+import unittest
+import uuid
+
+from mock import MagicMock
+from mock import call
+
+from apache_beam.io.datastore.v1 import query_splitter
+
+from google.datastore.v1 import datastore_pb2
+from google.datastore.v1 import query_pb2
+from google.datastore.v1.query_pb2 import PropertyFilter
+
+
+class QuerySplitterTest(unittest.TestCase):
+
+  def test_get_splits_query_with_multiple_kinds(self):
+    query = query_pb2.Query()
+    query.kind.add()
+    query.kind.add()
+    self.assertRaises(ValueError, query_splitter.get_splits, None, query, 4)
+
+  def test_get_splits_query_with_order(self):
+    query = query_pb2.Query()
+    query.kind.add()
+    query.order.add()
+
+    self.assertRaises(ValueError, query_splitter.get_splits, None, query, 3)
+
+  def test_get_splits_query_with_unsupported_filter(self):
+    query = query_pb2.Query()
+    query.kind.add()
+    test_filter = query.filter.composite_filter.filters.add()
+    test_filter.property_filter.op = PropertyFilter.GREATER_THAN
+    self.assertRaises(ValueError, query_splitter.get_splits, None, query, 2)
+
+  def test_get_splits_query_with_limit(self):
+    query = query_pb2.Query()
+    query.kind.add()
+    query.limit.value = 10
+    self.assertRaises(ValueError, query_splitter.get_splits, None, query, 2)
+
+  def test_get_splits_query_with_offset(self):
+    query = query_pb2.Query()
+    query.kind.add()
+    query.offset = 10
+    self.assertRaises(ValueError, query_splitter.get_splits, None, query, 2)
+
+  def test_create_scatter_query(self):
+    query = query_pb2.Query()
+    kind = query.kind.add()
+    kind.name = 'shakespeare-demo'
+    num_splits = 10
+    scatter_query = query_splitter._create_scatter_query(query, num_splits)
+    self.assertEqual(scatter_query.kind[0], kind)
+    self.assertEqual(scatter_query.limit.value,
+                     (num_splits -1) * query_splitter.KEYS_PER_SPLIT)
+    self.assertEqual(scatter_query.order[0].direction,
+                     query_pb2.PropertyOrder.ASCENDING)
+    self.assertEqual(scatter_query.projection[0].property.name,
+                     query_splitter.KEY_PROPERTY_NAME)
+
+  def test_get_splits_with_two_splits(self):
+    query = query_pb2.Query()
+    kind = query.kind.add()
+    kind.name = 'shakespeare-demo'
+    num_splits = 2
+    num_entities = 97
+    batch_size = 9
+
+    self.check_get_splits(query, num_splits, num_entities, batch_size)
+
+  def test_get_splits_with_multiple_splits(self):
+    query = query_pb2.Query()
+    kind = query.kind.add()
+    kind.name = 'shakespeare-demo'
+    num_splits = 4
+    num_entities = 369
+    batch_size = 12
+
+    self.check_get_splits(query, num_splits, num_entities, batch_size)
+
+  def test_get_splits_with_large_num_splits(self):
+    query = query_pb2.Query()
+    kind = query.kind.add()
+    kind.name = 'shakespeare-demo'
+    num_splits = 10
+    num_entities = 4
+    batch_size = 10
+
+    self.check_get_splits(query, num_splits, num_entities, batch_size)
+
+  def test_get_splits_with_small_num_entities(self):
+    query = query_pb2.Query()
+    kind = query.kind.add()
+    kind.name = 'shakespeare-demo'
+    num_splits = 4
+    num_entities = 50
+    batch_size = 10
+
+    self.check_get_splits(query, num_splits, num_entities, batch_size)
+
+  def test_get_splits_with_batch_size_exact_multiple(self):
+    """Test get_splits when num scatter keys is a multiple of batch size."""
+    query = query_pb2.Query()
+    kind = query.kind.add()
+    kind.name = 'shakespeare-demo'
+    num_splits = 4
+    num_entities = 400
+    batch_size = 32
+
+    self.check_get_splits(query, num_splits, num_entities, batch_size)
+
+  def test_get_splits_with_large_batch_size(self):
+    """Test get_splits when all scatter keys are returned in a single req."""
+    query = query_pb2.Query()
+    kind = query.kind.add()
+    kind.name = 'shakespeare-demo'
+    num_splits = 4
+    num_entities = 400
+    batch_size = 500
+
+    self.check_get_splits(query, num_splits, num_entities, batch_size)
+
+  def check_get_splits(self, query, num_splits, num_entities, batch_size):
+    """A helper method to test the query_splitter get_splits method.
+
+    Args:
+      query: the query to be split
+      num_splits: number of splits
+      num_entities: number of scatter entities contained in the fake datastore.
+      batch_size: the number of entities returned by fake datastore in one req.
+    """
+
+    entities = QuerySplitterTest.create_entities(num_entities)
+    mock_datastore = MagicMock()
+    # Assign a fake run_query method as a side_effect to the mock.
+    mock_datastore.run_query.side_effect = \
+      QuerySplitterTest.create_run_query(entities, batch_size)
+
+    split_queries = query_splitter.get_splits(mock_datastore, query, num_splits)
+
+    # if request num_splits is greater than num_entities, the best it can
+    # do is one entity per split.
+    expected_num_splits = min(num_splits, num_entities + 1)
+    self.assertEqual(len(split_queries), expected_num_splits)
+
+    expected_requests = QuerySplitterTest.create_scatter_requests(
+        query, num_splits, batch_size, num_entities)
+
+    expected_calls = []
+    for req in expected_requests:
+      expected_calls.append(call(req))
+
+    self.assertEqual(expected_calls, mock_datastore.run_query.call_args_list)
+
+  @staticmethod
+  def create_run_query(entities, batch_size):
+    """A fake datastore run_query method that returns entities in batches.
+
+    Note: the outer method is needed to make the `entities` and `batch_size`
+    available in the scope of fake_run_query method.
+
+    Args:
+      entities: list of entities supposed to be contained in the datastore.
+      batch_size: the number of entities that run_query method returns in one
+                  request.
+    """
+    def fake_run_query(req):
+      start = int(req.query.start_cursor) if req.query.start_cursor else 0
+      # if query limit is less than batch_size, then only return that much.
+      count = min(batch_size, req.query.limit.value)
+      # cannot go more than the number of entities contained in datastore.
+      end = min(len(entities), start + count)
+      finish = False
+      # Finish reading when there are no more entities to return,
+      # or request query limit has been satisfied.
+      if end == len(entities) or count == req.query.limit.value:
+        finish = True
+      return QuerySplitterTest.create_scatter_response(entities[start:end],
+                                                       str(end), finish)
+    return fake_run_query
+
+  @staticmethod
+  def create_scatter_requests(query, num_splits, batch_size, num_entities):
+    """Creates a list of expected scatter requests from the query splitter.
+
+    This list of requests returned is used to verify that the query splitter
+    made the same number of requests in the same order to datastore.
+    """
+
+    requests = []
+    count = (num_splits - 1) * query_splitter.KEYS_PER_SPLIT
+    start_cursor = ''
+    i = 0
+    scatter_query = query_splitter._create_scatter_query(query, count)
+    while i < count and i < num_entities:
+      request = datastore_pb2.RunQueryRequest()
+      request.query.CopyFrom(scatter_query)
+      request.query.start_cursor = start_cursor
+      request.query.limit.value = count - i
+      requests.append(request)
+      i += batch_size
+      start_cursor = str(i)
+
+    return requests
+
+  @staticmethod
+  def create_scatter_response(entities, end_cursor, finish):
+    """Creates a query response for a given batch of scatter entities."""
+
+    resp = datastore_pb2.RunQueryResponse()
+    if finish:
+      resp.batch.more_results = query_pb2.QueryResultBatch.NO_MORE_RESULTS
+    else:
+      resp.batch.more_results = query_pb2.QueryResultBatch.NOT_FINISHED
+
+    resp.batch.end_cursor = end_cursor
+    for entity_result in entities:
+      resp.batch.entity_results.add().CopyFrom(entity_result)
+
+    return resp
+
+  @staticmethod
+  def create_entities(count):
+    """Creates a list of entities with random keys."""
+
+    entities = []
+
+    for _ in range(0, count):
+      entity_result = query_pb2.EntityResult()
+      entity_result.entity.key.path.add().name = str(uuid.uuid4())
+      entities.append(entity_result)
+
+    return entities
+
+
+if __name__ == '__main__':
+  unittest.main()

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c1126b70/sdks/python/setup.py
----------------------------------------------------------------------
diff --git a/sdks/python/setup.py b/sdks/python/setup.py
index c7b940d..af59069 100644
--- a/sdks/python/setup.py
+++ b/sdks/python/setup.py
@@ -87,6 +87,7 @@ REQUIRED_PACKAGES = [
     'avro>=1.7.7,<2.0.0',
     'dill>=0.2.5,<0.3',
     'google-apitools>=0.5.2,<1.0.0',
+    'googledatastore==6.3.0',
     'httplib2>=0.8,<0.10',
     'mock>=1.0.1,<3.0.0',
     'oauth2client>=2.0.1,<4.0.0',


[2/2] incubator-beam git commit: Closes #1310

Posted by ro...@apache.org.
Closes #1310


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/21b7844b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/21b7844b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/21b7844b

Branch: refs/heads/python-sdk
Commit: 21b7844bb05e9a86531876cffe8ee776bfaaa1cc
Parents: d1fccbf c1126b7
Author: Robert Bradshaw <ro...@gmail.com>
Authored: Tue Nov 15 14:24:03 2016 -0800
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Tue Nov 15 14:24:03 2016 -0800

----------------------------------------------------------------------
 .../python/apache_beam/io/datastore/__init__.py |  16 ++
 .../apache_beam/io/datastore/v1/__init__.py     |  16 ++
 .../apache_beam/io/datastore/v1/helper.py       |  84 ++++++
 .../apache_beam/io/datastore/v1/helper_test.py  | 124 +++++++++
 .../io/datastore/v1/query_splitter.py           | 270 +++++++++++++++++++
 .../io/datastore/v1/query_splitter_test.py      | 257 ++++++++++++++++++
 sdks/python/setup.py                            |   1 +
 7 files changed, 768 insertions(+)
----------------------------------------------------------------------