Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/05/18 23:33:33 UTC

[GitHub] [beam] nielm commented on a change in pull request #11570: [BEAM-9822] Merge the stages 'Gather and Sort' and 'Create Batches'

nielm commented on a change in pull request #11570:
URL: https://github.com/apache/beam/pull/11570#discussion_r426947630



##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java
##########
@@ -1171,67 +1145,127 @@ public void processElement(ProcessContext c) {
    * occur, Therefore this DoFn has to be tested in isolation.
    */
   @VisibleForTesting
-  static class GatherBundleAndSortFn extends DoFn<MutationGroup, Iterable<KV<byte[], byte[]>>> {
-    private final long maxBatchSizeBytes;
-    private final long maxNumMutations;
-    private final long maxNumRows;
-
-    // total size of the current batch.
-    private long batchSizeBytes;
-    // total number of mutated cells.
-    private long batchCells;
-    // total number of rows mutated.
-    private long batchRows;
+  static class GatherSortCreateBatchesFn extends DoFn<MutationGroup, Iterable<MutationGroup>> {
 
+    private final long maxBatchSizeBytes;
+    private final long maxBatchNumMutations;
+    private final long maxBatchNumRows;
+    private final long maxSortableSizeBytes;
+    private final long maxSortableNumMutations;
+    private final long maxSortableNumRows;
     private final PCollectionView<SpannerSchema> schemaView;
+    private final ArrayList<MutationGroupContainer> mutationsToSort = new ArrayList<>();
 
-    private transient ArrayList<KV<byte[], byte[]>> mutationsToSort = null;
+    // total size of MutationGroups in mutationsToSort.
+    private long sortableSizeBytes;
+    // total number of mutated cells in mutationsToSort
+    private long sortableNumCells;
+    // total number of rows mutated in mutationsToSort
+    private long sortableNumRows;
 
-    GatherBundleAndSortFn(
+    GatherSortCreateBatchesFn(
         long maxBatchSizeBytes,
         long maxNumMutations,
         long maxNumRows,
         long groupingFactor,
         PCollectionView<SpannerSchema> schemaView) {
-      this.maxBatchSizeBytes = maxBatchSizeBytes * groupingFactor;
-      this.maxNumMutations = maxNumMutations * groupingFactor;
-      this.maxNumRows = maxNumRows * groupingFactor;
+      this.maxBatchSizeBytes = maxBatchSizeBytes;
+      this.maxBatchNumMutations = maxNumMutations;
+      this.maxBatchNumRows = maxNumRows;
+
+      if (groupingFactor <= 0) {
+        groupingFactor = 1;
+      }
+
+      this.maxSortableSizeBytes = maxBatchSizeBytes * groupingFactor;
+      this.maxSortableNumMutations = maxNumMutations * groupingFactor;
+      this.maxSortableNumRows = maxNumRows * groupingFactor;
       this.schemaView = schemaView;
     }
 
     @StartBundle
     public synchronized void startBundle() throws Exception {
-      if (mutationsToSort == null) {
-        initSorter();
-      } else {
-        throw new IllegalStateException("Sorter should be null here");
-      }
+      initSorter();
     }
 
-    private void initSorter() {
-      mutationsToSort = new ArrayList<KV<byte[], byte[]>>((int) maxNumMutations);
-      batchSizeBytes = 0;
-      batchCells = 0;
-      batchRows = 0;
+    private synchronized void initSorter() {

Review comment:
       > Do we need to mark this as synchronized? Looks like all the callers are synchronized themselves.
   
   Probably not, but it does no harm.
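
   It does no harm because Java's intrinsic locks are reentrant: a thread that already holds an object's monitor can enter another `synchronized` method on the same object without blocking. Below is a minimal sketch of that behavior. `SortBuffer`, `add`, `count`, and `totalLength` are hypothetical names invented for illustration; this is not the `SpannerIO` code, only a stand-in with the same calling pattern (synchronized lifecycle methods calling a private synchronized `initSorter`).

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for the DoFn's buffering state; not the Beam class. */
class SortBuffer {
  private final List<String> items = new ArrayList<>();
  private long totalLength;

  // The caller acquires the monitor on `this` before calling initSorter().
  public synchronized void startBundle() {
    initSorter();
  }

  public synchronized void add(String item) {
    items.add(item);
    totalLength += item.length();
  }

  public synchronized void finishBundle() {
    initSorter();
  }

  // Redundant but harmless: the calling thread already owns the monitor,
  // so this re-entry succeeds immediately instead of deadlocking.
  private synchronized void initSorter() {
    if (!Thread.holdsLock(this)) {
      throw new IllegalStateException("monitor should be held here");
    }
    items.clear();
    totalLength = 0;
  }

  public synchronized int count() {
    return items.size();
  }

  public synchronized long totalLength() {
    return totalLength;
  }
}

class ReentrancyDemo {
  public static void main(String[] args) {
    SortBuffer buffer = new SortBuffer();
    buffer.startBundle();   // synchronized -> synchronized, no deadlock
    buffer.add("abc");
    buffer.add("de");
    System.out.println(buffer.count() + " items, length " + buffer.totalLength());
    buffer.finishBundle();  // resets state through the same nested call
    System.out.println(buffer.count() + " items after reset");
  }
}
```

   Dropping `synchronized` from the private method would behave the same as long as every caller stays synchronized. Keeping it only costs a reentrant lock acquisition and protects against a future unsynchronized caller.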



