You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by pa...@apache.org on 2022/06/01 21:46:21 UTC

[beam] branch master updated: feat: allow for unknown values in change streams (#17655)

This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 23aeca4e373 feat: allow for unknown values in change streams (#17655)
23aeca4e373 is described below

commit 23aeca4e373ff5a4de176770d52f15a37653a32e
Author: Thiago Nunes <th...@google.com>
AuthorDate: Wed Jun 1 14:46:15 2022 -0700

    feat: allow for unknown values in change streams (#17655)
    
    Allows unrecognized values in the mod type and value capture type
    columns by mapping them to UNKNOWN. This way, customers won't be forced
    to update the connector when the backend adds a new value for either field.
    
    Co-authored-by: Pablo <pa...@users.noreply.github.com>
---
 .../mapper/ChangeStreamRecordMapper.java           | 22 +++++++-
 .../gcp/spanner/changestreams/model/ModType.java   |  3 +-
 .../changestreams/model/ValueCaptureType.java      |  1 +
 .../mapper/ChangeStreamRecordMapperTest.java       | 28 ++++++++++
 .../changestreams/util/TestStructMapper.java       | 60 ++++++++++++++++------
 5 files changed, 96 insertions(+), 18 deletions(-)

diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/mapper/ChangeStreamRecordMapper.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/mapper/ChangeStreamRecordMapper.java
index e2bae67d10f..0f6ecc0e7cd 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/mapper/ChangeStreamRecordMapper.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/mapper/ChangeStreamRecordMapper.java
@@ -249,8 +249,8 @@ public class ChangeStreamRecordMapper {
             .map(this::columnTypeFrom)
             .collect(Collectors.toList()),
         row.getStructList(MODS_COLUMN).stream().map(this::modFrom).collect(Collectors.toList()),
-        ModType.valueOf(row.getString(MOD_TYPE_COLUMN)),
-        ValueCaptureType.valueOf(row.getString(VALUE_CAPTURE_TYPE_COLUMN)),
+        modTypeFrom(row.getString(MOD_TYPE_COLUMN)),
+        valueCaptureTypeFrom(row.getString(VALUE_CAPTURE_TYPE_COLUMN)),
         row.getLong(NUMBER_OF_RECORDS_IN_TRANSACTION_COLUMN),
         row.getLong(NUMBER_OF_PARTITIONS_IN_TRANSACTION_COLUMN),
         changeStreamRecordMetadataFrom(partition, commitTimestamp, resultSetMetadata));
@@ -299,6 +299,24 @@ public class ChangeStreamRecordMapper {
     return new Mod(keys, oldValues, newValues);
   }
 
+  private ModType modTypeFrom(String name) {
+    try {
+      return ModType.valueOf(name);
+    } catch (IllegalArgumentException e) {
+      // Unrecognized name: map to UNKNOWN without logging, to avoid flooding users with messages
+      return ModType.UNKNOWN;
+    }
+  }
+
+  private ValueCaptureType valueCaptureTypeFrom(String name) {
+    try {
+      return ValueCaptureType.valueOf(name);
+    } catch (IllegalArgumentException e) {
+      // Unrecognized name: map to UNKNOWN without logging, to avoid flooding users with messages
+      return ValueCaptureType.UNKNOWN;
+    }
+  }
+
   private ChildPartition childPartitionFrom(String partitionToken, Struct struct) {
     final HashSet<String> parentTokens =
         Sets.newHashSet(struct.getStringList(PARENT_PARTITION_TOKENS_COLUMN));
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/model/ModType.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/model/ModType.java
index 4929831fba0..e56dd0af6d6 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/model/ModType.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/model/ModType.java
@@ -28,5 +28,6 @@ import org.apache.beam.sdk.coders.DefaultCoder;
 public enum ModType {
   INSERT,
   UPDATE,
-  DELETE
+  DELETE,
+  UNKNOWN
 }
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/model/ValueCaptureType.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/model/ValueCaptureType.java
index 1eeb830d3c2..80bea2aa47f 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/model/ValueCaptureType.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/model/ValueCaptureType.java
@@ -28,4 +28,5 @@ import org.apache.beam.sdk.coders.DefaultCoder;
 @DefaultCoder(AvroCoder.class)
 public enum ValueCaptureType {
   OLD_AND_NEW_VALUES,
+  UNKNOWN
 }
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/mapper/ChangeStreamRecordMapperTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/mapper/ChangeStreamRecordMapperTest.java
index 50f42927e58..5fd05e61c1b 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/mapper/ChangeStreamRecordMapperTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/mapper/ChangeStreamRecordMapperTest.java
@@ -19,6 +19,7 @@ package org.apache.beam.sdk.io.gcp.spanner.changestreams.mapper;
 
 import static org.apache.beam.sdk.io.gcp.spanner.changestreams.util.TestStructMapper.recordsToStructWithJson;
 import static org.apache.beam.sdk.io.gcp.spanner.changestreams.util.TestStructMapper.recordsToStructWithStrings;
+import static org.apache.beam.sdk.io.gcp.spanner.changestreams.util.TestStructMapper.recordsWithUnknownModTypeAndValueCaptureType;
 import static org.junit.Assert.assertEquals;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
@@ -172,6 +173,33 @@ public class ChangeStreamRecordMapperTest {
         mapper.toChangeStreamRecords(partition, jsonFieldsStruct, resultSetMetadata));
   }
 
+  @Test
+  public void testMappingStructRowWithUnknownModTypeAndValueCaptureTypeToDataChangeRecord() {
+    final DataChangeRecord dataChangeRecord =
+        new DataChangeRecord(
+            "partitionToken",
+            Timestamp.ofTimeSecondsAndNanos(10L, 20),
+            "transactionId",
+            false,
+            "1",
+            "tableName",
+            Arrays.asList(
+                new ColumnType("column1", new TypeCode("type1"), true, 1L),
+                new ColumnType("column2", new TypeCode("type2"), false, 2L)),
+            Collections.singletonList(
+                new Mod("{\"column1\": \"value1\"}", null, "{\"column2\": \"newValue2\"}")),
+            ModType.UNKNOWN,
+            ValueCaptureType.UNKNOWN,
+            10L,
+            2L,
+            null);
+    final Struct struct = recordsWithUnknownModTypeAndValueCaptureType(dataChangeRecord);
+
+    assertEquals(
+        Collections.singletonList(dataChangeRecord),
+        mapper.toChangeStreamRecords(partition, struct, resultSetMetadata));
+  }
+
   @Test
   public void testMappingStructRowToHeartbeatRecord() {
     final HeartbeatRecord heartbeatRecord =
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/util/TestStructMapper.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/util/TestStructMapper.java
index a3a434c6eed..98ee02216d2 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/util/TestStructMapper.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/util/TestStructMapper.java
@@ -109,15 +109,23 @@ public class TestStructMapper {
           StructField.of("child_partitions_record", Type.array(CHILD_PARTITIONS_RECORD_TYPE)));
 
   public static Struct recordsToStructWithJson(ChangeStreamRecord... records) {
-    return recordsToStruct(true, records);
+    return recordsToStruct(false, false, true, records);
+  }
+
+  public static Struct recordsWithUnknownModTypeAndValueCaptureType(ChangeStreamRecord... records) {
+    return recordsToStruct(true, true, true, records);
   }
 
   // TODO: Remove when backend is fully migrated to JSON
   public static Struct recordsToStructWithStrings(ChangeStreamRecord... records) {
-    return recordsToStruct(false, records);
+    return recordsToStruct(false, false, false, records);
   }
 
-  private static Struct recordsToStruct(boolean useJsonFields, ChangeStreamRecord... records) {
+  private static Struct recordsToStruct(
+      boolean useUnknownModType,
+      boolean useUnknownValueCaptureType,
+      boolean useJsonFields,
+      ChangeStreamRecord... records) {
     final Type streamRecordType =
         useJsonFields ? STREAM_RECORD_JSON_TYPE : STREAM_RECORD_STRING_TYPE;
     return Struct.newBuilder()
@@ -125,14 +133,25 @@ public class TestStructMapper {
             Value.structArray(
                 streamRecordType,
                 Arrays.stream(records)
-                    .map(record -> TestStructMapper.streamRecordStructFrom(record, useJsonFields))
+                    .map(
+                        record ->
+                            TestStructMapper.streamRecordStructFrom(
+                                record,
+                                useUnknownModType,
+                                useUnknownValueCaptureType,
+                                useJsonFields))
                     .collect(Collectors.toList())))
         .build();
   }
 
-  private static Struct streamRecordStructFrom(ChangeStreamRecord record, boolean useJsonFields) {
+  private static Struct streamRecordStructFrom(
+      ChangeStreamRecord record,
+      boolean useUnknownModType,
+      boolean useUnknownValueCaptureType,
+      boolean useJsonFields) {
     if (record instanceof DataChangeRecord) {
-      return streamRecordStructFrom((DataChangeRecord) record, useJsonFields);
+      return streamRecordStructFrom(
+          (DataChangeRecord) record, useUnknownModType, useUnknownValueCaptureType, useJsonFields);
     } else if (record instanceof HeartbeatRecord) {
       return streamRecordStructFrom((HeartbeatRecord) record, useJsonFields);
     } else if (record instanceof ChildPartitionsRecord) {
@@ -194,7 +213,11 @@ public class TestStructMapper {
     return Struct.newBuilder().set("timestamp").to(record.getTimestamp()).build();
   }
 
-  private static Struct streamRecordStructFrom(DataChangeRecord record, boolean useJsonFields) {
+  private static Struct streamRecordStructFrom(
+      DataChangeRecord record,
+      boolean useUnknownModType,
+      boolean useUnknownValueCaptureType,
+      boolean useJsonFields) {
     final Type dataChangeRecordType =
         useJsonFields ? DATA_CHANGE_RECORD_JSON_TYPE : DATA_CHANGE_RECORD_STRING_TYPE;
     return Struct.newBuilder()
@@ -202,7 +225,9 @@ public class TestStructMapper {
         .to(
             Value.structArray(
                 dataChangeRecordType,
-                Collections.singletonList(recordStructFrom(record, useJsonFields))))
+                Collections.singletonList(
+                    recordStructFrom(
+                        record, useUnknownModType, useUnknownValueCaptureType, useJsonFields))))
         .set("heartbeat_record")
         .to(Value.structArray(HEARTBEAT_RECORD_TYPE, Collections.emptyList()))
         .set("child_partitions_record")
@@ -210,21 +235,26 @@ public class TestStructMapper {
         .build();
   }
 
-  private static Struct recordStructFrom(DataChangeRecord record, boolean useJsonFields) {
-    final Type columnTypeType = useJsonFields ? COLUMN_TYPE_JSON_TYPE : COLUMN_TYPE_STRING_TYPE;
-    final Type modType = useJsonFields ? MOD_JSON_TYPE : MOD_STRING_TYPE;
+  private static Struct recordStructFrom(
+      DataChangeRecord record,
+      boolean useUnknownModType,
+      boolean useUnknownValueCaptureType,
+      boolean useJsonFields) {
     final Value columnTypes =
         Value.structArray(
-            columnTypeType,
+            useJsonFields ? COLUMN_TYPE_JSON_TYPE : COLUMN_TYPE_STRING_TYPE,
             record.getRowType().stream()
                 .map(rowType -> TestStructMapper.columnTypeStructFrom(rowType, useJsonFields))
                 .collect(Collectors.toList()));
     final Value mods =
         Value.structArray(
-            modType,
+            useJsonFields ? MOD_JSON_TYPE : MOD_STRING_TYPE,
             record.getMods().stream()
                 .map(mod -> TestStructMapper.modStructFrom(mod, useJsonFields))
                 .collect(Collectors.toList()));
+    final String modType = useUnknownModType ? "NEW_MOD_TYPE" : record.getModType().name();
+    final String valueCaptureType =
+        useUnknownValueCaptureType ? "NEW_VALUE_CAPTURE_TYPE" : record.getValueCaptureType().name();
     return Struct.newBuilder()
         .set("commit_timestamp")
         .to(record.getCommitTimestamp())
@@ -241,9 +271,9 @@ public class TestStructMapper {
         .set("mods")
         .to(mods)
         .set("mod_type")
-        .to(record.getModType().toString())
+        .to(modType)
         .set("value_capture_type")
-        .to(record.getValueCaptureType().toString())
+        .to(valueCaptureType)
         .set("number_of_records_in_transaction")
         .to(record.getNumberOfRecordsInTransaction())
         .set("number_of_partitions_in_transaction")