You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by go...@apache.org on 2019/05/10 17:19:48 UTC
[beam] branch master updated: [BEAM-7260] UTF8 coder is breaking dataflow tests
This is an automated email from the ASF dual-hosted git repository.
goenka pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new c4dbe12 [BEAM-7260] UTF8 coder is breaking dataflow tests
new da4bb49 Merge pull request #8545 from ihji/BEAM-7260
c4dbe12 is described below
commit c4dbe12a09e93d0e91d2c7c3bcd3e9bb9117310c
Author: Heejong Lee <he...@gmail.com>
AuthorDate: Thu May 9 17:22:38 2019 -0700
[BEAM-7260] UTF8 coder is breaking dataflow tests
StringUtf8Coder is now a ModelCoder in the Java SDK, so it also needs to be
added to WELL_KNOWN_CODER_TYPES for the Dataflow worker harness.
---
.../dataflow/worker/graph/LengthPrefixUnknownCoders.java | 3 ++-
.../worker/graph/LengthPrefixUnknownCodersTest.java | 13 +++++--------
2 files changed, 7 insertions(+), 9 deletions(-)
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/LengthPrefixUnknownCoders.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/LengthPrefixUnknownCoders.java
index d4b9e6c..1537288 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/LengthPrefixUnknownCoders.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/LengthPrefixUnknownCoders.java
@@ -72,7 +72,8 @@ public class LengthPrefixUnknownCoders {
"kind:fixed_big_endian_int64",
"kind:var_int32",
"kind:void",
- "org.apache.beam.sdk.coders.DoubleCoder");
+ "org.apache.beam.sdk.coders.DoubleCoder",
+ "org.apache.beam.sdk.coders.StringUtf8Coder");
private static final String LENGTH_PREFIX_CODER_TYPE = "kind:length_prefix";
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/graph/LengthPrefixUnknownCodersTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/graph/LengthPrefixUnknownCodersTest.java
index f73538a..679a292 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/graph/LengthPrefixUnknownCodersTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/graph/LengthPrefixUnknownCodersTest.java
@@ -77,14 +77,13 @@ public class LengthPrefixUnknownCodersTest {
private static final Coder<WindowedValue<KV<String, Integer>>> prefixedWindowedValueCoder =
WindowedValue.getFullCoder(
- KvCoder.of(
- LengthPrefixCoder.of(StringUtf8Coder.of()), LengthPrefixCoder.of(VarIntCoder.of())),
+ KvCoder.of(StringUtf8Coder.of(), LengthPrefixCoder.of(VarIntCoder.of())),
GlobalWindow.Coder.INSTANCE);
- private static final Coder<WindowedValue<KV<byte[], byte[]>>>
+ private static final Coder<WindowedValue<KV<String, byte[]>>>
prefixedAndReplacedWindowedValueCoder =
WindowedValue.getFullCoder(
- KvCoder.of(LENGTH_PREFIXED_BYTE_ARRAY_CODER, LENGTH_PREFIXED_BYTE_ARRAY_CODER),
+ KvCoder.of(StringUtf8Coder.of(), LENGTH_PREFIXED_BYTE_ARRAY_CODER),
GlobalWindow.Coder.INSTANCE);
private static final String MERGE_BUCKETS_DO_FN = "MergeBucketsDoFn";
@@ -124,8 +123,7 @@ public class LengthPrefixUnknownCodersTest {
Coder<WindowedValue<KV<String, Integer>>> expectedCoder =
WindowedValue.getFullCoder(
- KvCoder.of(
- LengthPrefixCoder.of(StringUtf8Coder.of()), LengthPrefixCoder.of(VarIntCoder.of())),
+ KvCoder.of(StringUtf8Coder.of(), LengthPrefixCoder.of(VarIntCoder.of())),
GlobalWindow.Coder.INSTANCE);
assertEquals(
@@ -138,8 +136,7 @@ public class LengthPrefixUnknownCodersTest {
public void testLengthPrefixAndReplaceUnknownCoder() throws Exception {
Coder<WindowedValue<KV<String, Integer>>> windowedValueCoder =
WindowedValue.getFullCoder(
- KvCoder.of(LengthPrefixCoder.of(StringUtf8Coder.of()), VarIntCoder.of()),
- GlobalWindow.Coder.INSTANCE);
+ KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()), GlobalWindow.Coder.INSTANCE);
Map<String, Object> lengthPrefixedCoderCloudObject =
forCodec(CloudObjects.asCloudObject(windowedValueCoder, /*sdkComponents=*/ null), true);