Posted to commits@beam.apache.org by dh...@apache.org on 2016/11/28 23:54:43 UTC
[2/2] incubator-beam git commit: datastoreio write/delete ptransform
datastoreio write/delete ptransform
update datastore_wordcount example to include writes
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/d46203b7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/d46203b7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/d46203b7
Branch: refs/heads/python-sdk
Commit: d46203b7fcdc9895c9cee1d82710f48aba31a748
Parents: 3dbeb8e
Author: Vikas Kedigehalli <vi...@google.com>
Authored: Wed Nov 23 14:09:09 2016 -0800
Committer: Dan Halperin <dh...@google.com>
Committed: Mon Nov 28 15:54:27 2016 -0800
----------------------------------------------------------------------
.../apache_beam/examples/datastore_wordcount.py | 137 +++++++++++++++----
.../apache_beam/io/datastore/v1/datastoreio.py | 104 +++++++++++++-
.../io/datastore/v1/datastoreio_test.py | 46 +++++++
.../io/datastore/v1/fake_datastore.py | 17 +++
.../apache_beam/io/datastore/v1/helper.py | 35 ++++-
.../apache_beam/io/datastore/v1/helper_test.py | 36 +++++
.../io/datastore/v1/query_splitter.py | 7 +-
7 files changed, 349 insertions(+), 33 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d46203b7/sdks/python/apache_beam/examples/datastore_wordcount.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/datastore_wordcount.py b/sdks/python/apache_beam/examples/datastore_wordcount.py
index af75b1c..6b9779b 100644
--- a/sdks/python/apache_beam/examples/datastore_wordcount.py
+++ b/sdks/python/apache_beam/examples/datastore_wordcount.py
@@ -22,14 +22,18 @@ from __future__ import absolute_import
import argparse
import logging
import re
+import uuid
+
+from google.datastore.v1 import entity_pb2
+from google.datastore.v1 import query_pb2
+from googledatastore import helper as datastore_helper, PropertyFilter
import apache_beam as beam
from apache_beam.io.datastore.v1.datastoreio import ReadFromDatastore
+from apache_beam.io.datastore.v1.datastoreio import WriteToDatastore
from apache_beam.utils.options import GoogleCloudOptions
from apache_beam.utils.options import PipelineOptions
from apache_beam.utils.options import SetupOptions
-from google.datastore.v1 import query_pb2
-
empty_line_aggregator = beam.Aggregator('emptyLines')
average_word_size_aggregator = beam.Aggregator('averageWordLength',
@@ -41,7 +45,7 @@ class WordExtractingDoFn(beam.DoFn):
"""Parse each line of input text into words."""
def process(self, context):
- """Returns an iterator over the words of this element.
+ """Returns an iterator over words in contents of Cloud Datastore entity.
The element is a line of text. If the line is blank, note that, too.
Args:
context: the call-specific context: data and aggregator.
@@ -61,10 +65,100 @@ class WordExtractingDoFn(beam.DoFn):
return words
+class EntityWrapper(object):
+ """Create a Cloud Datastore entity from the given string."""
+ def __init__(self, namespace, kind, ancestor):
+ self._namespace = namespace
+ self._kind = kind
+ self._ancestor = ancestor
+
+ def make_entity(self, content):
+ entity = entity_pb2.Entity()
+ if self._namespace is not None:
+ entity.key.partition_id.namespace_id = self._namespace
+
+ # All entities created will have the same ancestor
+ datastore_helper.add_key_path(entity.key, self._kind, self._ancestor,
+ self._kind, str(uuid.uuid4()))
+
+ datastore_helper.add_properties(entity, {"content": unicode(content)})
+ return entity
+
+
+def write_to_datastore(project, user_options, pipeline_options):
+ """Creates a pipeline that writes entities to Cloud Datastore."""
+ p = beam.Pipeline(options=pipeline_options)
+
+ # pylint: disable=expression-not-assigned
+ (p
+ | 'read' >> beam.io.Read(beam.io.TextFileSource(user_options.input))
+ | 'create entity' >> beam.Map(
+ EntityWrapper(user_options.namespace, user_options.kind,
+ user_options.ancestor).make_entity)
+ | 'write to datastore' >> WriteToDatastore(project))
+
+ # Actually run the pipeline (all operations above are deferred).
+ p.run()
+
+
+def make_ancestor_query(kind, namespace, ancestor):
+ """Creates a Cloud Datastore ancestor query.
+
+ The returned query will fetch all the entities whose parent key has the
+ given `ancestor` as its name.
+ """
+ ancestor_key = entity_pb2.Key()
+ datastore_helper.add_key_path(ancestor_key, kind, ancestor)
+ if namespace is not None:
+ ancestor_key.partition_id.namespace_id = namespace
+
+ query = query_pb2.Query()
+ query.kind.add().name = kind
+
+ datastore_helper.set_property_filter(
+ query.filter, '__key__', PropertyFilter.HAS_ANCESTOR, ancestor_key)
+
+ return query
+
+
+def read_from_datastore(project, user_options, pipeline_options):
+ """Creates a pipeline that reads entities from Cloud Datastore."""
+ p = beam.Pipeline(options=pipeline_options)
+ # Create a query to read entities from datastore.
+ query = make_ancestor_query(user_options.kind, user_options.namespace,
+ user_options.ancestor)
+
+ # Read entities from Cloud Datastore into a PCollection.
+ lines = p | 'read from datastore' >> ReadFromDatastore(
+ project, query, user_options.namespace)
+
+ # Count the occurrences of each word.
+ counts = (lines
+ | 'split' >> (beam.ParDo(WordExtractingDoFn())
+ .with_output_types(unicode))
+ | 'pair_with_one' >> beam.Map(lambda x: (x, 1))
+ | 'group' >> beam.GroupByKey()
+ | 'count' >> beam.Map(lambda (word, ones): (word, sum(ones))))
+
+ # Format the counts into a PCollection of strings.
+ output = counts | 'format' >> beam.Map(lambda (word, c): '%s: %s' % (word, c))
+
+ # Write the output using a "Write" transform that has side effects.
+ # pylint: disable=expression-not-assigned
+ output | 'write' >> beam.io.Write(beam.io.TextFileSink(user_options.output))
+
+ # Actually run the pipeline (all operations above are deferred).
+ return p.run()
+
+
def run(argv=None):
"""Main entry point; defines and runs the wordcount pipeline."""
parser = argparse.ArgumentParser()
+ parser.add_argument('--input',
+ dest='input',
+ default='gs://dataflow-samples/shakespeare/kinglear.txt',
+ help='Input file to process.')
parser.add_argument('--kind',
dest='kind',
required=True,
@@ -72,42 +166,33 @@ def run(argv=None):
parser.add_argument('--namespace',
dest='namespace',
help='Datastore Namespace')
+ parser.add_argument('--ancestor',
+ dest='ancestor',
+ default='root',
+ help='The ancestor key name for all entities.')
parser.add_argument('--output',
dest='output',
required=True,
help='Output file to write results to.')
+ parser.add_argument('--read_only',
+ action='store_true',
+ help='Read an existing dataset, do not write first')
+
known_args, pipeline_args = parser.parse_known_args(argv)
# We use the save_main_session option because one or more DoFn's in this
# workflow rely on global context (e.g., a module imported at module level).
pipeline_options = PipelineOptions(pipeline_args)
pipeline_options.view_as(SetupOptions).save_main_session = True
gcloud_options = pipeline_options.view_as(GoogleCloudOptions)
- p = beam.Pipeline(options=pipeline_options)
-
- query = query_pb2.Query()
- query.kind.add().name = known_args.kind
- # Read entities from Cloud Datastore into a PCollection.
- lines = p | 'read from datastore' >> ReadFromDatastore(
- gcloud_options.project, query, known_args.namespace)
-
- # Count the occurrences of each word.
- counts = (lines
- | 'split' >> (beam.ParDo(WordExtractingDoFn())
- .with_output_types(unicode))
- | 'pair_with_one' >> beam.Map(lambda x: (x, 1))
- | 'group' >> beam.GroupByKey()
- | 'count' >> beam.Map(lambda (word, ones): (word, sum(ones))))
-
- # Format the counts into a PCollection of strings.
- output = counts | 'format' >> beam.Map(lambda (word, c): '%s: %s' % (word, c))
+ # Write to Datastore if the `read_only` option is not specified.
+ if not known_args.read_only:
+ write_to_datastore(gcloud_options.project, known_args, pipeline_options)
- # Write the output using a "Write" transform that has side effects.
- # pylint: disable=expression-not-assigned
- output | 'write' >> beam.io.Write(beam.io.TextFileSink(known_args.output))
+ # Read from Datastore.
+ result = read_from_datastore(gcloud_options.project, known_args,
+ pipeline_options)
- # Actually run the pipeline (all operations above are deferred).
- result = p.run()
empty_line_values = result.aggregated_values(empty_line_aggregator)
logging.info('number of empty lines: %d', sum(empty_line_values.values()))
word_length_values = result.aggregated_values(average_word_size_aggregator)
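
For readers skimming the diff: the new write path in the example composes as
below. This is a minimal sketch, not part of the commit; the bucket, kind and
project names are placeholders.

    import apache_beam as beam
    from apache_beam.examples.datastore_wordcount import EntityWrapper
    from apache_beam.io.datastore.v1.datastoreio import WriteToDatastore
    from apache_beam.utils.options import PipelineOptions

    p = beam.Pipeline(options=PipelineOptions([]))
    # pylint: disable=expression-not-assigned
    (p
     | 'read' >> beam.io.Read(
         beam.io.TextFileSource('gs://my-bucket/input.txt'))
     | 'create entity' >> beam.Map(
         EntityWrapper(None, 'ExampleKind', 'root').make_entity)
     | 'write to datastore' >> WriteToDatastore('my-project'))
    p.run()
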
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d46203b7/sdks/python/apache_beam/io/datastore/v1/datastoreio.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/datastore/v1/datastoreio.py b/sdks/python/apache_beam/io/datastore/v1/datastoreio.py
index d542439..20466b9 100644
--- a/sdks/python/apache_beam/io/datastore/v1/datastoreio.py
+++ b/sdks/python/apache_beam/io/datastore/v1/datastoreio.py
@@ -19,6 +19,7 @@
import logging
+from google.datastore.v1 import datastore_pb2
from googledatastore import helper as datastore_helper
from apache_beam.io.datastore.v1 import helper
@@ -27,11 +28,12 @@ from apache_beam.transforms import Create
from apache_beam.transforms import DoFn
from apache_beam.transforms import FlatMap
from apache_beam.transforms import GroupByKey
+from apache_beam.transforms import Map
from apache_beam.transforms import PTransform
from apache_beam.transforms import ParDo
from apache_beam.transforms.util import Values
-__all__ = ['ReadFromDatastore']
+__all__ = ['ReadFromDatastore', 'WriteToDatastore', 'DeleteFromDatastore']
class ReadFromDatastore(PTransform):
@@ -285,3 +287,103 @@ class ReadFromDatastore(PTransform):
num_splits = ReadFromDatastore._NUM_QUERY_SPLITS_MIN
return max(num_splits, ReadFromDatastore._NUM_QUERY_SPLITS_MIN)
+
+
+class _Mutate(PTransform):
+ """A ``PTransform`` that writes mutations to Cloud Datastore.
+
+ Only idempotent Datastore mutation operations (upsert and delete) are
+ supported, as the commits are retried when failures occur.
+ """
+
+ # Max allowed Datastore write batch size.
+ _WRITE_BATCH_SIZE = 500
+
+ def __init__(self, project, mutation_fn):
+ """Initializes a Mutate transform.
+
+ Args:
+ project: The project ID.
+ mutation_fn: A function that converts `entities` or `keys` to
+ `mutations`.
+ """
+ self._project = project
+ self._mutation_fn = mutation_fn
+
+ def apply(self, pcoll):
+ return (pcoll
+ | 'Convert to Mutation' >> Map(self._mutation_fn)
+ | 'Write Mutation to Datastore' >> ParDo(_Mutate.DatastoreWriteFn(
+ self._project)))
+
+ def display_data(self):
+ return {'project': self._project,
+ 'mutation_fn': self._mutation_fn.__class__.__name__}
+
+ class DatastoreWriteFn(DoFn):
+ """A ``DoFn`` that write mutations to Datastore.
+
+ Mutations are written in batches, where the maximum batch size is
+ `Mutate._WRITE_BATCH_SIZE`.
+
+ Commits are non-transactional. If a commit fails because of a conflict over
+ an entity group, the commit will be retried. This means that the mutation
+ should be idempotent (`upsert` and `delete` mutations) to prevent duplicate
+ data or errors.
+ """
+ def __init__(self, project):
+ self._project = project
+ self._datastore = None
+ self._mutations = []
+
+ def start_bundle(self, context):
+ self._mutations = []
+ self._datastore = helper.get_datastore(self._project)
+
+ def process(self, context):
+ self._mutations.append(context.element)
+ if len(self._mutations) >= _Mutate._WRITE_BATCH_SIZE:
+ self._flush_batch()
+
+ def finish_bundle(self, context):
+ if self._mutations:
+ self._flush_batch()
+ self._mutations = []
+
+ def _flush_batch(self):
+ # Flush the current batch of mutations to Cloud Datastore.
+ helper.write_mutations(self._datastore, self._project, self._mutations)
+ logging.debug("Successfully wrote %d mutations.", len(self._mutations))
+ self._mutations = []
+
+
+class WriteToDatastore(_Mutate):
+ """A ``PTransform`` to write a ``PCollection[Entity]`` to Cloud Datastore."""
+ def __init__(self, project):
+ super(WriteToDatastore, self).__init__(
+ project, WriteToDatastore.to_upsert_mutation)
+
+ @staticmethod
+ def to_upsert_mutation(entity):
+ if not helper.is_key_valid(entity.key):
+ raise ValueError('Entities to be written to the Cloud Datastore must '
+ 'have complete keys:\n%s' % entity)
+ mutation = datastore_pb2.Mutation()
+ mutation.upsert.CopyFrom(entity)
+ return mutation
+
+
+class DeleteFromDatastore(_Mutate):
+ """A ``PTransform`` to delete a ``PCollection[Key]`` from Cloud Datastore."""
+ def __init__(self, project):
+ super(DeleteFromDatastore, self).__init__(
+ project, DeleteFromDatastore.to_delete_mutation)
+
+ @staticmethod
+ def to_delete_mutation(key):
+ if not helper.is_key_valid(key):
+ raise ValueError('Keys to be deleted from the Cloud Datastore must be '
+ 'complete:\n%s' % key)
+ mutation = datastore_pb2.Mutation()
+ mutation.delete.CopyFrom(key)
+ return mutation
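
The example above only exercises WriteToDatastore; DeleteFromDatastore
composes the same way. A hedged sketch, assuming an existing pipeline `p`, a
project id string, a query protobuf built as in the example, and the default
namespace:

    from apache_beam.io.datastore.v1.datastoreio import DeleteFromDatastore
    from apache_beam.io.datastore.v1.datastoreio import ReadFromDatastore
    from apache_beam.transforms import Map

    # pylint: disable=expression-not-assigned
    (p
     | 'read entities' >> ReadFromDatastore(project, query)
     | 'extract keys' >> Map(lambda entity: entity.key)
     | 'delete from datastore' >> DeleteFromDatastore(project))

Because to_delete_mutation rejects incomplete keys, every key reaching
DeleteFromDatastore must name its final path element (id or name).
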
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d46203b7/sdks/python/apache_beam/io/datastore/v1/datastoreio_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/datastore/v1/datastoreio_test.py b/sdks/python/apache_beam/io/datastore/v1/datastoreio_test.py
index 2bf01f4..2ac7ffb 100644
--- a/sdks/python/apache_beam/io/datastore/v1/datastoreio_test.py
+++ b/sdks/python/apache_beam/io/datastore/v1/datastoreio_test.py
@@ -22,9 +22,13 @@ from google.datastore.v1 import query_pb2
from google.protobuf import timestamp_pb2
from googledatastore import helper as datastore_helper
from mock import MagicMock, call, patch
+
+from apache_beam.io.datastore.v1 import fake_datastore
from apache_beam.io.datastore.v1 import helper
from apache_beam.io.datastore.v1 import query_splitter
+from apache_beam.io.datastore.v1.datastoreio import _Mutate
from apache_beam.io.datastore.v1.datastoreio import ReadFromDatastore
+from apache_beam.io.datastore.v1.datastoreio import WriteToDatastore
class DatastoreioTest(unittest.TestCase):
@@ -118,6 +122,48 @@ class DatastoreioTest(unittest.TestCase):
self.assertEqual(1, len(returned_split_queries))
self.assertEqual(0, len(self._mock_datastore.method_calls))
+ def test_DatastoreWriteFn_with_empty_batch(self):
+ self.check_DatastoreWriteFn(0)
+
+ def test_DatastoreWriteFn_with_one_batch(self):
+ num_entities_to_write = _Mutate._WRITE_BATCH_SIZE * 1 - 50
+ self.check_DatastoreWriteFn(num_entities_to_write)
+
+ def test_DatastoreWriteFn_with_multiple_batches(self):
+ num_entities_to_write = _Mutate._WRITE_BATCH_SIZE * 3 + 50
+ self.check_DatastoreWriteFn(num_entities_to_write)
+
+ def test_DatastoreWriteFn_with_batch_size_exact_multiple(self):
+ num_entities_to_write = _Mutate._WRITE_BATCH_SIZE * 2
+ self.check_DatastoreWriteFn(num_entities_to_write)
+
+ def check_DatastoreWriteFn(self, num_entities):
+ """A helper function to test DatastoreWriteFn."""
+
+ with patch.object(helper, 'get_datastore',
+ return_value=self._mock_datastore):
+ entities = [e.entity for e in
+ fake_datastore.create_entities(num_entities)]
+
+ expected_mutations = map(WriteToDatastore.to_upsert_mutation, entities)
+ actual_mutations = []
+
+ self._mock_datastore.commit.side_effect = (
+ fake_datastore.create_commit(actual_mutations))
+
+ datastore_write_fn = _Mutate.DatastoreWriteFn(self._PROJECT)
+
+ mock_context = MagicMock()
+ datastore_write_fn.start_bundle(mock_context)
+ for mutation in expected_mutations:
+ mock_context.element = mutation
+ datastore_write_fn.process(mock_context)
+ datastore_write_fn.finish_bundle(mock_context)
+
+ self.assertEqual(actual_mutations, expected_mutations)
+ self.assertEqual((num_entities - 1) / _Mutate._WRITE_BATCH_SIZE + 1,
+ self._mock_datastore.commit.call_count)
+
def verify_unique_keys(self, queries):
"""A helper function that verifies if all the queries have unique keys."""
keys, _ = zip(*queries)
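
The expected commit count asserted in check_DatastoreWriteFn uses Python 2
floor division to compute a ceiling, including the empty-batch edge case. A
quick worked check of that arithmetic with the 500-mutation batch size:

    # Python 2 integer division floors toward negative infinity, so
    # (n - 1) / batch + 1 equals ceil(n / batch), and 0 entities yield
    # 0 commits.
    batch = 500
    assert (0 - 1) / batch + 1 == 0      # empty batch: commit never called
    assert (450 - 1) / batch + 1 == 1    # one partial batch
    assert (1000 - 1) / batch + 1 == 2   # exact multiple of the batch size
    assert (1550 - 1) / batch + 1 == 4   # three full batches plus a remainder
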
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d46203b7/sdks/python/apache_beam/io/datastore/v1/fake_datastore.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/datastore/v1/fake_datastore.py b/sdks/python/apache_beam/io/datastore/v1/fake_datastore.py
index 631908e..af2c4c1 100644
--- a/sdks/python/apache_beam/io/datastore/v1/fake_datastore.py
+++ b/sdks/python/apache_beam/io/datastore/v1/fake_datastore.py
@@ -48,6 +48,23 @@ def create_run_query(entities, batch_size):
return run_query
+def create_commit(mutations):
+ """A fake Datastore commit method that writes the mutations to a list.
+
+ Args:
+ mutations: A list to write mutations to.
+
+ Returns:
+ A fake Datastore commit method
+ """
+
+ def commit(req):
+ for mutation in req.mutations:
+ mutations.append(mutation)
+
+ return commit
+
+
def create_response(entities, end_cursor, finish):
"""Creates a query response for a given batch of scatter entities."""
resp = datastore_pb2.RunQueryResponse()
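
create_commit is designed to be installed as the side_effect of a mocked
Datastore client, mirroring the pattern used in datastoreio_test.py; roughly:

    from mock import MagicMock

    from apache_beam.io.datastore.v1 import fake_datastore

    captured_mutations = []
    mock_datastore = MagicMock()
    # Each call to mock_datastore.commit(req) now appends req.mutations to
    # captured_mutations, so a test can assert on what was "written".
    mock_datastore.commit.side_effect = (
        fake_datastore.create_commit(captured_mutations))
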
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d46203b7/sdks/python/apache_beam/io/datastore/v1/helper.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/datastore/v1/helper.py b/sdks/python/apache_beam/io/datastore/v1/helper.py
index 39ca40c..28cb123 100644
--- a/sdks/python/apache_beam/io/datastore/v1/helper.py
+++ b/sdks/python/apache_beam/io/datastore/v1/helper.py
@@ -25,7 +25,6 @@ from googledatastore import PropertyFilter, CompositeFilter
from googledatastore import helper as datastore_helper
from googledatastore.connection import Datastore
from googledatastore.connection import RPCError
-import googledatastore
from apache_beam.utils import retry
@@ -98,7 +97,7 @@ def str_compare(s1, s2):
def get_datastore(project):
"""Returns a Cloud Datastore client."""
- credentials = googledatastore.helper.get_credentials_from_env()
+ credentials = datastore_helper.get_credentials_from_env()
datastore = Datastore(project, credentials)
return datastore
@@ -149,6 +148,38 @@ def fetch_entities(project, namespace, query, datastore):
return QueryIterator(project, namespace, query, datastore)
+def is_key_valid(key):
+ """Returns True if a Cloud Datastore key is complete.
+
+ A key is complete if its last element has either an id or a name.
+ """
+ if not key.path:
+ return False
+ return key.path[-1].HasField('id') or key.path[-1].HasField('name')
+
+
+def write_mutations(datastore, project, mutations):
+ """A helper function to write a batch of mutations to Cloud Datastore.
+
+ If a commit fails, it will be retried up to 5 times. All mutations in the
+ batch will be committed again, even if the commit was partially successful.
+ If the retry limit is exceeded, the last exception from Cloud Datastore will
+ be raised.
+ """
+ commit_request = datastore_pb2.CommitRequest()
+ commit_request.mode = datastore_pb2.CommitRequest.NON_TRANSACTIONAL
+ commit_request.project_id = project
+ for mutation in mutations:
+ commit_request.mutations.add().CopyFrom(mutation)
+
+ @retry.with_exponential_backoff(num_retries=5,
+ retry_filter=retry_on_rpc_error)
+ def commit(req):
+ datastore.commit(req)
+
+ commit(commit_request)
+
+
def make_latest_timestamp_query(namespace):
"""Make a Query to fetch the latest timestamp statistics."""
query = query_pb2.Query()
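
Taken together, the two new helpers cover key validation and the retried
commit. A short sketch, assuming Datastore credentials are available in the
environment and using a placeholder project id:

    from google.datastore.v1 import datastore_pb2
    from google.datastore.v1 import entity_pb2
    from googledatastore import helper as datastore_helper

    from apache_beam.io.datastore.v1 import helper

    entity = entity_pb2.Entity()
    datastore_helper.add_key_path(entity.key, 'ExampleKind', 'example-name')
    assert helper.is_key_valid(entity.key)  # last path element has a name

    mutation = datastore_pb2.Mutation()
    mutation.upsert.CopyFrom(entity)

    datastore = helper.get_datastore('my-project')
    # Commits non-transactionally; retried up to 5 times on RPCError.
    helper.write_mutations(datastore, 'my-project', [mutation])
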
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d46203b7/sdks/python/apache_beam/io/datastore/v1/helper_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/datastore/v1/helper_test.py b/sdks/python/apache_beam/io/datastore/v1/helper_test.py
index 69741d2..6f45993 100644
--- a/sdks/python/apache_beam/io/datastore/v1/helper_test.py
+++ b/sdks/python/apache_beam/io/datastore/v1/helper_test.py
@@ -21,9 +21,11 @@ import sys
import unittest
from google.datastore.v1 import datastore_pb2
+from google.datastore.v1 import entity_pb2
from google.datastore.v1 import query_pb2
from google.datastore.v1.entity_pb2 import Key
from googledatastore.connection import RPCError
+from googledatastore import helper as datastore_helper
from mock import MagicMock, Mock, patch
from apache_beam.io.datastore.v1 import fake_datastore
@@ -144,6 +146,40 @@ class HelperTest(unittest.TestCase):
limit = query.limit.value if query.HasField('limit') else sys.maxint
self.assertEqual(i, min(num_entities, limit))
+ def test_is_key_valid(self):
+ key = entity_pb2.Key()
+ # Complete with name, no ancestor
+ datastore_helper.add_key_path(key, 'kind', 'name')
+ self.assertTrue(helper.is_key_valid(key))
+
+ key = entity_pb2.Key()
+ # Complete with id, no ancestor
+ datastore_helper.add_key_path(key, 'kind', 12)
+ self.assertTrue(helper.is_key_valid(key))
+
+ key = entity_pb2.Key()
+ # Incomplete, no ancestor
+ datastore_helper.add_key_path(key, 'kind')
+ self.assertFalse(helper.is_key_valid(key))
+
+ key = entity_pb2.Key()
+ # Complete with name and ancestor
+ datastore_helper.add_key_path(key, 'kind', 'name', 'kind2', 'name2')
+ self.assertTrue(helper.is_key_valid(key))
+
+ key = entity_pb2.Key()
+ # Complete with id and ancestor
+ datastore_helper.add_key_path(key, 'kind', 'name', 'kind2', 123)
+ self.assertTrue(helper.is_key_valid(key))
+
+ key = entity_pb2.Key()
+ # Incomplete with ancestor
+ datastore_helper.add_key_path(key, 'kind', 'name', 'kind2')
+ self.assertFalse(helper.is_key_valid(key))
+
+ key = entity_pb2.Key()
+ self.assertFalse(helper.is_key_valid(key))
+
def test_compare_path_with_different_kind(self):
p1 = Key.PathElement()
p1.kind = 'dummy1'
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d46203b7/sdks/python/apache_beam/io/datastore/v1/query_splitter.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/datastore/v1/query_splitter.py b/sdks/python/apache_beam/io/datastore/v1/query_splitter.py
index 82aa972..98a8786 100644
--- a/sdks/python/apache_beam/io/datastore/v1/query_splitter.py
+++ b/sdks/python/apache_beam/io/datastore/v1/query_splitter.py
@@ -22,7 +22,7 @@ from google.datastore.v1 import datastore_pb2
from google.datastore.v1 import query_pb2
from google.datastore.v1.query_pb2 import PropertyFilter
from google.datastore.v1.query_pb2 import CompositeFilter
-import googledatastore
+from googledatastore import helper as datastore_helper
__all__ = [
@@ -129,8 +129,7 @@ def _create_scatter_query(query, num_splits):
scatter_kind.CopyFrom(kind)
# ascending order
- googledatastore.helper.add_property_orders(scatter_query,
- SCATTER_PROPERTY_NAME)
+ datastore_helper.add_property_orders(scatter_query, SCATTER_PROPERTY_NAME)
# There is a split containing entities before and after each scatter entity:
# ||---*------*------*------*------*------*------*---|| * = scatter entity
@@ -138,7 +137,7 @@ def _create_scatter_query(query, num_splits):
# extra region following the last scatter point. Thus, we do not need the
# scatter entity for the last region.
scatter_query.limit.value = (num_splits - 1) * KEYS_PER_SPLIT
- googledatastore.helper.add_projection(scatter_query, KEY_PROPERTY_NAME)
+ datastore_helper.add_projection(scatter_query, KEY_PROPERTY_NAME)
return scatter_query