You are viewing a plain-text version of this content. The link to its canonical version is not preserved in this plain-text rendering.
Posted to commits@beam.apache.org by pa...@apache.org on 2022/09/07 14:32:50 UTC
[beam] branch master updated: [BEAM-12164] Support new value capture types NEW_ROW NEW_VALUES for s… (#23053)
This is an automated email from the ASF dual-hosted git repository.
pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new a577639b215 [BEAM-12164] Support new value capture types NEW_ROW NEW_VALUES for s… (#23053)
a577639b215 is described below
commit a577639b215e001cfccf1af47970b9b18ce14087
Author: ChangyuLi28 <35...@users.noreply.github.com>
AuthorDate: Wed Sep 7 07:32:43 2022 -0700
[BEAM-12164] Support new value capture types NEW_ROW NEW_VALUES for s… (#23053)
* [BEAM-12164] Support new value capture types NEW_ROW NEW_VALUES for spannerIO connector and add unit tests
* fixed failing test
Co-authored-by: Changyu Li <ch...@google.com>
---
.../changestreams/model/ValueCaptureType.java | 2 +
.../mapper/ChangeStreamRecordMapperTest.java | 204 +++++++++++++++++++++
2 files changed, 206 insertions(+)
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/model/ValueCaptureType.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/model/ValueCaptureType.java
index 80bea2aa47f..6a55bdc72d2 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/model/ValueCaptureType.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/model/ValueCaptureType.java
@@ -27,6 +27,8 @@ import org.apache.beam.sdk.coders.DefaultCoder;
*/
@DefaultCoder(AvroCoder.class)
public enum ValueCaptureType {
+ // NOTE(review): this enum is serialized with AvroCoder (see @DefaultCoder above). If that
+ // coder writes enum values by symbol position (Avro's binary format is documented to encode
+ // an enum as the zero-based index of its symbol -- confirm for AvroCoder), inserting
+ // NEW_ROW and NEW_VALUES ahead of OLD_AND_NEW_VALUES and UNKNOWN shifts the encoded values
+ // of those existing constants. Confirm this is safe for persisted or in-flight data (e.g.
+ // pipeline update), or append new constants after the existing ones instead.
+ NEW_ROW,
+ NEW_VALUES,
OLD_AND_NEW_VALUES,
UNKNOWN
}
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/mapper/ChangeStreamRecordMapperTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/mapper/ChangeStreamRecordMapperTest.java
index bfe651021e3..9ebb89bb90b 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/mapper/ChangeStreamRecordMapperTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/mapper/ChangeStreamRecordMapperTest.java
@@ -113,6 +113,80 @@ public class ChangeStreamRecordMapperTest {
mapper.toChangeStreamRecords(partition, jsonFieldsStruct, resultSetMetadata));
}
+ /*
+ * Round-trip test for an UPDATE record with the NEW_ROW value capture type: the record is
+ * converted to a Struct by recordsToStructWithStrings and by recordsToStructWithJson, and the
+ * mapper must turn each Struct back into a single record equal to the original.
+ *
+ * Change streams with the NEW_ROW value capture type do not track old values, so the Mod's
+ * old values (OLD_VALUES_COLUMN) are null.
+ *
+ * NOTE(review): the Mod's new values contain only column2. If NEW_ROW is meant to report every
+ * column of the new row, this payload does not model that; the test only checks that the
+ * NEW_ROW capture type and a null old-values Mod survive the mapping unchanged.
+ * NOTE(review): this test and the NEW_VALUES / INSERT / DELETE variants below share the same
+ * construction and assertions; a helper parameterized by ModType, ValueCaptureType and Mod
+ * would remove the duplication.
+ */
+ @Test
+ public void testMappingUpdateStructRowNewRowToDataChangeRecord() {
+ final DataChangeRecord dataChangeRecord =
+ new DataChangeRecord(
+ "partitionToken",
+ Timestamp.ofTimeSecondsAndNanos(10L, 20),
+ "serverTransactionId",
+ true,
+ "1",
+ "tableName",
+ Arrays.asList(
+ new ColumnType("column1", new TypeCode("type1"), true, 1L),
+ new ColumnType("column2", new TypeCode("type2"), false, 2L)),
+ Collections.singletonList(
+ // Second Mod argument (old values) is null: NEW_ROW does not track old values.
+ new Mod("{\"column1\": \"value1\"}", null, "{\"column2\": \"newValue2\"}")),
+ ModType.UPDATE,
+ ValueCaptureType.NEW_ROW,
+ 10L,
+ 2L,
+ "transactionTag",
+ true,
+ null);
+ // Encode the same record two ways; the mapper must accept both encodings.
+ final Struct stringFieldsStruct = recordsToStructWithStrings(dataChangeRecord);
+ final Struct jsonFieldsStruct = recordsToStructWithJson(dataChangeRecord);
+
+ // Each encoding must map back to exactly one record equal to the original.
+ assertEquals(
+ Collections.singletonList(dataChangeRecord),
+ mapper.toChangeStreamRecords(partition, stringFieldsStruct, resultSetMetadata));
+ assertEquals(
+ Collections.singletonList(dataChangeRecord),
+ mapper.toChangeStreamRecords(partition, jsonFieldsStruct, resultSetMetadata));
+ }
+
+ /*
+ * Round-trip test for an UPDATE record with the NEW_VALUES value capture type: the record is
+ * converted to a Struct by recordsToStructWithStrings and by recordsToStructWithJson, and the
+ * mapper must turn each Struct back into a single record equal to the original.
+ *
+ * Change streams with the NEW_VALUES value capture type do not track old values, so the Mod's
+ * old values (OLD_VALUES_COLUMN) are null.
+ */
+ @Test
+ public void testMappingUpdateStructRowNewValuesToDataChangeRecord() {
+ final DataChangeRecord dataChangeRecord =
+ new DataChangeRecord(
+ "partitionToken",
+ Timestamp.ofTimeSecondsAndNanos(10L, 20),
+ "serverTransactionId",
+ true,
+ "1",
+ "tableName",
+ Arrays.asList(
+ new ColumnType("column1", new TypeCode("type1"), true, 1L),
+ new ColumnType("column2", new TypeCode("type2"), false, 2L)),
+ Collections.singletonList(
+ // Second Mod argument (old values) is null: NEW_VALUES does not track old values.
+ new Mod("{\"column1\": \"value1\"}", null, "{\"column2\": \"newValue2\"}")),
+ ModType.UPDATE,
+ ValueCaptureType.NEW_VALUES,
+ 10L,
+ 2L,
+ "transactionTag",
+ true,
+ null);
+ // Encode the same record two ways; the mapper must accept both encodings.
+ final Struct stringFieldsStruct = recordsToStructWithStrings(dataChangeRecord);
+ final Struct jsonFieldsStruct = recordsToStructWithJson(dataChangeRecord);
+
+ // Each encoding must map back to exactly one record equal to the original.
+ assertEquals(
+ Collections.singletonList(dataChangeRecord),
+ mapper.toChangeStreamRecords(partition, stringFieldsStruct, resultSetMetadata));
+ assertEquals(
+ Collections.singletonList(dataChangeRecord),
+ mapper.toChangeStreamRecords(partition, jsonFieldsStruct, resultSetMetadata));
+ }
+
@Test
public void testMappingInsertStructRowToDataChangeRecord() {
final DataChangeRecord dataChangeRecord =
@@ -146,6 +220,72 @@ public class ChangeStreamRecordMapperTest {
mapper.toChangeStreamRecords(partition, jsonFieldsStruct, resultSetMetadata));
}
+ /*
+ * Round-trip test for an INSERT record with the NEW_ROW value capture type: the record is
+ * converted to a Struct by recordsToStructWithStrings and by recordsToStructWithJson, and the
+ * mapper must turn each Struct back into a single record equal to the original. The Mod's old
+ * values are null, consistent with NEW_ROW not tracking old values.
+ */
+ @Test
+ public void testMappingInsertStructRowNewRowToDataChangeRecord() {
+ final DataChangeRecord dataChangeRecord =
+ new DataChangeRecord(
+ "partitionToken",
+ Timestamp.ofTimeSecondsAndNanos(10L, 20),
+ "transactionId",
+ false,
+ "1",
+ "tableName",
+ Arrays.asList(
+ new ColumnType("column1", new TypeCode("type1"), true, 1L),
+ new ColumnType("column2", new TypeCode("type2"), false, 2L)),
+ Collections.singletonList(
+ // Second Mod argument (old values) is null; only new values are populated.
+ new Mod("{\"column1\": \"value1\"}", null, "{\"column2\": \"newValue2\"}")),
+ ModType.INSERT,
+ ValueCaptureType.NEW_ROW,
+ 10L,
+ 2L,
+ "transactionTag",
+ true,
+ null);
+ // Encode the same record two ways; the mapper must accept both encodings.
+ final Struct stringFieldsStruct = recordsToStructWithStrings(dataChangeRecord);
+ final Struct jsonFieldsStruct = recordsToStructWithJson(dataChangeRecord);
+
+ // Each encoding must map back to exactly one record equal to the original.
+ assertEquals(
+ Collections.singletonList(dataChangeRecord),
+ mapper.toChangeStreamRecords(partition, stringFieldsStruct, resultSetMetadata));
+ assertEquals(
+ Collections.singletonList(dataChangeRecord),
+ mapper.toChangeStreamRecords(partition, jsonFieldsStruct, resultSetMetadata));
+ }
+
+ /*
+ * Round-trip test for an INSERT record with the NEW_VALUES value capture type: the record is
+ * converted to a Struct by recordsToStructWithStrings and by recordsToStructWithJson, and the
+ * mapper must turn each Struct back into a single record equal to the original. The Mod's old
+ * values are null, consistent with NEW_VALUES not tracking old values.
+ */
+ @Test
+ public void testMappingInsertStructRowNewValuesToDataChangeRecord() {
+ final DataChangeRecord dataChangeRecord =
+ new DataChangeRecord(
+ "partitionToken",
+ Timestamp.ofTimeSecondsAndNanos(10L, 20),
+ "transactionId",
+ false,
+ "1",
+ "tableName",
+ Arrays.asList(
+ new ColumnType("column1", new TypeCode("type1"), true, 1L),
+ new ColumnType("column2", new TypeCode("type2"), false, 2L)),
+ Collections.singletonList(
+ // Second Mod argument (old values) is null; only new values are populated.
+ new Mod("{\"column1\": \"value1\"}", null, "{\"column2\": \"newValue2\"}")),
+ ModType.INSERT,
+ ValueCaptureType.NEW_VALUES,
+ 10L,
+ 2L,
+ "transactionTag",
+ true,
+ null);
+ // Encode the same record two ways; the mapper must accept both encodings.
+ final Struct stringFieldsStruct = recordsToStructWithStrings(dataChangeRecord);
+ final Struct jsonFieldsStruct = recordsToStructWithJson(dataChangeRecord);
+
+ // Each encoding must map back to exactly one record equal to the original.
+ assertEquals(
+ Collections.singletonList(dataChangeRecord),
+ mapper.toChangeStreamRecords(partition, stringFieldsStruct, resultSetMetadata));
+ assertEquals(
+ Collections.singletonList(dataChangeRecord),
+ mapper.toChangeStreamRecords(partition, jsonFieldsStruct, resultSetMetadata));
+ }
+
@Test
public void testMappingDeleteStructRowToDataChangeRecord() {
final DataChangeRecord dataChangeRecord =
@@ -179,6 +319,70 @@ public class ChangeStreamRecordMapperTest {
mapper.toChangeStreamRecords(partition, jsonFieldsStruct, resultSetMetadata));
}
+ /*
+ * Round-trip test for a DELETE record with the NEW_ROW value capture type: the record is
+ * converted to a Struct by recordsToStructWithStrings and by recordsToStructWithJson, and the
+ * mapper must turn each Struct back into a single record equal to the original. Both the old
+ * and the new values of the Mod are null: NEW_ROW does not track old values, and a DELETE
+ * presumably produces no new values.
+ */
+ @Test
+ public void testMappingDeleteStructRowNewRowToDataChangeRecord() {
+ final DataChangeRecord dataChangeRecord =
+ new DataChangeRecord(
+ "partitionToken",
+ Timestamp.ofTimeSecondsAndNanos(10L, 20),
+ "transactionId",
+ false,
+ "1",
+ "tableName",
+ Arrays.asList(
+ new ColumnType("column1", new TypeCode("type1"), true, 1L),
+ new ColumnType("column2", new TypeCode("type2"), false, 2L)),
+ // Only the first Mod argument is set; old and new values are both null.
+ Collections.singletonList(new Mod("{\"column1\": \"value1\"}", null, null)),
+ ModType.DELETE,
+ ValueCaptureType.NEW_ROW,
+ 10L,
+ 2L,
+ "transactionTag",
+ true,
+ null);
+ // Encode the same record two ways; the mapper must accept both encodings.
+ final Struct stringFieldsStruct = recordsToStructWithStrings(dataChangeRecord);
+ final Struct jsonFieldsStruct = recordsToStructWithJson(dataChangeRecord);
+
+ // Each encoding must map back to exactly one record equal to the original.
+ assertEquals(
+ Collections.singletonList(dataChangeRecord),
+ mapper.toChangeStreamRecords(partition, stringFieldsStruct, resultSetMetadata));
+ assertEquals(
+ Collections.singletonList(dataChangeRecord),
+ mapper.toChangeStreamRecords(partition, jsonFieldsStruct, resultSetMetadata));
+ }
+
+ /*
+ * Round-trip test for a DELETE record with the NEW_VALUES value capture type: the record is
+ * converted to a Struct by recordsToStructWithStrings and by recordsToStructWithJson, and the
+ * mapper must turn each Struct back into a single record equal to the original. Both the old
+ * and the new values of the Mod are null: NEW_VALUES does not track old values, and a DELETE
+ * presumably produces no new values.
+ */
+ @Test
+ public void testMappingDeleteStructRowNewValuesToDataChangeRecord() {
+ final DataChangeRecord dataChangeRecord =
+ new DataChangeRecord(
+ "partitionToken",
+ Timestamp.ofTimeSecondsAndNanos(10L, 20),
+ "transactionId",
+ false,
+ "1",
+ "tableName",
+ Arrays.asList(
+ new ColumnType("column1", new TypeCode("type1"), true, 1L),
+ new ColumnType("column2", new TypeCode("type2"), false, 2L)),
+ // Only the first Mod argument is set; old and new values are both null.
+ Collections.singletonList(new Mod("{\"column1\": \"value1\"}", null, null)),
+ ModType.DELETE,
+ ValueCaptureType.NEW_VALUES,
+ 10L,
+ 2L,
+ "transactionTag",
+ true,
+ null);
+ // Encode the same record two ways; the mapper must accept both encodings.
+ final Struct stringFieldsStruct = recordsToStructWithStrings(dataChangeRecord);
+ final Struct jsonFieldsStruct = recordsToStructWithJson(dataChangeRecord);
+
+ // Each encoding must map back to exactly one record equal to the original.
+ assertEquals(
+ Collections.singletonList(dataChangeRecord),
+ mapper.toChangeStreamRecords(partition, stringFieldsStruct, resultSetMetadata));
+ assertEquals(
+ Collections.singletonList(dataChangeRecord),
+ mapper.toChangeStreamRecords(partition, jsonFieldsStruct, resultSetMetadata));
+ }
+
@Test
public void testMappingStructRowWithUnknownModTypeAndValueCaptureTypeToDataChangeRecord() {
final DataChangeRecord dataChangeRecord =