You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this plain-text copy.
Posted to commits@beam.apache.org by al...@apache.org on 2017/05/23 15:39:42 UTC
[1/2] beam git commit: Comply with byte limit for Datastore Commit.
Repository: beam
Updated Branches:
refs/heads/master 9da46fd05 -> 597b07e0d
Comply with byte limit for Datastore Commit.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/3c0f599d
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/3c0f599d
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/3c0f599d
Branch: refs/heads/master
Commit: 3c0f599d64a7f57608f1c18b05f2ab036a8b02fc
Parents: 9da46fd
Author: Colin Phipps <fi...@google.com>
Authored: Wed May 10 09:50:56 2017 +0000
Committer: Ahmet Altay <al...@google.com>
Committed: Tue May 23 08:39:09 2017 -0700
----------------------------------------------------------------------
.../apache_beam/io/gcp/datastore/v1/datastoreio.py | 15 ++++++++++++++-
.../io/gcp/datastore/v1/datastoreio_test.py | 17 +++++++++++++++++
2 files changed, 31 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/3c0f599d/sdks/python/apache_beam/io/gcp/datastore/v1/datastoreio.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/gcp/datastore/v1/datastoreio.py b/sdks/python/apache_beam/io/gcp/datastore/v1/datastoreio.py
index c606133..89c2a93 100644
--- a/sdks/python/apache_beam/io/gcp/datastore/v1/datastoreio.py
+++ b/sdks/python/apache_beam/io/gcp/datastore/v1/datastoreio.py
@@ -313,8 +313,12 @@ class _Mutate(PTransform):
supported, as the commits are retried when failures occur.
"""
- # Max allowed Datastore write batch size.
+ # Max allowed Datastore writes per batch, and max bytes per batch.
+ # Note that the max bytes per batch set here is lower than the 10MB limit
+ # actually enforced by the API, to leave space for the CommitRequest wrapper
+ # around the mutations.
_WRITE_BATCH_SIZE = 500
+ _WRITE_BATCH_BYTES_SIZE = 9000000
def __init__(self, project, mutation_fn):
"""Initializes a Mutate transform.
@@ -353,13 +357,20 @@ class _Mutate(PTransform):
self._project = project
self._datastore = None
self._mutations = []
+ self._mutations_size = 0 # Total size of entries in _mutations.
def start_bundle(self):
self._mutations = []
+ self._mutations_size = 0
self._datastore = helper.get_datastore(self._project)
def process(self, element):
+ size = element.ByteSize()
+ if (self._mutations and
+ size + self._mutations_size > _Mutate._WRITE_BATCH_BYTES_SIZE):
+ self._flush_batch()
self._mutations.append(element)
+ self._mutations_size += size
if len(self._mutations) >= _Mutate._WRITE_BATCH_SIZE:
self._flush_batch()
@@ -367,12 +378,14 @@ class _Mutate(PTransform):
if self._mutations:
self._flush_batch()
self._mutations = []
+ self._mutations_size = 0
def _flush_batch(self):
# Flush the current batch of mutations to Cloud Datastore.
helper.write_mutations(self._datastore, self._project, self._mutations)
logging.debug("Successfully wrote %d mutations.", len(self._mutations))
self._mutations = []
+ self._mutations_size = 0
class WriteToDatastore(_Mutate):
http://git-wip-us.apache.org/repos/asf/beam/blob/3c0f599d/sdks/python/apache_beam/io/gcp/datastore/v1/datastoreio_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/gcp/datastore/v1/datastoreio_test.py b/sdks/python/apache_beam/io/gcp/datastore/v1/datastoreio_test.py
index 6adc08a..424e714 100644
--- a/sdks/python/apache_beam/io/gcp/datastore/v1/datastoreio_test.py
+++ b/sdks/python/apache_beam/io/gcp/datastore/v1/datastoreio_test.py
@@ -15,6 +15,7 @@
# limitations under the License.
#
+import math
import unittest
from mock import MagicMock, call, patch
@@ -191,6 +192,22 @@ class DatastoreioTest(unittest.TestCase):
self.assertEqual((num_entities - 1) / _Mutate._WRITE_BATCH_SIZE + 1,
self._mock_datastore.commit.call_count)
+ def test_DatastoreWriteLargeEntities(self):
+ """100*100kB entities gets split over two Commit RPCs."""
+ with patch.object(helper, 'get_datastore',
+ return_value=self._mock_datastore):
+ entities = [e.entity for e in fake_datastore.create_entities(100)]
+
+ datastore_write_fn = _Mutate.DatastoreWriteFn(self._PROJECT)
+ datastore_write_fn.start_bundle()
+ for entity in entities:
+ datastore_helper.add_properties(
+ entity, {'large': u'A' * 100000}, exclude_from_indexes=True)
+ datastore_write_fn.process(WriteToDatastore.to_upsert_mutation(entity))
+ datastore_write_fn.finish_bundle()
+
+ self.assertEqual(2, self._mock_datastore.commit.call_count)
+
def verify_unique_keys(self, queries):
"""A helper function that verifies if all the queries have unique keys."""
keys, _ = zip(*queries)
[2/2] beam git commit: This closes #3043
Posted by al...@apache.org.
This closes #3043
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/597b07e0
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/597b07e0
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/597b07e0
Branch: refs/heads/master
Commit: 597b07e0d9fc3bbce68e06cd10b13ea204f2236f
Parents: 9da46fd 3c0f599
Author: Ahmet Altay <al...@google.com>
Authored: Tue May 23 08:39:26 2017 -0700
Committer: Ahmet Altay <al...@google.com>
Committed: Tue May 23 08:39:26 2017 -0700
----------------------------------------------------------------------
.../apache_beam/io/gcp/datastore/v1/datastoreio.py | 15 ++++++++++++++-
.../io/gcp/datastore/v1/datastoreio_test.py | 17 +++++++++++++++++
2 files changed, 31 insertions(+), 1 deletion(-)
----------------------------------------------------------------------