You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by pa...@apache.org on 2022/02/18 18:35:31 UTC

[beam] branch master updated: fix: fix bug when retrieving either string or json

This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new f973906  fix: fix bug when retrieving either string or json
     new a354889  Merge pull request #16890 from [BEAM-12164]: fix bug when retrieving either string or json
f973906 is described below

commit f9739064deedda4fd5ff2b283404d15e87a911fd
Author: Thiago Nunes <th...@google.com>
AuthorDate: Fri Feb 18 19:03:17 2022 +1100

    fix: fix bug when retrieving either string or json
    
    struct.getValue() throws an error when reading a column from a struct
    that contains a JSON value. We work around this by checking the column
    type and calling either struct.getString() or struct.getJson().
---
 .../mapper/ChangeStreamRecordMapper.java            | 21 ++++++++++-----------
 1 file changed, 10 insertions(+), 11 deletions(-)

diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/mapper/ChangeStreamRecordMapper.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/mapper/ChangeStreamRecordMapper.java
index 4368ba7..e2bae67 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/mapper/ChangeStreamRecordMapper.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/mapper/ChangeStreamRecordMapper.java
@@ -20,7 +20,6 @@ package org.apache.beam.sdk.io.gcp.spanner.changestreams.mapper;
 import com.google.cloud.Timestamp;
 import com.google.cloud.spanner.Struct;
 import com.google.cloud.spanner.Type;
-import com.google.cloud.spanner.Value;
 import java.util.HashSet;
 import java.util.List;
 import java.util.stream.Collectors;
@@ -280,7 +279,7 @@ public class ChangeStreamRecordMapper {
 
   private ColumnType columnTypeFrom(Struct struct) {
     // TODO: Move to type struct.getJson when backend is fully migrated
-    final String type = getJsonString(struct.getValue(TYPE_COLUMN));
+    final String type = getJsonString(struct, TYPE_COLUMN);
     return new ColumnType(
         struct.getString(NAME_COLUMN),
         new TypeCode(type),
@@ -290,13 +289,13 @@ public class ChangeStreamRecordMapper {
 
   private Mod modFrom(Struct struct) {
     // TODO: Move to keys struct.getJson when backend is fully migrated
-    final String keys = getJsonString(struct.getValue(KEYS_COLUMN));
+    final String keys = getJsonString(struct, KEYS_COLUMN);
     // TODO: Move to oldValues struct.getJson when backend is fully migrated
     final String oldValues =
-        struct.isNull(OLD_VALUES_COLUMN) ? null : getJsonString(struct.getValue(OLD_VALUES_COLUMN));
+        struct.isNull(OLD_VALUES_COLUMN) ? null : getJsonString(struct, OLD_VALUES_COLUMN);
     // TODO: Move to newValues struct.getJson when backend is fully migrated
     final String newValues =
-        struct.isNull(NEW_VALUES_COLUMN) ? null : getJsonString(struct.getValue(NEW_VALUES_COLUMN));
+        struct.isNull(NEW_VALUES_COLUMN) ? null : getJsonString(struct, NEW_VALUES_COLUMN);
     return new Mod(keys, oldValues, newValues);
   }
 
@@ -331,13 +330,13 @@ public class ChangeStreamRecordMapper {
   }
 
   // TODO: Remove when backend is fully migrated to JSON
-  private String getJsonString(Value value) {
-    if (value.getType().equals(Type.json())) {
-      return value.getJson();
-    } else if (value.getType().equals(Type.string())) {
-      return value.getString();
+  private String getJsonString(Struct struct, String columnName) {
+    if (struct.getColumnType(columnName).equals(Type.json())) {
+      return struct.getJson(columnName);
+    } else if (struct.getColumnType(columnName).equals(Type.string())) {
+      return struct.getString(columnName);
     } else {
-      throw new IllegalArgumentException("Can not extract string from value " + value);
+      throw new IllegalArgumentException("Can not extract string from value " + columnName);
     }
   }
 }