Posted to commits@beam.apache.org by dh...@apache.org on 2016/11/28 23:54:43 UTC

[2/2] incubator-beam git commit: datastoreio write/delete ptransform

datastoreio write/delete ptransform

update datastore_wordcount example to include writes
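
For reference, a minimal sketch of how the new write transform is used (the
project id, kind, and entity contents here are placeholders and not part of
this commit; the full example is in datastore_wordcount.py below):

    import uuid

    import apache_beam as beam
    from apache_beam.io.datastore.v1.datastoreio import WriteToDatastore
    from apache_beam.utils.options import PipelineOptions
    from google.datastore.v1 import entity_pb2
    from googledatastore import helper as datastore_helper

    PROJECT = 'my-project'  # placeholder project id


    def make_entity(content):
      # Entities written with WriteToDatastore must have complete keys, so a
      # random name is assigned to each one here.
      entity = entity_pb2.Entity()
      datastore_helper.add_key_path(entity.key, 'ExampleKind', str(uuid.uuid4()))
      datastore_helper.add_properties(entity, {'content': unicode(content)})
      return entity


    p = beam.Pipeline(options=PipelineOptions())
    # pylint: disable=expression-not-assigned
    (p
     | 'create' >> beam.Create(['foo', 'bar', 'baz'])
     | 'to entity' >> beam.Map(make_entity)
     | 'write to datastore' >> WriteToDatastore(PROJECT))
    p.run()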


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/d46203b7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/d46203b7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/d46203b7

Branch: refs/heads/python-sdk
Commit: d46203b7fcdc9895c9cee1d82710f48aba31a748
Parents: 3dbeb8e
Author: Vikas Kedigehalli <vi...@google.com>
Authored: Wed Nov 23 14:09:09 2016 -0800
Committer: Dan Halperin <dh...@google.com>
Committed: Mon Nov 28 15:54:27 2016 -0800

----------------------------------------------------------------------
 .../apache_beam/examples/datastore_wordcount.py | 137 +++++++++++++++----
 .../apache_beam/io/datastore/v1/datastoreio.py  | 104 +++++++++++++-
 .../io/datastore/v1/datastoreio_test.py         |  46 +++++++
 .../io/datastore/v1/fake_datastore.py           |  17 +++
 .../apache_beam/io/datastore/v1/helper.py       |  35 ++++-
 .../apache_beam/io/datastore/v1/helper_test.py  |  36 +++++
 .../io/datastore/v1/query_splitter.py           |   7 +-
 7 files changed, 349 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d46203b7/sdks/python/apache_beam/examples/datastore_wordcount.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/datastore_wordcount.py b/sdks/python/apache_beam/examples/datastore_wordcount.py
index af75b1c..6b9779b 100644
--- a/sdks/python/apache_beam/examples/datastore_wordcount.py
+++ b/sdks/python/apache_beam/examples/datastore_wordcount.py
@@ -22,14 +22,18 @@ from __future__ import absolute_import
 import argparse
 import logging
 import re
+import uuid
+
+from google.datastore.v1 import entity_pb2
+from google.datastore.v1 import query_pb2
+from googledatastore import helper as datastore_helper, PropertyFilter
 
 import apache_beam as beam
 from apache_beam.io.datastore.v1.datastoreio import ReadFromDatastore
+from apache_beam.io.datastore.v1.datastoreio import WriteToDatastore
 from apache_beam.utils.options import GoogleCloudOptions
 from apache_beam.utils.options import PipelineOptions
 from apache_beam.utils.options import SetupOptions
-from google.datastore.v1 import query_pb2
-
 
 empty_line_aggregator = beam.Aggregator('emptyLines')
 average_word_size_aggregator = beam.Aggregator('averageWordLength',
@@ -41,7 +45,7 @@ class WordExtractingDoFn(beam.DoFn):
   """Parse each line of input text into words."""
 
   def process(self, context):
-    """Returns an iterator over the words of this element.
+    """Returns an iterator over words in contents of Cloud Datastore entity.
     The element is a line of text.  If the line is blank, note that, too.
     Args:
       context: the call-specific context: data and aggregator.
@@ -61,10 +65,100 @@ class WordExtractingDoFn(beam.DoFn):
     return words
 
 
+class EntityWrapper(object):
+  """Create a Cloud Datastore entity from the given string."""
+  def __init__(self, namespace, kind, ancestor):
+    self._namespace = namespace
+    self._kind = kind
+    self._ancestor = ancestor
+
+  def make_entity(self, content):
+    entity = entity_pb2.Entity()
+    if self._namespace is not None:
+      entity.key.partition_id.namespace_id = self._namespace
+
+    # All entities created will have the same ancestor
+    datastore_helper.add_key_path(entity.key, self._kind, self._ancestor,
+                                  self._kind, str(uuid.uuid4()))
+
+    datastore_helper.add_properties(entity, {"content": unicode(content)})
+    return entity
+
+
+def write_to_datastore(project, user_options, pipeline_options):
+  """Creates a pipeline that writes entities to Cloud Datastore."""
+  p = beam.Pipeline(options=pipeline_options)
+
+  # pylint: disable=expression-not-assigned
+  (p
+   | 'read' >> beam.io.Read(beam.io.TextFileSource(user_options.input))
+   | 'create entity' >> beam.Map(
+       EntityWrapper(user_options.namespace, user_options.kind,
+                     user_options.ancestor).make_entity)
+   | 'write to datastore' >> WriteToDatastore(project))
+
+  # Actually run the pipeline (all operations above are deferred).
+  p.run()
+
+
+def make_ancestor_query(kind, namespace, ancestor):
+  """Creates a Cloud Datastore ancestor query.
+
+  The returned query will fetch all the entities whose parent key has the
+  name `ancestor`.
+  """
+  ancestor_key = entity_pb2.Key()
+  datastore_helper.add_key_path(ancestor_key, kind, ancestor)
+  if namespace is not None:
+    ancestor_key.partition_id.namespace_id = namespace
+
+  query = query_pb2.Query()
+  query.kind.add().name = kind
+
+  datastore_helper.set_property_filter(
+      query.filter, '__key__', PropertyFilter.HAS_ANCESTOR, ancestor_key)
+
+  return query
+
+
+def read_from_datastore(project, user_options, pipeline_options):
+  """Creates a pipeline that reads entities from Cloud Datastore."""
+  p = beam.Pipeline(options=pipeline_options)
+  # Create a query to read entities from datastore.
+  query = make_ancestor_query(user_options.kind, user_options.namespace,
+                              user_options.ancestor)
+
+  # Read entities from Cloud Datastore into a PCollection.
+  lines = p | 'read from datastore' >> ReadFromDatastore(
+      project, query, user_options.namespace)
+
+  # Count the occurrences of each word.
+  counts = (lines
+            | 'split' >> (beam.ParDo(WordExtractingDoFn())
+                          .with_output_types(unicode))
+            | 'pair_with_one' >> beam.Map(lambda x: (x, 1))
+            | 'group' >> beam.GroupByKey()
+            | 'count' >> beam.Map(lambda (word, ones): (word, sum(ones))))
+
+  # Format the counts into a PCollection of strings.
+  output = counts | 'format' >> beam.Map(lambda (word, c): '%s: %s' % (word, c))
+
+  # Write the output using a "Write" transform that has side effects.
+  # pylint: disable=expression-not-assigned
+  output | 'write' >> beam.io.Write(beam.io.TextFileSink(user_options.output))
+
+  # Actually run the pipeline (all operations above are deferred).
+  return p.run()
+
+
 def run(argv=None):
   """Main entry point; defines and runs the wordcount pipeline."""
 
   parser = argparse.ArgumentParser()
+  parser.add_argument('--input',
+                      dest='input',
+                      default='gs://dataflow-samples/shakespeare/kinglear.txt',
+                      help='Input file to process.')
   parser.add_argument('--kind',
                       dest='kind',
                       required=True,
@@ -72,42 +166,33 @@ def run(argv=None):
   parser.add_argument('--namespace',
                       dest='namespace',
                       help='Datastore Namespace')
+  parser.add_argument('--ancestor',
+                      dest='ancestor',
+                      default='root',
+                      help='The ancestor key name for all entities.')
   parser.add_argument('--output',
                       dest='output',
                       required=True,
                       help='Output file to write results to.')
+  parser.add_argument('--read_only',
+                      action='store_true',
+                      help='Read an existing dataset; do not write first.')
+
   known_args, pipeline_args = parser.parse_known_args(argv)
   # We use the save_main_session option because one or more DoFn's in this
   # workflow rely on global context (e.g., a module imported at module level).
   pipeline_options = PipelineOptions(pipeline_args)
   pipeline_options.view_as(SetupOptions).save_main_session = True
   gcloud_options = pipeline_options.view_as(GoogleCloudOptions)
-  p = beam.Pipeline(options=pipeline_options)
-
-  query = query_pb2.Query()
-  query.kind.add().name = known_args.kind
 
-  # Read entities from Cloud Datastore into a PCollection.
-  lines = p | 'read from datastore' >> ReadFromDatastore(
-      gcloud_options.project, query, known_args.namespace)
-
-  # Count the occurrences of each word.
-  counts = (lines
-            | 'split' >> (beam.ParDo(WordExtractingDoFn())
-                          .with_output_types(unicode))
-            | 'pair_with_one' >> beam.Map(lambda x: (x, 1))
-            | 'group' >> beam.GroupByKey()
-            | 'count' >> beam.Map(lambda (word, ones): (word, sum(ones))))
-
-  # Format the counts into a PCollection of strings.
-  output = counts | 'format' >> beam.Map(lambda (word, c): '%s: %s' % (word, c))
+  # Write to Datastore if the `read_only` option is not specified.
+  if not known_args.read_only:
+    write_to_datastore(gcloud_options.project, known_args, pipeline_options)
 
-  # Write the output using a "Write" transform that has side effects.
-  # pylint: disable=expression-not-assigned
-  output | 'write' >> beam.io.Write(beam.io.TextFileSink(known_args.output))
+  # Read from Datastore.
+  result = read_from_datastore(gcloud_options.project, known_args,
+                               pipeline_options)
 
-  # Actually run the pipeline (all operations above are deferred).
-  result = p.run()
   empty_line_values = result.aggregated_values(empty_line_aggregator)
   logging.info('number of empty lines: %d', sum(empty_line_values.values()))
   word_length_values = result.aggregated_values(average_word_size_aggregator)

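For context, the updated example can be driven end to end from Python roughly
as follows; the project id, kind, namespace, and output path are placeholders,
and Cloud Datastore credentials must be available in the environment:

    from apache_beam.examples import datastore_wordcount

    datastore_wordcount.run([
        '--project', 'my-project',       # placeholder GCP project
        '--kind', 'DatastoreWordCount',  # placeholder entity kind
        '--namespace', 'wordcount',      # optional; omit for default namespace
        '--ancestor', 'root',            # matches the default ancestor name
        '--output', '/tmp/datastore_wordcount_output',
    ])
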
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d46203b7/sdks/python/apache_beam/io/datastore/v1/datastoreio.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/datastore/v1/datastoreio.py b/sdks/python/apache_beam/io/datastore/v1/datastoreio.py
index d542439..20466b9 100644
--- a/sdks/python/apache_beam/io/datastore/v1/datastoreio.py
+++ b/sdks/python/apache_beam/io/datastore/v1/datastoreio.py
@@ -19,6 +19,7 @@
 
 import logging
 
+from google.datastore.v1 import datastore_pb2
 from googledatastore import helper as datastore_helper
 
 from apache_beam.io.datastore.v1 import helper
@@ -27,11 +28,12 @@ from apache_beam.transforms import Create
 from apache_beam.transforms import DoFn
 from apache_beam.transforms import FlatMap
 from apache_beam.transforms import GroupByKey
+from apache_beam.transforms import Map
 from apache_beam.transforms import PTransform
 from apache_beam.transforms import ParDo
 from apache_beam.transforms.util import Values
 
-__all__ = ['ReadFromDatastore']
+__all__ = ['ReadFromDatastore', 'WriteToDatastore', 'DeleteFromDatastore']
 
 
 class ReadFromDatastore(PTransform):
@@ -285,3 +287,103 @@ class ReadFromDatastore(PTransform):
       num_splits = ReadFromDatastore._NUM_QUERY_SPLITS_MIN
 
     return max(num_splits, ReadFromDatastore._NUM_QUERY_SPLITS_MIN)
+
+
+class _Mutate(PTransform):
+  """A ``PTransform`` that writes mutations to Cloud Datastore.
+
+  Only idempotent Datastore mutation operations (upsert and delete) are
+  supported, as the commits are retried when failures occur.
+  """
+
+  # Max allowed Datastore write batch size.
+  _WRITE_BATCH_SIZE = 500
+
+  def __init__(self, project, mutation_fn):
+    """Initializes a Mutate transform.
+
+    Args:
+      project: The Project ID
+      mutation_fn: A function that converts `entities` or `keys` to
+        `mutations`.
+    """
+    self._project = project
+    self._mutation_fn = mutation_fn
+
+  def apply(self, pcoll):
+    return (pcoll
+            | 'Convert to Mutation' >> Map(self._mutation_fn)
+            | 'Write Mutation to Datastore' >> ParDo(_Mutate.DatastoreWriteFn(
+                self._project)))
+
+  def display_data(self):
+    return {'project': self._project,
+            'mutation_fn': self._mutation_fn.__class__.__name__}
+
+  class DatastoreWriteFn(DoFn):
+    """A ``DoFn`` that write mutations to Datastore.
+
+    Mutations are written in batches, where the maximum batch size is
+    `_Mutate._WRITE_BATCH_SIZE`.
+
+    Commits are non-transactional. If a commit fails because of a conflict over
+    an entity group, the commit will be retried. This means that the mutation
+    should be idempotent (`upsert` and `delete` mutations) to prevent duplicate
+    data or errors.
+    """
+    def __init__(self, project):
+      self._project = project
+      self._datastore = None
+      self._mutations = []
+
+    def start_bundle(self, context):
+      self._mutations = []
+      self._datastore = helper.get_datastore(self._project)
+
+    def process(self, context):
+      self._mutations.append(context.element)
+      if len(self._mutations) >= _Mutate._WRITE_BATCH_SIZE:
+        self._flush_batch()
+
+    def finish_bundle(self, context):
+      if self._mutations:
+        self._flush_batch()
+      self._mutations = []
+
+    def _flush_batch(self):
+      # Flush the current batch of mutations to Cloud Datastore.
+      helper.write_mutations(self._datastore, self._project, self._mutations)
+      logging.debug("Successfully wrote %d mutations.", len(self._mutations))
+      self._mutations = []
+
+
+class WriteToDatastore(_Mutate):
+  """A ``PTransform`` to write a ``PCollection[Entity]`` to Cloud Datastore."""
+  def __init__(self, project):
+    super(WriteToDatastore, self).__init__(
+        project, WriteToDatastore.to_upsert_mutation)
+
+  @staticmethod
+  def to_upsert_mutation(entity):
+    if not helper.is_key_valid(entity.key):
+      raise ValueError('Entities to be written to the Cloud Datastore must '
+                       'have complete keys:\n%s' % entity)
+    mutation = datastore_pb2.Mutation()
+    mutation.upsert.CopyFrom(entity)
+    return mutation
+
+
+class DeleteFromDatastore(_Mutate):
+  """A ``PTransform`` to delete a ``PCollection[Key]`` from Cloud Datastore."""
+  def __init__(self, project):
+    super(DeleteFromDatastore, self).__init__(
+        project, DeleteFromDatastore.to_delete_mutation)
+
+  @staticmethod
+  def to_delete_mutation(key):
+    if not helper.is_key_valid(key):
+      raise ValueError('Keys to be deleted from the Cloud Datastore must be '
+                       'complete:\n%s", key')
+    mutation = datastore_pb2.Mutation()
+    mutation.delete.CopyFrom(key)
+    return mutation

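The wordcount example only exercises WriteToDatastore; below is a sketch of
the delete path, assuming the keys come from a prior ReadFromDatastore step
(the helper function and its arguments are hypothetical):

    import apache_beam as beam

    from apache_beam.io.datastore.v1.datastoreio import DeleteFromDatastore
    from apache_beam.io.datastore.v1.datastoreio import ReadFromDatastore


    def delete_matching_entities(project, query, namespace, pipeline_options):
      """Hypothetical helper: delete every entity matched by `query`."""
      p = beam.Pipeline(options=pipeline_options)
      # pylint: disable=expression-not-assigned
      (p
       | 'read from datastore' >> ReadFromDatastore(project, query, namespace)
       # DeleteFromDatastore expects keys, so strip each entity down to its
       # key; each key becomes a delete mutation, committed in batches of
       # _Mutate._WRITE_BATCH_SIZE by DatastoreWriteFn.
       | 'to key' >> beam.Map(lambda entity: entity.key)
       | 'delete from datastore' >> DeleteFromDatastore(project))
      p.run()
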
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d46203b7/sdks/python/apache_beam/io/datastore/v1/datastoreio_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/datastore/v1/datastoreio_test.py b/sdks/python/apache_beam/io/datastore/v1/datastoreio_test.py
index 2bf01f4..2ac7ffb 100644
--- a/sdks/python/apache_beam/io/datastore/v1/datastoreio_test.py
+++ b/sdks/python/apache_beam/io/datastore/v1/datastoreio_test.py
@@ -22,9 +22,13 @@ from google.datastore.v1 import query_pb2
 from google.protobuf import timestamp_pb2
 from googledatastore import helper as datastore_helper
 from mock import MagicMock, call, patch
+
+from apache_beam.io.datastore.v1 import fake_datastore
 from apache_beam.io.datastore.v1 import helper
 from apache_beam.io.datastore.v1 import query_splitter
+from apache_beam.io.datastore.v1.datastoreio import _Mutate
 from apache_beam.io.datastore.v1.datastoreio import ReadFromDatastore
+from apache_beam.io.datastore.v1.datastoreio import WriteToDatastore
 
 
 class DatastoreioTest(unittest.TestCase):
@@ -118,6 +122,48 @@ class DatastoreioTest(unittest.TestCase):
       self.assertEqual(1, len(returned_split_queries))
       self.assertEqual(0, len(self._mock_datastore.method_calls))
 
+  def test_DatastoreWriteFn_with_empty_batch(self):
+    self.check_DatastoreWriteFn(0)
+
+  def test_DatastoreWriteFn_with_one_batch(self):
+    num_entities_to_write = _Mutate._WRITE_BATCH_SIZE * 1 - 50
+    self.check_DatastoreWriteFn(num_entities_to_write)
+
+  def test_DatastoreWriteFn_with_multiple_batches(self):
+    num_entities_to_write = _Mutate._WRITE_BATCH_SIZE * 3 + 50
+    self.check_DatastoreWriteFn(num_entities_to_write)
+
+  def test_DatastoreWriteFn_with_batch_size_exact_multiple(self):
+    num_entities_to_write = _Mutate._WRITE_BATCH_SIZE * 2
+    self.check_DatastoreWriteFn(num_entities_to_write)
+
+  def check_DatastoreWriteFn(self, num_entities):
+    """A helper function to test DatastoreWriteFn."""
+
+    with patch.object(helper, 'get_datastore',
+                      return_value=self._mock_datastore):
+      entities = [e.entity for e in
+                  fake_datastore.create_entities(num_entities)]
+
+      expected_mutations = map(WriteToDatastore.to_upsert_mutation, entities)
+      actual_mutations = []
+
+      self._mock_datastore.commit.side_effect = (
+          fake_datastore.create_commit(actual_mutations))
+
+      datastore_write_fn = _Mutate.DatastoreWriteFn(self._PROJECT)
+
+      mock_context = MagicMock()
+      datastore_write_fn.start_bundle(mock_context)
+      for mutation in expected_mutations:
+        mock_context.element = mutation
+        datastore_write_fn.process(mock_context)
+      datastore_write_fn.finish_bundle(mock_context)
+
+      self.assertEqual(actual_mutations, expected_mutations)
+      self.assertEqual((num_entities - 1) / _Mutate._WRITE_BATCH_SIZE + 1,
+                       self._mock_datastore.commit.call_count)
+
   def verify_unique_keys(self, queries):
     """A helper function that verifies if all the queries have unique keys."""
     keys, _ = zip(*queries)

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d46203b7/sdks/python/apache_beam/io/datastore/v1/fake_datastore.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/datastore/v1/fake_datastore.py b/sdks/python/apache_beam/io/datastore/v1/fake_datastore.py
index 631908e..af2c4c1 100644
--- a/sdks/python/apache_beam/io/datastore/v1/fake_datastore.py
+++ b/sdks/python/apache_beam/io/datastore/v1/fake_datastore.py
@@ -48,6 +48,23 @@ def create_run_query(entities, batch_size):
   return run_query
 
 
+def create_commit(mutations):
+  """A fake Datastore commit method that writes the mutations to a list.
+
+  Args:
+    mutations: A list to write mutations to.
+
+  Returns:
+    A fake Datastore commit method
+  """
+
+  def commit(req):
+    for mutation in req.mutations:
+      mutations.append(mutation)
+
+  return commit
+
+
 def create_response(entities, end_cursor, finish):
   """Creates a query response for a given batch of scatter entities."""
   resp = datastore_pb2.RunQueryResponse()

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d46203b7/sdks/python/apache_beam/io/datastore/v1/helper.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/datastore/v1/helper.py b/sdks/python/apache_beam/io/datastore/v1/helper.py
index 39ca40c..28cb123 100644
--- a/sdks/python/apache_beam/io/datastore/v1/helper.py
+++ b/sdks/python/apache_beam/io/datastore/v1/helper.py
@@ -25,7 +25,6 @@ from googledatastore import PropertyFilter, CompositeFilter
 from googledatastore import helper as datastore_helper
 from googledatastore.connection import Datastore
 from googledatastore.connection import RPCError
-import googledatastore
 
 from apache_beam.utils import retry
 
@@ -98,7 +97,7 @@ def str_compare(s1, s2):
 
 def get_datastore(project):
   """Returns a Cloud Datastore client."""
-  credentials = googledatastore.helper.get_credentials_from_env()
+  credentials = datastore_helper.get_credentials_from_env()
   datastore = Datastore(project, credentials)
   return datastore
 
@@ -149,6 +148,38 @@ def fetch_entities(project, namespace, query, datastore):
   return QueryIterator(project, namespace, query, datastore)
 
 
+def is_key_valid(key):
+  """Returns True if a Cloud Datastore key is complete.
+
+  A key is complete if its last element has either an id or a name.
+  """
+  if not key.path:
+    return False
+  return key.path[-1].HasField('id') or key.path[-1].HasField('name')
+
+
+def write_mutations(datastore, project, mutations):
+  """A helper function to write a batch of mutations to Cloud Datastore.
+
+  If a commit fails, it will be retried up to 5 times. All mutations in the
+  batch will be committed again, even if the commit was partially successful.
+  If the retry limit is exceeded, the last exception from Cloud Datastore will
+  be raised.
+  """
+  commit_request = datastore_pb2.CommitRequest()
+  commit_request.mode = datastore_pb2.CommitRequest.NON_TRANSACTIONAL
+  commit_request.project_id = project
+  for mutation in mutations:
+    commit_request.mutations.add().CopyFrom(mutation)
+
+  @retry.with_exponential_backoff(num_retries=5,
+                                  retry_filter=retry_on_rpc_error)
+  def commit(req):
+    datastore.commit(req)
+
+  commit(commit_request)
+
+
 def make_latest_timestamp_query(namespace):
   """Make a Query to fetch the latest timestamp statistics."""
   query = query_pb2.Query()

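The new helper functions can also be used directly; a minimal sketch of a
single upsert (the helper function, project id, kind, and properties are
hypothetical; credentials come from the environment via get_datastore):

    from google.datastore.v1 import datastore_pb2
    from google.datastore.v1 import entity_pb2
    from googledatastore import helper as datastore_helper

    from apache_beam.io.datastore.v1 import helper


    def upsert_one(project, kind, name, properties):
      """Hypothetical helper: commit a single upsert via write_mutations()."""
      entity = entity_pb2.Entity()
      # A kind/name pair makes the key complete, which is_key_valid() checks.
      datastore_helper.add_key_path(entity.key, kind, name)
      assert helper.is_key_valid(entity.key)
      datastore_helper.add_properties(entity, properties)

      mutation = datastore_pb2.Mutation()
      mutation.upsert.CopyFrom(entity)

      # write_mutations() builds a NON_TRANSACTIONAL CommitRequest and retries
      # failed commits with exponential backoff.
      datastore = helper.get_datastore(project)
      helper.write_mutations(datastore, project, [mutation])

    # e.g. upsert_one('my-project', 'ExampleKind', 'name-1', {'content': u'hi'})
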
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d46203b7/sdks/python/apache_beam/io/datastore/v1/helper_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/datastore/v1/helper_test.py b/sdks/python/apache_beam/io/datastore/v1/helper_test.py
index 69741d2..6f45993 100644
--- a/sdks/python/apache_beam/io/datastore/v1/helper_test.py
+++ b/sdks/python/apache_beam/io/datastore/v1/helper_test.py
@@ -21,9 +21,11 @@ import sys
 import unittest
 
 from google.datastore.v1 import datastore_pb2
+from google.datastore.v1 import entity_pb2
 from google.datastore.v1 import query_pb2
 from google.datastore.v1.entity_pb2 import Key
 from googledatastore.connection import RPCError
+from googledatastore import helper as datastore_helper
 from mock import MagicMock, Mock, patch
 
 from apache_beam.io.datastore.v1 import fake_datastore
@@ -144,6 +146,40 @@ class HelperTest(unittest.TestCase):
     limit = query.limit.value if query.HasField('limit') else sys.maxint
     self.assertEqual(i, min(num_entities, limit))
 
+  def test_is_key_valid(self):
+    key = entity_pb2.Key()
+    # Complete with name, no ancestor
+    datastore_helper.add_key_path(key, 'kind', 'name')
+    self.assertTrue(helper.is_key_valid(key))
+
+    key = entity_pb2.Key()
+    # Complete with id, no ancestor
+    datastore_helper.add_key_path(key, 'kind', 12)
+    self.assertTrue(helper.is_key_valid(key))
+
+    key = entity_pb2.Key()
+    # Incomplete, no ancestor
+    datastore_helper.add_key_path(key, 'kind')
+    self.assertFalse(helper.is_key_valid(key))
+
+    key = entity_pb2.Key()
+    # Complete with name and ancestor
+    datastore_helper.add_key_path(key, 'kind', 'name', 'kind2', 'name2')
+    self.assertTrue(helper.is_key_valid(key))
+
+    key = entity_pb2.Key()
+    # Complete with id and ancestor
+    datastore_helper.add_key_path(key, 'kind', 'name', 'kind2', 123)
+    self.assertTrue(helper.is_key_valid(key))
+
+    key = entity_pb2.Key()
+    # Incomplete with ancestor
+    datastore_helper.add_key_path(key, 'kind', 'name', 'kind2')
+    self.assertFalse(helper.is_key_valid(key))
+
+    key = entity_pb2.Key()
+    self.assertFalse(helper.is_key_valid(key))
+
   def test_compare_path_with_different_kind(self):
     p1 = Key.PathElement()
     p1.kind = 'dummy1'

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d46203b7/sdks/python/apache_beam/io/datastore/v1/query_splitter.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/datastore/v1/query_splitter.py b/sdks/python/apache_beam/io/datastore/v1/query_splitter.py
index 82aa972..98a8786 100644
--- a/sdks/python/apache_beam/io/datastore/v1/query_splitter.py
+++ b/sdks/python/apache_beam/io/datastore/v1/query_splitter.py
@@ -22,7 +22,7 @@ from google.datastore.v1 import datastore_pb2
 from google.datastore.v1 import query_pb2
 from google.datastore.v1.query_pb2 import PropertyFilter
 from google.datastore.v1.query_pb2 import CompositeFilter
-import googledatastore
+from googledatastore import helper as datastore_helper
 
 
 __all__ = [
@@ -129,8 +129,7 @@ def _create_scatter_query(query, num_splits):
     scatter_kind.CopyFrom(kind)
 
   # ascending order
-  googledatastore.helper.add_property_orders(scatter_query,
-                                             SCATTER_PROPERTY_NAME)
+  datastore_helper.add_property_orders(scatter_query, SCATTER_PROPERTY_NAME)
 
   # There is a split containing entities before and after each scatter entity:
   # ||---*------*------*------*------*------*------*---||  * = scatter entity
@@ -138,7 +137,7 @@ def _create_scatter_query(query, num_splits):
   # extra region following the last scatter point. Thus, we do not need the
   # scatter entity for the last region.
   scatter_query.limit.value = (num_splits - 1) * KEYS_PER_SPLIT
-  googledatastore.helper.add_projection(scatter_query, KEY_PROPERTY_NAME)
+  datastore_helper.add_projection(scatter_query, KEY_PROPERTY_NAME)
 
   return scatter_query