You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by pa...@apache.org on 2022/06/18 01:35:13 UTC

[beam] branch release-2.40.0 updated: Merge pull request #21928 from [Fixes #21927] Compress (Un)BoundedSourceAsSdfWrapper element and restriction coders

This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch release-2.40.0
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/release-2.40.0 by this push:
     new 1cb7eb682cc Merge pull request #21928 from [Fixes #21927] Compress (Un)BoundedSourceAsSdfWrapper element and restriction coders
     new 12d0fe45d95 Merge pull request #21936 from [cherry-pick][release-2.40.0][Fixes #21927] Compress (Un)BoundedSourceAsSdfWrapper element and restriction coders
1cb7eb682cc is described below

commit 1cb7eb682cc3ba9dace1685ba46be4b40c45d185
Author: Luke Cwik <lc...@google.com>
AuthorDate: Fri Jun 17 16:15:27 2022 -0700

    Merge pull request #21928 from [Fixes #21927] Compress (Un)BoundedSourceAsSdfWrapper element and restriction coders
    
    * [Fixes #21927] Compress BoundedSourceAsSdfWrapper element and restriction coders
    
    A typical BoundedSource may be split into many BoundedSource instances during initial splitting. In a simple test with BigtableSource, encoding 10 instances after splitting took 102660 bytes on average, whereas compressing each instance separately after encoding took 1639 bytes, a >60x improvement.
    
    * Also handle unbounded sources.
---
 CHANGES.md                                               |  1 +
 .../core/src/main/java/org/apache/beam/sdk/io/Read.java  | 16 ++++++++++------
 2 files changed, 11 insertions(+), 6 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index 9cb2830aa6a..97fbd3f5874 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -75,6 +75,7 @@
 
 * The Go Sdk now requires a minimum version of 1.18 in order to support generics ([BEAM-14347](https://issues.apache.org/jira/browse/BEAM-14347)).
 * synthetic.SourceConfig field types have changed to int64 from int for better compatibility with Flink's use of Logical types in Schemas (Go) ([BEAM-14173](https://issues.apache.org/jira/browse/BEAM-14173))
+* Default coder updated to compress sources used with `BoundedSourceAsSDFWrapperFn` and `UnboundedSourceAsSDFWrapper`.
 
 ## Deprecations
 
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
index b300f31f5d7..4b818064da4 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
@@ -34,6 +34,7 @@ import org.apache.beam.sdk.coders.CoderException;
 import org.apache.beam.sdk.coders.InstantCoder;
 import org.apache.beam.sdk.coders.NullableCoder;
 import org.apache.beam.sdk.coders.SerializableCoder;
+import org.apache.beam.sdk.coders.SnappyCoder;
 import org.apache.beam.sdk.coders.StructuredCoder;
 import org.apache.beam.sdk.io.UnboundedSource.CheckpointMark;
 import org.apache.beam.sdk.io.UnboundedSource.CheckpointMark.NoopCheckpointMark;
@@ -148,7 +149,7 @@ public class Read {
           .getPipeline()
           .apply(Impulse.create())
           .apply(ParDo.of(new OutputSingleSource<>(source)))
-          .setCoder(SerializableCoder.of(new TypeDescriptor<BoundedSource<T>>() {}))
+          .setCoder(SnappyCoder.of(SerializableCoder.of(new TypeDescriptor<BoundedSource<T>>() {})))
           .apply(ParDo.of(new BoundedSourceAsSDFWrapperFn<>()))
           .setCoder(source.getOutputCoder())
           .setTypeDescriptor(source.getOutputCoder().getEncodedTypeDescriptor());
@@ -216,7 +217,9 @@ public class Read {
               .apply(Impulse.create())
               .apply(ParDo.of(new OutputSingleSource<>(source)))
               .setCoder(
-                  SerializableCoder.of(new TypeDescriptor<UnboundedSource<T, CheckpointMark>>() {}))
+                  SnappyCoder.of(
+                      SerializableCoder.of(
+                          new TypeDescriptor<UnboundedSource<T, CheckpointMark>>() {})))
               .apply(ParDo.of(createUnboundedSdfWrapper()))
               .setCoder(ValueWithRecordIdCoder.of(source.getOutputCoder()));
 
@@ -314,7 +317,7 @@ public class Read {
 
     @GetRestrictionCoder
     public Coder<BoundedSourceT> restrictionCoder() {
-      return SerializableCoder.of(new TypeDescriptor<BoundedSourceT>() {});
+      return SnappyCoder.of(SerializableCoder.of(new TypeDescriptor<BoundedSourceT>() {}));
     }
 
     /**
@@ -600,9 +603,10 @@ public class Read {
 
     @GetRestrictionCoder
     public Coder<UnboundedSourceRestriction<OutputT, CheckpointT>> restrictionCoder() {
-      return new UnboundedSourceRestrictionCoder<>(
-          SerializableCoder.of(new TypeDescriptor<UnboundedSource<OutputT, CheckpointT>>() {}),
-          NullableCoder.of(checkpointCoder));
+      return SnappyCoder.of(
+          new UnboundedSourceRestrictionCoder<>(
+              SerializableCoder.of(new TypeDescriptor<UnboundedSource<OutputT, CheckpointT>>() {}),
+              NullableCoder.of(checkpointCoder)));
     }
 
     /**