You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by go...@apache.org on 2019/05/10 17:19:48 UTC

[beam] branch master updated: [BEAM-7260] UTF8 coder is breaking dataflow tests

This is an automated email from the ASF dual-hosted git repository.

goenka pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new c4dbe12  [BEAM-7260] UTF8 coder is breaking dataflow tests
     new da4bb49  Merge pull request #8545 from ihji/BEAM-7260
c4dbe12 is described below

commit c4dbe12a09e93d0e91d2c7c3bcd3e9bb9117310c
Author: Heejong Lee <he...@gmail.com>
AuthorDate: Thu May 9 17:22:38 2019 -0700

    [BEAM-7260] UTF8 coder is breaking dataflow tests
    
    StringUtf8Coder is now a ModelCoder in the Java SDK. We need to add it to
    WELL_KNOWN_CODER_TYPES for the Dataflow worker harness as well.
---
 .../dataflow/worker/graph/LengthPrefixUnknownCoders.java    |  3 ++-
 .../worker/graph/LengthPrefixUnknownCodersTest.java         | 13 +++++--------
 2 files changed, 7 insertions(+), 9 deletions(-)

diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/LengthPrefixUnknownCoders.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/LengthPrefixUnknownCoders.java
index d4b9e6c..1537288 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/LengthPrefixUnknownCoders.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/LengthPrefixUnknownCoders.java
@@ -72,7 +72,8 @@ public class LengthPrefixUnknownCoders {
           "kind:fixed_big_endian_int64",
           "kind:var_int32",
           "kind:void",
-          "org.apache.beam.sdk.coders.DoubleCoder");
+          "org.apache.beam.sdk.coders.DoubleCoder",
+          "org.apache.beam.sdk.coders.StringUtf8Coder");
 
   private static final String LENGTH_PREFIX_CODER_TYPE = "kind:length_prefix";
 
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/graph/LengthPrefixUnknownCodersTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/graph/LengthPrefixUnknownCodersTest.java
index f73538a..679a292 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/graph/LengthPrefixUnknownCodersTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/graph/LengthPrefixUnknownCodersTest.java
@@ -77,14 +77,13 @@ public class LengthPrefixUnknownCodersTest {
 
   private static final Coder<WindowedValue<KV<String, Integer>>> prefixedWindowedValueCoder =
       WindowedValue.getFullCoder(
-          KvCoder.of(
-              LengthPrefixCoder.of(StringUtf8Coder.of()), LengthPrefixCoder.of(VarIntCoder.of())),
+          KvCoder.of(StringUtf8Coder.of(), LengthPrefixCoder.of(VarIntCoder.of())),
           GlobalWindow.Coder.INSTANCE);
 
-  private static final Coder<WindowedValue<KV<byte[], byte[]>>>
+  private static final Coder<WindowedValue<KV<String, byte[]>>>
       prefixedAndReplacedWindowedValueCoder =
           WindowedValue.getFullCoder(
-              KvCoder.of(LENGTH_PREFIXED_BYTE_ARRAY_CODER, LENGTH_PREFIXED_BYTE_ARRAY_CODER),
+              KvCoder.of(StringUtf8Coder.of(), LENGTH_PREFIXED_BYTE_ARRAY_CODER),
               GlobalWindow.Coder.INSTANCE);
 
   private static final String MERGE_BUCKETS_DO_FN = "MergeBucketsDoFn";
@@ -124,8 +123,7 @@ public class LengthPrefixUnknownCodersTest {
 
     Coder<WindowedValue<KV<String, Integer>>> expectedCoder =
         WindowedValue.getFullCoder(
-            KvCoder.of(
-                LengthPrefixCoder.of(StringUtf8Coder.of()), LengthPrefixCoder.of(VarIntCoder.of())),
+            KvCoder.of(StringUtf8Coder.of(), LengthPrefixCoder.of(VarIntCoder.of())),
             GlobalWindow.Coder.INSTANCE);
 
     assertEquals(
@@ -138,8 +136,7 @@ public class LengthPrefixUnknownCodersTest {
   public void testLengthPrefixAndReplaceUnknownCoder() throws Exception {
     Coder<WindowedValue<KV<String, Integer>>> windowedValueCoder =
         WindowedValue.getFullCoder(
-            KvCoder.of(LengthPrefixCoder.of(StringUtf8Coder.of()), VarIntCoder.of()),
-            GlobalWindow.Coder.INSTANCE);
+            KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()), GlobalWindow.Coder.INSTANCE);
 
     Map<String, Object> lengthPrefixedCoderCloudObject =
         forCodec(CloudObjects.asCloudObject(windowedValueCoder, /*sdkComponents=*/ null), true);