Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/05/08 12:56:01 UTC

[GitHub] [beam] mszb commented on a change in pull request #11210: [BEAM-8949] SpannerIO integration tests

mszb commented on a change in pull request #11210:
URL: https://github.com/apache/beam/pull/11210#discussion_r422104136



##########
File path: sdks/python/apache_beam/io/gcp/experimental/spannerio_test.py
##########
@@ -499,6 +499,7 @@ def test_batch_byte_size(
       # and each bach should contains 25 mutations.
       res = (
           p | beam.Create(mutation_group)
+          | 'combine to list' >> beam.combiners.ToList()

Review comment:
       Yes, `_BatchFn` requires a single iterable holding the whole collection and loops through it to make the batches. This just replicates the batching pipeline used in the `_WriteGroup` transform.

##########
File path: sdks/python/apache_beam/io/gcp/experimental/spannerio.py
##########
@@ -1008,31 +1007,30 @@ def _reset_count(self):
     self._cells = 0
 
   def process(self, element):
-    mg_info = element.info
+    for elem in element:

Review comment:
       There was no issue with processing the mutation groups; the issue was with the batch size. According to the Beam execution model, ‘**The division of the collection into bundles is arbitrary and selected by the runner.**’ As a result, `finish_bundle` is called multiple times (once per bundle) rather than once for the complete collection, which produces the wrong number of batches on the Dataflow runner. That's why I've added the `ToList` transform: it combines everything into a single collection so the batches are generated properly.
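
The effect described in this comment can be illustrated with a minimal sketch in plain Python, without Beam. `BatchSketch` and `run_bundles` are hypothetical stand-ins written for this illustration; they are not the actual `_BatchFn` or any Beam API. The sketch assumes a batching function that emits a batch whenever a row limit is reached and flushes the remainder in `finish_bundle`, and it simulates a runner that splits the same 100 elements into bundles in two different ways.

```python
# Hypothetical stand-in for a batching DoFn; NOT the real _BatchFn.
class BatchSketch:
    def __init__(self, max_rows):
        self.max_rows = max_rows
        self._batch = []

    def process(self, element):
        # Emit a full batch as soon as the row limit is reached.
        self._batch.append(element)
        if len(self._batch) >= self.max_rows:
            out, self._batch = self._batch, []
            yield out

    def finish_bundle(self):
        # Flush whatever is left when the (simulated) runner closes a bundle.
        if self._batch:
            out, self._batch = self._batch, []
            yield out


def run_bundles(bundles, max_rows):
    """Simulate a runner that calls finish_bundle after every bundle."""
    fn = BatchSketch(max_rows)
    batches = []
    for bundle in bundles:
        for element in bundle:
            batches.extend(fn.process(element))
        batches.extend(fn.finish_bundle())
    return batches


elements = list(range(100))

# Everything arrives in one bundle (the situation the ToList step is meant to force).
one_bundle = run_bundles([elements], max_rows=25)

# An arbitrary split chosen by the simulated runner: 10 + 40 + 50 elements.
three_bundles = run_bundles(
    [elements[:10], elements[10:50], elements[50:]], max_rows=25)

print(len(one_bundle), len(three_bundles))
```

With a single bundle the 100 elements form four batches of 25. With the arbitrary three-way split, partial batches are flushed at each bundle boundary, so the batch count changes even though the input is identical.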

##########
File path: sdks/python/apache_beam/io/gcp/experimental/spannerio.py
##########
@@ -1008,31 +1007,30 @@ def _reset_count(self):
     self._cells = 0
 
   def process(self, element):
-    mg_info = element.info
+    for elem in element:
+      mg_info = elem.info
+      if mg_info['byte_size'] + self._size_in_bytes > \

Review comment:
       Sure. Should I create a new Jira ticket and (1) add the ticket number to this PR for reference, OR (2) create a new PR for this change and, once it gets merged, rebase this PR and request review?
   
   I think the first approach would take less time to close the tickets. What do you suggest?



