You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by mw...@apache.org on 2020/03/16 09:04:40 UTC

[beam] branch master updated: [BEAM-9346] Improve the efficiency of TFRecordIO (#11122)

This is an automated email from the ASF dual-hosted git repository.

mwalenia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new f731211  [BEAM-9346] Improve the efficiency of TFRecordIO (#11122)
f731211 is described below

commit f731211b80429a7b1674fd14ce9b3f3796c40803
Author: Piotr Szuberski <pi...@polidea.com>
AuthorDate: Mon Mar 16 10:04:18 2020 +0100

    [BEAM-9346] Improve the efficiency of TFRecordIO (#11122)
---
 .../java/org/apache/beam/sdk/io/WriteFiles.java    | 43 +++++++++++++++++++---
 1 file changed, 38 insertions(+), 5 deletions(-)

diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
index 3f71bc2..5888484 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
@@ -22,7 +22,9 @@ import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Prec
 
 import com.google.auto.value.AutoValue;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
@@ -48,15 +50,15 @@ import org.apache.beam.sdk.io.fs.ResourceId;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.ValueProvider;
 import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.Combine.CombineFn;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.Flatten;
 import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.Reify;
 import org.apache.beam.sdk.transforms.Reshuffle;
 import org.apache.beam.sdk.transforms.Values;
-import org.apache.beam.sdk.transforms.View;
 import org.apache.beam.sdk.transforms.WithKeys;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
@@ -410,13 +412,44 @@ public abstract class WriteFiles<UserT, DestinationT, OutputT>
       } else {
         // Pass results via a side input rather than reshuffle, because we need to get an empty
         // iterable to finalize if there are no results.
-        return input
-            .getPipeline()
-            .apply(Reify.viewInGlobalWindow(input.apply(View.asList()), ListCoder.of(resultCoder)));
+        return input.apply("ToList", Combine.globally(new ToListCombineFn<>()));
       }
     }
   }
 
+  public static class ToListCombineFn<ResultT>
+      extends CombineFn<ResultT, List<ResultT>, List<ResultT>> {
+
+    @Override
+    public List<ResultT> createAccumulator() {
+      return new ArrayList<>();
+    }
+
+    @Override
+    public List<ResultT> addInput(List<ResultT> mutableAccumulator, ResultT input) {
+      mutableAccumulator.add(input);
+      return mutableAccumulator;
+    }
+
+    @Override
+    public List<ResultT> mergeAccumulators(Iterable<List<ResultT>> accumulators) {
+      Iterator<List<ResultT>> iter = accumulators.iterator();
+      if (!iter.hasNext()) {
+        return new ArrayList<>();
+      }
+      List<ResultT> merged = iter.next();
+      while (iter.hasNext()) {
+        merged.addAll(iter.next());
+      }
+      return merged;
+    }
+
+    @Override
+    public List<ResultT> extractOutput(List<ResultT> accumulator) {
+      return accumulator;
+    }
+  }
+
   private class WriteUnshardedBundlesToTempFiles
       extends PTransform<PCollection<UserT>, PCollection<FileResult<DestinationT>>> {
     private final Coder<DestinationT> destinationCoder;