You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by pa...@apache.org on 2022/06/18 01:35:13 UTC
[beam] branch release-2.40.0 updated: Merge pull request #21928 from [Fixes #21927] Compress (Un)BoundedSourceAsSdfWrapper element and restriction coders
This is an automated email from the ASF dual-hosted git repository.
pabloem pushed a commit to branch release-2.40.0
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/release-2.40.0 by this push:
new 1cb7eb682cc Merge pull request #21928 from [Fixes #21927] Compress (Un)BoundedSourceAsSdfWrapper element and restriction coders
new 12d0fe45d95 Merge pull request #21936 from [cherry-pick][release-2.40.0][Fixes #21927] Compress (Un)BoundedSourceAsSdfWrapper element and restriction coders
1cb7eb682cc is described below
commit 1cb7eb682cc3ba9dace1685ba46be4b40c45d185
Author: Luke Cwik <lc...@google.com>
AuthorDate: Fri Jun 17 16:15:27 2022 -0700
Merge pull request #21928 from [Fixes #21927] Compress (Un)BoundedSourceAsSdfWrapper element and restriction coders
* [Fixes #21927] Compress BoundedSourceAsSdfWrapper element and restriction coders
A typical BoundedSource may be split into many BoundedSource instances during initial splitting. A simple test with BigtableSource showed that encoding 10 instances produced by splitting took 102660 bytes on average, while compressing each instance separately after encoding took only 1639 bytes, a >60x improvement.
* Also handle unbounded sources.
---
CHANGES.md | 1 +
.../core/src/main/java/org/apache/beam/sdk/io/Read.java | 16 ++++++++++------
2 files changed, 11 insertions(+), 6 deletions(-)
diff --git a/CHANGES.md b/CHANGES.md
index 9cb2830aa6a..97fbd3f5874 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -75,6 +75,7 @@
* The Go Sdk now requires a minimum version of 1.18 in order to support generics ([BEAM-14347](https://issues.apache.org/jira/browse/BEAM-14347)).
* synthetic.SourceConfig field types have changed to int64 from int for better compatibility with Flink's use of Logical types in Schemas (Go) ([BEAM-14173](https://issues.apache.org/jira/browse/BEAM-14173))
+* Default coder updated to compress sources used with `BoundedSourceAsSDFWrapperFn` and `UnboundedSourceAsSDFWrapperFn`.
## Deprecations
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
index b300f31f5d7..4b818064da4 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
@@ -34,6 +34,7 @@ import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.InstantCoder;
import org.apache.beam.sdk.coders.NullableCoder;
import org.apache.beam.sdk.coders.SerializableCoder;
+import org.apache.beam.sdk.coders.SnappyCoder;
import org.apache.beam.sdk.coders.StructuredCoder;
import org.apache.beam.sdk.io.UnboundedSource.CheckpointMark;
import org.apache.beam.sdk.io.UnboundedSource.CheckpointMark.NoopCheckpointMark;
@@ -148,7 +149,7 @@ public class Read {
.getPipeline()
.apply(Impulse.create())
.apply(ParDo.of(new OutputSingleSource<>(source)))
- .setCoder(SerializableCoder.of(new TypeDescriptor<BoundedSource<T>>() {}))
+ .setCoder(SnappyCoder.of(SerializableCoder.of(new TypeDescriptor<BoundedSource<T>>() {})))
.apply(ParDo.of(new BoundedSourceAsSDFWrapperFn<>()))
.setCoder(source.getOutputCoder())
.setTypeDescriptor(source.getOutputCoder().getEncodedTypeDescriptor());
@@ -216,7 +217,9 @@ public class Read {
.apply(Impulse.create())
.apply(ParDo.of(new OutputSingleSource<>(source)))
.setCoder(
- SerializableCoder.of(new TypeDescriptor<UnboundedSource<T, CheckpointMark>>() {}))
+ SnappyCoder.of(
+ SerializableCoder.of(
+ new TypeDescriptor<UnboundedSource<T, CheckpointMark>>() {})))
.apply(ParDo.of(createUnboundedSdfWrapper()))
.setCoder(ValueWithRecordIdCoder.of(source.getOutputCoder()));
@@ -314,7 +317,7 @@ public class Read {
@GetRestrictionCoder
public Coder<BoundedSourceT> restrictionCoder() {
- return SerializableCoder.of(new TypeDescriptor<BoundedSourceT>() {});
+ return SnappyCoder.of(SerializableCoder.of(new TypeDescriptor<BoundedSourceT>() {}));
}
/**
@@ -600,9 +603,10 @@ public class Read {
@GetRestrictionCoder
public Coder<UnboundedSourceRestriction<OutputT, CheckpointT>> restrictionCoder() {
- return new UnboundedSourceRestrictionCoder<>(
- SerializableCoder.of(new TypeDescriptor<UnboundedSource<OutputT, CheckpointT>>() {}),
- NullableCoder.of(checkpointCoder));
+ return SnappyCoder.of(
+ new UnboundedSourceRestrictionCoder<>(
+ SerializableCoder.of(new TypeDescriptor<UnboundedSource<OutputT, CheckpointT>>() {}),
+ NullableCoder.of(checkpointCoder)));
}
/**