You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by mw...@apache.org on 2020/03/16 09:04:40 UTC
[beam] branch master updated: [BEAM-9346] Improve the efficiency of
TFRecordIO (#11122)
This is an automated email from the ASF dual-hosted git repository.
mwalenia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new f731211 [BEAM-9346] Improve the efficiency of TFRecordIO (#11122)
f731211 is described below
commit f731211b80429a7b1674fd14ce9b3f3796c40803
Author: Piotr Szuberski <pi...@polidea.com>
AuthorDate: Mon Mar 16 10:04:18 2020 +0100
[BEAM-9346] Improve the efficiency of TFRecordIO (#11122)
---
.../java/org/apache/beam/sdk/io/WriteFiles.java | 43 +++++++++++++++++++---
1 file changed, 38 insertions(+), 5 deletions(-)
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
index 3f71bc2..5888484 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
@@ -22,7 +22,9 @@ import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Prec
import com.google.auto.value.AutoValue;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Collection;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.UUID;
@@ -48,15 +50,15 @@ import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.Combine.CombineFn;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.Reify;
import org.apache.beam.sdk.transforms.Reshuffle;
import org.apache.beam.sdk.transforms.Values;
-import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
@@ -410,13 +412,44 @@ public abstract class WriteFiles<UserT, DestinationT, OutputT>
} else {
// Pass results via a side input rather than reshuffle, because we need to get an empty
// iterable to finalize if there are no results.
- return input
- .getPipeline()
- .apply(Reify.viewInGlobalWindow(input.apply(View.asList()), ListCoder.of(resultCoder)));
+ return input.apply("ToList", Combine.globally(new ToListCombineFn<>()));
}
}
}
+  /**
+   * A {@link CombineFn} that gathers every input element into a single {@link List}.
+   * Accumulators are {@link ArrayList}s that {@link #addInput} and {@link #mergeAccumulators}
+   * modify in place; with no input the result is the empty list from {@link #createAccumulator}.
+   */
+  public static class ToListCombineFn<ResultT>
+      extends CombineFn<ResultT, List<ResultT>, List<ResultT>> {
+
+    @Override
+    public List<ResultT> createAccumulator() {
+      return new ArrayList<>();
+    }
+
+    @Override
+    public List<ResultT> addInput(List<ResultT> mutableAccumulator, ResultT input) {
+      mutableAccumulator.add(input);
+      return mutableAccumulator;
+    }
+
+    @Override
+    public List<ResultT> mergeAccumulators(Iterable<List<ResultT>> accumulators) {
+      Iterator<List<ResultT>> remaining = accumulators.iterator();
+      List<ResultT> merged = remaining.hasNext() ? remaining.next() : new ArrayList<>();
+      remaining.forEachRemaining(merged::addAll);
+      return merged;
+    }
+
+    @Override
+    public List<ResultT> extractOutput(List<ResultT> accumulator) {
+      return accumulator;
+    }
+  }
+
private class WriteUnshardedBundlesToTempFiles
extends PTransform<PCollection<UserT>, PCollection<FileResult<DestinationT>>> {
private final Coder<DestinationT> destinationCoder;