Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/04/29 15:20:03 UTC

[GitHub] [beam] nielm opened a new pull request #11570: [BEAM-9822] Merge the stages 'Gather and Sort' and 'Create Batches'

nielm opened a new pull request #11570:
URL: https://github.com/apache/beam/pull/11570


   
   There is minimal benefit in separating these two stages, and significant
   benefit in merging them: Gather and Sort encodes incoming
   MutationGroups into a List<byte[]>, which can contain up to 1GB.
   This list is then output (copied) to the Create Batches stage, where it is
   decoded back into MutationGroups.
   
   Removing this encode/decode step should save up to 2GB of RAM.
   
   Note: this PR depends on PR #11528, PR #11532 and PR #11529.
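
The idea of sorting and batching in a single step, without an intermediate encode/decode, can be sketched roughly as follows. This is only an illustration under stated assumptions: `Item`, `sortAndBatch` and the byte limit are hypothetical stand-ins, not the actual SpannerIO classes or the code in this PR.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

final class GatherSortBatchSketch {

  /** Hypothetical stand-in for a MutationGroup: a sort key plus a size in bytes. */
  static final class Item {
    final String key;
    final long sizeBytes;

    Item(String key, long sizeBytes) {
      this.key = key;
      this.sizeBytes = sizeBytes;
    }
  }

  /** Sorts the buffered items by key, then cuts them into batches of at most maxBatchBytes. */
  static List<List<Item>> sortAndBatch(List<Item> buffered, long maxBatchBytes) {
    // Sort the objects themselves; nothing is encoded to byte[] and decoded again.
    List<Item> sorted = new ArrayList<>(buffered);
    sorted.sort(Comparator.comparing((Item i) -> i.key));

    List<List<Item>> batches = new ArrayList<>();
    List<Item> current = new ArrayList<>();
    long currentBytes = 0;
    for (Item item : sorted) {
      // Close the current batch if adding this item would exceed the limit.
      if (!current.isEmpty() && currentBytes + item.sizeBytes > maxBatchBytes) {
        batches.add(current);
        current = new ArrayList<>();
        currentBytes = 0;
      }
      current.add(item);
      currentBytes += item.sizeBytes;
    }
    if (!current.isEmpty()) {
      batches.add(current);
    }
    return batches;
  }

  public static void main(String[] args) {
    List<Item> buffered = new ArrayList<>();
    buffered.add(new Item("c", 40));
    buffered.add(new Item("a", 60));
    buffered.add(new Item("b", 50));
    // With a 100-byte limit the sorted items a(60), b(50), c(40) form two batches.
    System.out.println(sortAndBatch(buffered, 100).size() + " batches");
  }
}
```

Because the buffered objects are passed straight to the batching loop, only one copy of the data needs to be held in memory at a time.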
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] TheNeuralBit commented on pull request #11570: [BEAM-9822] Merge the stages 'Gather and Sort' and 'Create Batches'

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on pull request #11570:
URL: https://github.com/apache/beam/pull/11570#issuecomment-630495763


   Retest this please





[GitHub] [beam] allenpradeep commented on pull request #11570: [BEAM-9822] Merge the stages 'Gather and Sort' and 'Create Batches'

Posted by GitBox <gi...@apache.org>.
allenpradeep commented on pull request #11570:
URL: https://github.com/apache/beam/pull/11570#issuecomment-625003968


   Hi Niel,
   I see a bunch of unit tests failing on this commit.
   I noticed this while working on a patch on top of it.





[GitHub] [beam] nielm commented on pull request #11570: [BEAM-9822] Merge the stages 'Gather and Sort' and 'Create Batches'

Posted by GitBox <gi...@apache.org>.
nielm commented on pull request #11570:
URL: https://github.com/apache/beam/pull/11570#issuecomment-630491802


   Retest this please
   
   





[GitHub] [beam] nielm commented on a change in pull request #11570: [BEAM-9822] Merge the stages 'Gather and Sort' and 'Create Batches'

Posted by GitBox <gi...@apache.org>.
nielm commented on a change in pull request #11570:
URL: https://github.com/apache/beam/pull/11570#discussion_r426947630



##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java
##########
@@ -1171,67 +1145,127 @@ public void processElement(ProcessContext c) {
    * occur, Therefore this DoFn has to be tested in isolation.
    */
   @VisibleForTesting
-  static class GatherBundleAndSortFn extends DoFn<MutationGroup, Iterable<KV<byte[], byte[]>>> {
-    private final long maxBatchSizeBytes;
-    private final long maxNumMutations;
-    private final long maxNumRows;
-
-    // total size of the current batch.
-    private long batchSizeBytes;
-    // total number of mutated cells.
-    private long batchCells;
-    // total number of rows mutated.
-    private long batchRows;
+  static class GatherSortCreateBatchesFn extends DoFn<MutationGroup, Iterable<MutationGroup>> {
 
+    private final long maxBatchSizeBytes;
+    private final long maxBatchNumMutations;
+    private final long maxBatchNumRows;
+    private final long maxSortableSizeBytes;
+    private final long maxSortableNumMutations;
+    private final long maxSortableNumRows;
     private final PCollectionView<SpannerSchema> schemaView;
+    private final ArrayList<MutationGroupContainer> mutationsToSort = new ArrayList<>();
 
-    private transient ArrayList<KV<byte[], byte[]>> mutationsToSort = null;
+    // total size of MutationGroups in mutationsToSort.
+    private long sortableSizeBytes;
+    // total number of mutated cells in mutationsToSort
+    private long sortableNumCells;
+    // total number of rows mutated in mutationsToSort
+    private long sortableNumRows;
 
-    GatherBundleAndSortFn(
+    GatherSortCreateBatchesFn(
         long maxBatchSizeBytes,
         long maxNumMutations,
         long maxNumRows,
         long groupingFactor,
         PCollectionView<SpannerSchema> schemaView) {
-      this.maxBatchSizeBytes = maxBatchSizeBytes * groupingFactor;
-      this.maxNumMutations = maxNumMutations * groupingFactor;
-      this.maxNumRows = maxNumRows * groupingFactor;
+      this.maxBatchSizeBytes = maxBatchSizeBytes;
+      this.maxBatchNumMutations = maxNumMutations;
+      this.maxBatchNumRows = maxNumRows;
+
+      if (groupingFactor <= 0) {
+        groupingFactor = 1;
+      }
+
+      this.maxSortableSizeBytes = maxBatchSizeBytes * groupingFactor;
+      this.maxSortableNumMutations = maxNumMutations * groupingFactor;
+      this.maxSortableNumRows = maxNumRows * groupingFactor;
       this.schemaView = schemaView;
     }
 
     @StartBundle
     public synchronized void startBundle() throws Exception {
-      if (mutationsToSort == null) {
-        initSorter();
-      } else {
-        throw new IllegalStateException("Sorter should be null here");
-      }
+      initSorter();
     }
 
-    private void initSorter() {
-      mutationsToSort = new ArrayList<KV<byte[], byte[]>>((int) maxNumMutations);
-      batchSizeBytes = 0;
-      batchCells = 0;
-      batchRows = 0;
+    private synchronized void initSorter() {

Review comment:
       > Do we need to mark this as synchronized? It looks like all the callers are synchronized themselves.
   
   Probably not, but it does no harm.
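
For context, Java's intrinsic locks are reentrant, so a synchronized method can call another synchronized method on the same object without blocking. A minimal sketch of that behaviour follows; the class is hypothetical and only borrows the method names from the diff above.

```java
final class ReentrantSyncSketch {
  private int resets = 0;

  // The calling thread acquires the monitor on `this` here.
  public synchronized void startBundle() {
    initSorter();
  }

  // Re-acquiring a monitor the thread already owns succeeds immediately,
  // so the extra `synchronized` is redundant for this caller but harmless.
  private synchronized void initSorter() {
    if (!Thread.holdsLock(this)) {
      throw new IllegalStateException("monitor should be held here");
    }
    resets++;
  }

  public synchronized int resets() {
    return resets;
  }

  public static void main(String[] args) {
    ReentrantSyncSketch fn = new ReentrantSyncSketch();
    fn.startBundle();
    System.out.println(fn.resets());
  }
}
```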







[GitHub] [beam] nielm commented on pull request #11570: [BEAM-9822] Merge the stages 'Gather and Sort' and 'Create Batches'

Posted by GitBox <gi...@apache.org>.
nielm commented on pull request #11570:
URL: https://github.com/apache/beam/pull/11570#issuecomment-630488670


   @allenpradeep 
   > 1. What mode should our import pipeline use? Should it use option b as data in AVRO seems already sorted?
   
   We can discuss this outside the scope of this PR. 
   
   > 2. Where should we document these modes of operation so that some customer can use these?
   
   I have added a section to the javadoc explaining these 3 modes of operation, and their pros and cons.
   





[GitHub] [beam] chamikaramj commented on pull request #11570: [BEAM-10047] Merge the stages 'Gather and Sort' and 'Create Batches'

Posted by GitBox <gi...@apache.org>.
chamikaramj commented on pull request #11570:
URL: https://github.com/apache/beam/pull/11570#issuecomment-650362590


   Thanks. We can merge if post-commit tests pass.





[GitHub] [beam] tvalentyn commented on pull request #11570: [BEAM-10047] Merge the stages 'Gather and Sort' and 'Create Batches'

Posted by GitBox <gi...@apache.org>.
tvalentyn commented on pull request #11570:
URL: https://github.com/apache/beam/pull/11570#issuecomment-648539787


   
   ```
   [CheckStyle] Attaching ResultAction with ID 'checkstyle' to run 'beam_PreCommit_Java_Commit #11858'.
   Setting status of a74866ba56d92d9476006b7e40a0e0ff916748ca to FAILURE with url https://ci-beam.apache.org/job/beam_PreCommit_Java_Commit/11858/ and message: 'Build finished. '
   Using context: Java ("Run Java PreCommit")
   Finished: ABORTED
   ```
   Can't tell if tests passed or not, rerunning.





[GitHub] [beam] allenpradeep commented on pull request #11570: [BEAM-10047] Merge the stages 'Gather and Sort' and 'Create Batches'

Posted by GitBox <gi...@apache.org>.
allenpradeep commented on pull request #11570:
URL: https://github.com/apache/beam/pull/11570#issuecomment-649119157


   Can we merge this patch? I have a patch to count bytes written to Spanner that depends on this one.





[GitHub] [beam] allenpradeep edited a comment on pull request #11570: [BEAM-10047] Merge the stages 'Gather and Sort' and 'Create Batches'

Posted by GitBox <gi...@apache.org>.
allenpradeep edited a comment on pull request #11570:
URL: https://github.com/apache/beam/pull/11570#issuecomment-649119157


   Can we merge this PR? I would like to send out a PR to count bytes written to Spanner, and it would depend on this one.





[GitHub] [beam] nielm commented on pull request #11570: [BEAM-9822] Merge the stages 'Gather and Sort' and 'Create Batches'

Posted by GitBox <gi...@apache.org>.
nielm commented on pull request #11570:
URL: https://github.com/apache/beam/pull/11570#issuecomment-630775006


   Retest this please





[GitHub] [beam] tvalentyn commented on pull request #11570: [BEAM-10047] Merge the stages 'Gather and Sort' and 'Create Batches'

Posted by GitBox <gi...@apache.org>.
tvalentyn commented on pull request #11570:
URL: https://github.com/apache/beam/pull/11570#issuecomment-648539478


   Run Java PreCommit





[GitHub] [beam] nielm commented on pull request #11570: [BEAM-10047] Merge the stages 'Gather and Sort' and 'Create Batches'

Posted by GitBox <gi...@apache.org>.
nielm commented on pull request #11570:
URL: https://github.com/apache/beam/pull/11570#issuecomment-649431492


   Retest this please





[GitHub] [beam] nielm removed a comment on pull request #11570: [BEAM-9822] Merge the stages 'Gather and Sort' and 'Create Batches'

Posted by GitBox <gi...@apache.org>.
nielm removed a comment on pull request #11570:
URL: https://github.com/apache/beam/pull/11570#issuecomment-630491802


   Retest this please
   
   





[GitHub] [beam] udim commented on pull request #11570: [BEAM-10047] Merge the stages 'Gather and Sort' and 'Create Batches'

Posted by GitBox <gi...@apache.org>.
udim commented on pull request #11570:
URL: https://github.com/apache/beam/pull/11570#issuecomment-648324954


   Is this ready to merge?





[GitHub] [beam] nielm commented on pull request #11570: [BEAM-10047] Merge the stages 'Gather and Sort' and 'Create Batches'

Posted by GitBox <gi...@apache.org>.
nielm commented on pull request #11570:
URL: https://github.com/apache/beam/pull/11570#issuecomment-641849548


   Retest this please
   
   





[GitHub] [beam] TheNeuralBit commented on pull request #11570: [BEAM-9822] Merge the stages 'Gather and Sort' and 'Create Batches'

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on pull request #11570:
URL: https://github.com/apache/beam/pull/11570#issuecomment-630900072


   Retest this please





[GitHub] [beam] TheNeuralBit commented on pull request #11570: [BEAM-9822] Merge the stages 'Gather and Sort' and 'Create Batches'

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on pull request #11570:
URL: https://github.com/apache/beam/pull/11570#issuecomment-630903815


   Retest this please





[GitHub] [beam] allenpradeep commented on a change in pull request #11570: [BEAM-9822] Merge the stages 'Gather and Sort' and 'Create Batches'

Posted by GitBox <gi...@apache.org>.
allenpradeep commented on a change in pull request #11570:
URL: https://github.com/apache/beam/pull/11570#discussion_r420982372



##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java
##########
@@ -1171,67 +1145,127 @@ public void processElement(ProcessContext c) {
    * occur, Therefore this DoFn has to be tested in isolation.
    */
   @VisibleForTesting
-  static class GatherBundleAndSortFn extends DoFn<MutationGroup, Iterable<KV<byte[], byte[]>>> {
-    private final long maxBatchSizeBytes;
-    private final long maxNumMutations;
-    private final long maxNumRows;
-
-    // total size of the current batch.
-    private long batchSizeBytes;
-    // total number of mutated cells.
-    private long batchCells;
-    // total number of rows mutated.
-    private long batchRows;
+  static class GatherSortCreateBatchesFn extends DoFn<MutationGroup, Iterable<MutationGroup>> {
 
+    private final long maxBatchSizeBytes;
+    private final long maxBatchNumMutations;
+    private final long maxBatchNumRows;
+    private final long maxSortableSizeBytes;
+    private final long maxSortableNumMutations;
+    private final long maxSortableNumRows;
     private final PCollectionView<SpannerSchema> schemaView;
+    private final ArrayList<MutationGroupContainer> mutationsToSort = new ArrayList<>();
 
-    private transient ArrayList<KV<byte[], byte[]>> mutationsToSort = null;
+    // total size of MutationGroups in mutationsToSort.
+    private long sortableSizeBytes;
+    // total number of mutated cells in mutationsToSort
+    private long sortableNumCells;
+    // total number of rows mutated in mutationsToSort
+    private long sortableNumRows;
 
-    GatherBundleAndSortFn(
+    GatherSortCreateBatchesFn(
         long maxBatchSizeBytes,
         long maxNumMutations,
         long maxNumRows,
         long groupingFactor,
         PCollectionView<SpannerSchema> schemaView) {
-      this.maxBatchSizeBytes = maxBatchSizeBytes * groupingFactor;
-      this.maxNumMutations = maxNumMutations * groupingFactor;
-      this.maxNumRows = maxNumRows * groupingFactor;
+      this.maxBatchSizeBytes = maxBatchSizeBytes;
+      this.maxBatchNumMutations = maxNumMutations;
+      this.maxBatchNumRows = maxNumRows;
+
+      if (groupingFactor <= 0) {
+        groupingFactor = 1;
+      }
+
+      this.maxSortableSizeBytes = maxBatchSizeBytes * groupingFactor;
+      this.maxSortableNumMutations = maxNumMutations * groupingFactor;
+      this.maxSortableNumRows = maxNumRows * groupingFactor;
       this.schemaView = schemaView;
     }
 
     @StartBundle
     public synchronized void startBundle() throws Exception {
-      if (mutationsToSort == null) {
-        initSorter();
-      } else {
-        throw new IllegalStateException("Sorter should be null here");
-      }
+      initSorter();
     }
 
-    private void initSorter() {
-      mutationsToSort = new ArrayList<KV<byte[], byte[]>>((int) maxNumMutations);
-      batchSizeBytes = 0;
-      batchCells = 0;
-      batchRows = 0;
+    private synchronized void initSorter() {

Review comment:
       Do we need to mark this as synchronized? It looks like all the callers are synchronized themselves.







[GitHub] [beam] udim merged pull request #11570: [BEAM-10047] Merge the stages 'Gather and Sort' and 'Create Batches'

Posted by GitBox <gi...@apache.org>.
udim merged pull request #11570:
URL: https://github.com/apache/beam/pull/11570


   





[GitHub] [beam] allenpradeep commented on pull request #11570: [BEAM-9822] Merge the stages 'Gather and Sort' and 'Create Batches'

Posted by GitBox <gi...@apache.org>.
allenpradeep commented on pull request #11570:
URL: https://github.com/apache/beam/pull/11570#issuecomment-624793783


   I'm good with these changes except the questions I had regarding the usages.
   LGTM





[GitHub] [beam] TheNeuralBit commented on pull request #11570: [BEAM-10047] Merge the stages 'Gather and Sort' and 'Create Batches'

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on pull request #11570:
URL: https://github.com/apache/beam/pull/11570#issuecomment-643442999


   Retest this please





[GitHub] [beam] chamikaramj commented on pull request #11570: [BEAM-10047] Merge the stages 'Gather and Sort' and 'Create Batches'

Posted by GitBox <gi...@apache.org>.
chamikaramj commented on pull request #11570:
URL: https://github.com/apache/beam/pull/11570#issuecomment-650362116


   Run Java PostCommit





[GitHub] [beam] allenpradeep commented on pull request #11570: [BEAM-9822] Merge the stages 'Gather and Sort' and 'Create Batches'

Posted by GitBox <gi...@apache.org>.
allenpradeep commented on pull request #11570:
URL: https://github.com/apache/beam/pull/11570#issuecomment-622580239


   This is great, Niel. With these changes, there are 3 modes of using SpannerIO write:
   a) Conventional (as it was until now): use a grouping factor, so that data is grouped, sorted, batched and written as per the parameters.
   b) Batching without grouping: set the grouping factor to 1 with a larger batch size in bytes or cells. This ensures data is batched without sorting.
   c) No batching: set any of max rows, max mutations or batch bytes to 0 or 1.
   
   Questions:
   1) What mode should our import pipeline use? Should it use option b as data in AVRO seems already sorted?
   2) Where should we document these modes of operation so that some customer can use these?
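
The three modes listed above could be summarised as a small decision function. This is a sketch only: the `classify` helper and the `Mode` enum are hypothetical names invented for illustration, and the thresholds follow the description in this comment rather than the actual SpannerIO implementation.

```java
final class WriteModeSketch {

  /** Hypothetical labels for modes (a), (b) and (c) described above. */
  enum Mode {
    GROUP_SORT_BATCH,
    BATCH_ONLY,
    NO_BATCHING
  }

  /** Maps the write parameters to one of the three modes (assumed rules, per the comment). */
  static Mode classify(
      long batchSizeBytes, long maxNumMutations, long maxNumRows, long groupingFactor) {
    // (c) Any per-batch limit of 0 or 1 means every mutation group is written on its own.
    if (batchSizeBytes <= 1 || maxNumMutations <= 1 || maxNumRows <= 1) {
      return Mode.NO_BATCHING;
    }
    // (b) A grouping factor of 1 batches without sorting; the diff treats values <= 0 as 1.
    if (groupingFactor <= 1) {
      return Mode.BATCH_ONLY;
    }
    // (a) Otherwise data is grouped, sorted and then batched.
    return Mode.GROUP_SORT_BATCH;
  }

  public static void main(String[] args) {
    System.out.println(classify(1_000_000L, 5_000L, 500L, 1_000L));
    System.out.println(classify(10_000_000L, 5_000L, 500L, 1L));
    System.out.println(classify(0L, 5_000L, 500L, 1_000L));
  }
}
```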

