You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by pa...@apache.org on 2022/09/07 14:32:50 UTC

[beam] branch master updated: [BEAM-12164] Support new value capture types NEW_ROW NEW_VALUES for s… (#23053)

This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new a577639b215 [BEAM-12164] Support new value capture types NEW_ROW NEW_VALUES for s… (#23053)
a577639b215 is described below

commit a577639b215e001cfccf1af47970b9b18ce14087
Author: ChangyuLi28 <35...@users.noreply.github.com>
AuthorDate: Wed Sep 7 07:32:43 2022 -0700

    [BEAM-12164] Support new value capture types NEW_ROW NEW_VALUES for s… (#23053)
    
    * [BEAM-12164] Support new value capture types NEW_ROW NEW_VALUES for spannerIO connector and add unit tests
    
    * fixed failing test
    
    Co-authored-by: Changyu Li <ch...@google.com>
---
 .../changestreams/model/ValueCaptureType.java      |   2 +
 .../mapper/ChangeStreamRecordMapperTest.java       | 204 +++++++++++++++++++++
 2 files changed, 206 insertions(+)

diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/model/ValueCaptureType.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/model/ValueCaptureType.java
index 80bea2aa47f..6a55bdc72d2 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/model/ValueCaptureType.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/model/ValueCaptureType.java
@@ -27,6 +27,11 @@ import org.apache.beam.sdk.coders.DefaultCoder;
   */
  @DefaultCoder(AvroCoder.class)
  public enum ValueCaptureType {
    OLD_AND_NEW_VALUES,
-  UNKNOWN
+  UNKNOWN,
+  // New constants are appended after the existing ones: AvroCoder encodes enum values by their
+  // zero-based position in the schema, so inserting constants ahead of OLD_AND_NEW_VALUES and
+  // UNKNOWN would change how already-encoded values decode (e.g. across a pipeline update).
+  NEW_ROW,
+  NEW_VALUES
  }
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/mapper/ChangeStreamRecordMapperTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/mapper/ChangeStreamRecordMapperTest.java
index bfe651021e3..9ebb89bb90b 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/mapper/ChangeStreamRecordMapperTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/mapper/ChangeStreamRecordMapperTest.java
@@ -113,6 +113,57 @@ public class ChangeStreamRecordMapperTest {
         mapper.toChangeStreamRecords(partition, jsonFieldsStruct, resultSetMetadata));
   }

+  /*
+   * Change streams with the NEW_ROW and NEW_VALUES value capture types do not track old
+   * values, so a null value is used for OLD_VALUES_COLUMN in each Mod.
+   */
+  @Test
+  public void testMappingUpdateStructRowNewRowToDataChangeRecord() {
+    assertUpdateStructRowMapping(ValueCaptureType.NEW_ROW);
+  }
+
+  @Test
+  public void testMappingUpdateStructRowNewValuesToDataChangeRecord() {
+    assertUpdateStructRowMapping(ValueCaptureType.NEW_VALUES);
+  }
+
+  /**
+   * Builds an UPDATE {@link DataChangeRecord} with the given value capture type and no old
+   * values, encodes it as string-field and JSON-field structs, and asserts that the mapper
+   * converts each struct back into the same record.
+   */
+  private void assertUpdateStructRowMapping(ValueCaptureType valueCaptureType) {
+    final DataChangeRecord dataChangeRecord =
+        new DataChangeRecord(
+            "partitionToken",
+            Timestamp.ofTimeSecondsAndNanos(10L, 20),
+            "serverTransactionId",
+            true,
+            "1",
+            "tableName",
+            Arrays.asList(
+                new ColumnType("column1", new TypeCode("type1"), true, 1L),
+                new ColumnType("column2", new TypeCode("type2"), false, 2L)),
+            Collections.singletonList(
+                new Mod("{\"column1\": \"value1\"}", null, "{\"column2\": \"newValue2\"}")),
+            ModType.UPDATE,
+            valueCaptureType,
+            10L,
+            2L,
+            "transactionTag",
+            true,
+            null);
+    final Struct stringFieldsStruct = recordsToStructWithStrings(dataChangeRecord);
+    final Struct jsonFieldsStruct = recordsToStructWithJson(dataChangeRecord);
+
+    assertEquals(
+        Collections.singletonList(dataChangeRecord),
+        mapper.toChangeStreamRecords(partition, stringFieldsStruct, resultSetMetadata));
+    assertEquals(
+        Collections.singletonList(dataChangeRecord),
+        mapper.toChangeStreamRecords(partition, jsonFieldsStruct, resultSetMetadata));
+  }
+
   @Test
   public void testMappingInsertStructRowToDataChangeRecord() {
     final DataChangeRecord dataChangeRecord =
@@ -146,6 +220,57 @@ public class ChangeStreamRecordMapperTest {
         mapper.toChangeStreamRecords(partition, jsonFieldsStruct, resultSetMetadata));
   }

+  /*
+   * Change streams with the NEW_ROW and NEW_VALUES value capture types do not track old
+   * values, so a null value is used for OLD_VALUES_COLUMN in each inserted Mod.
+   */
+  @Test
+  public void testMappingInsertStructRowNewRowToDataChangeRecord() {
+    assertInsertStructRowMapping(ValueCaptureType.NEW_ROW);
+  }
+
+  @Test
+  public void testMappingInsertStructRowNewValuesToDataChangeRecord() {
+    assertInsertStructRowMapping(ValueCaptureType.NEW_VALUES);
+  }
+
+  /**
+   * Builds an INSERT {@link DataChangeRecord} with the given value capture type and no old
+   * values, encodes it as string-field and JSON-field structs, and asserts that the mapper
+   * converts each struct back into the same record.
+   */
+  private void assertInsertStructRowMapping(ValueCaptureType valueCaptureType) {
+    final DataChangeRecord dataChangeRecord =
+        new DataChangeRecord(
+            "partitionToken",
+            Timestamp.ofTimeSecondsAndNanos(10L, 20),
+            "transactionId",
+            false,
+            "1",
+            "tableName",
+            Arrays.asList(
+                new ColumnType("column1", new TypeCode("type1"), true, 1L),
+                new ColumnType("column2", new TypeCode("type2"), false, 2L)),
+            Collections.singletonList(
+                new Mod("{\"column1\": \"value1\"}", null, "{\"column2\": \"newValue2\"}")),
+            ModType.INSERT,
+            valueCaptureType,
+            10L,
+            2L,
+            "transactionTag",
+            true,
+            null);
+    final Struct stringFieldsStruct = recordsToStructWithStrings(dataChangeRecord);
+    final Struct jsonFieldsStruct = recordsToStructWithJson(dataChangeRecord);
+
+    assertEquals(
+        Collections.singletonList(dataChangeRecord),
+        mapper.toChangeStreamRecords(partition, stringFieldsStruct, resultSetMetadata));
+    assertEquals(
+        Collections.singletonList(dataChangeRecord),
+        mapper.toChangeStreamRecords(partition, jsonFieldsStruct, resultSetMetadata));
+  }
+
   @Test
   public void testMappingDeleteStructRowToDataChangeRecord() {
     final DataChangeRecord dataChangeRecord =
@@ -179,6 +319,56 @@ public class ChangeStreamRecordMapperTest {
         mapper.toChangeStreamRecords(partition, jsonFieldsStruct, resultSetMetadata));
   }

+  /*
+   * Change streams with the NEW_ROW and NEW_VALUES value capture types do not track old
+   * values, and a deleted row has no new values, so each Mod carries only its keys.
+   */
+  @Test
+  public void testMappingDeleteStructRowNewRowToDataChangeRecord() {
+    assertDeleteStructRowMapping(ValueCaptureType.NEW_ROW);
+  }
+
+  @Test
+  public void testMappingDeleteStructRowNewValuesToDataChangeRecord() {
+    assertDeleteStructRowMapping(ValueCaptureType.NEW_VALUES);
+  }
+
+  /**
+   * Builds a DELETE {@link DataChangeRecord} with the given value capture type and neither old
+   * nor new values, encodes it as string-field and JSON-field structs, and asserts that the
+   * mapper converts each struct back into the same record.
+   */
+  private void assertDeleteStructRowMapping(ValueCaptureType valueCaptureType) {
+    final DataChangeRecord dataChangeRecord =
+        new DataChangeRecord(
+            "partitionToken",
+            Timestamp.ofTimeSecondsAndNanos(10L, 20),
+            "transactionId",
+            false,
+            "1",
+            "tableName",
+            Arrays.asList(
+                new ColumnType("column1", new TypeCode("type1"), true, 1L),
+                new ColumnType("column2", new TypeCode("type2"), false, 2L)),
+            Collections.singletonList(new Mod("{\"column1\": \"value1\"}", null, null)),
+            ModType.DELETE,
+            valueCaptureType,
+            10L,
+            2L,
+            "transactionTag",
+            true,
+            null);
+    final Struct stringFieldsStruct = recordsToStructWithStrings(dataChangeRecord);
+    final Struct jsonFieldsStruct = recordsToStructWithJson(dataChangeRecord);
+
+    assertEquals(
+        Collections.singletonList(dataChangeRecord),
+        mapper.toChangeStreamRecords(partition, stringFieldsStruct, resultSetMetadata));
+    assertEquals(
+        Collections.singletonList(dataChangeRecord),
+        mapper.toChangeStreamRecords(partition, jsonFieldsStruct, resultSetMetadata));
+  }
+
   @Test
   public void testMappingStructRowWithUnknownModTypeAndValueCaptureTypeToDataChangeRecord() {
     final DataChangeRecord dataChangeRecord =