You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by pa...@apache.org on 2022/06/01 21:46:21 UTC
[beam] branch master updated: feat: allow for unknown values in change streams (#17655)
This is an automated email from the ASF dual-hosted git repository.
pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 23aeca4e373 feat: allow for unknown values in change streams (#17655)
23aeca4e373 is described below
commit 23aeca4e373ff5a4de176770d52f15a37653a32e
Author: Thiago Nunes <th...@google.com>
AuthorDate: Wed Jun 1 14:46:15 2022 -0700
feat: allow for unknown values in change streams (#17655)
Allows unrecognized values in the mod type and value capture type fields;
they are now mapped to UNKNOWN instead of failing. This way, customers won't
be forced to update the connector when the backend adds a new value to either field.
Co-authored-by: Pablo <pa...@users.noreply.github.com>
---
.../mapper/ChangeStreamRecordMapper.java | 22 +++++++-
.../gcp/spanner/changestreams/model/ModType.java | 3 +-
.../changestreams/model/ValueCaptureType.java | 1 +
.../mapper/ChangeStreamRecordMapperTest.java | 28 ++++++++++
.../changestreams/util/TestStructMapper.java | 60 ++++++++++++++++------
5 files changed, 96 insertions(+), 18 deletions(-)
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/mapper/ChangeStreamRecordMapper.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/mapper/ChangeStreamRecordMapper.java
index e2bae67d10f..0f6ecc0e7cd 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/mapper/ChangeStreamRecordMapper.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/mapper/ChangeStreamRecordMapper.java
@@ -249,8 +249,8 @@ public class ChangeStreamRecordMapper {
.map(this::columnTypeFrom)
.collect(Collectors.toList()),
row.getStructList(MODS_COLUMN).stream().map(this::modFrom).collect(Collectors.toList()),
- ModType.valueOf(row.getString(MOD_TYPE_COLUMN)),
- ValueCaptureType.valueOf(row.getString(VALUE_CAPTURE_TYPE_COLUMN)),
+ modTypeFrom(row.getString(MOD_TYPE_COLUMN)),
+ valueCaptureTypeFrom(row.getString(VALUE_CAPTURE_TYPE_COLUMN)),
row.getLong(NUMBER_OF_RECORDS_IN_TRANSACTION_COLUMN),
row.getLong(NUMBER_OF_PARTITIONS_IN_TRANSACTION_COLUMN),
changeStreamRecordMetadataFrom(partition, commitTimestamp, resultSetMetadata));
@@ -299,6 +299,24 @@ public class ChangeStreamRecordMapper {
return new Mod(keys, oldValues, newValues);
}
+ private ModType modTypeFrom(String name) {
+ try {
+ return ModType.valueOf(name);
+ } catch (IllegalArgumentException e) {
+ // This is not logged to prevent flooding users with messages
+ return ModType.UNKNOWN;
+ }
+ }
+
+ private ValueCaptureType valueCaptureTypeFrom(String name) {
+ try {
+ return ValueCaptureType.valueOf(name);
+ } catch (IllegalArgumentException e) {
+ // This is not logged to prevent flooding users with messages
+ return ValueCaptureType.UNKNOWN;
+ }
+ }
+
private ChildPartition childPartitionFrom(String partitionToken, Struct struct) {
final HashSet<String> parentTokens =
Sets.newHashSet(struct.getStringList(PARENT_PARTITION_TOKENS_COLUMN));
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/model/ModType.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/model/ModType.java
index 4929831fba0..e56dd0af6d6 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/model/ModType.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/model/ModType.java
@@ -28,5 +28,6 @@ import org.apache.beam.sdk.coders.DefaultCoder;
public enum ModType {
INSERT,
UPDATE,
- DELETE
+ DELETE,
+ UNKNOWN
}
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/model/ValueCaptureType.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/model/ValueCaptureType.java
index 1eeb830d3c2..80bea2aa47f 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/model/ValueCaptureType.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/model/ValueCaptureType.java
@@ -28,4 +28,5 @@ import org.apache.beam.sdk.coders.DefaultCoder;
@DefaultCoder(AvroCoder.class)
public enum ValueCaptureType {
OLD_AND_NEW_VALUES,
+ UNKNOWN
}
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/mapper/ChangeStreamRecordMapperTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/mapper/ChangeStreamRecordMapperTest.java
index 50f42927e58..5fd05e61c1b 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/mapper/ChangeStreamRecordMapperTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/mapper/ChangeStreamRecordMapperTest.java
@@ -19,6 +19,7 @@ package org.apache.beam.sdk.io.gcp.spanner.changestreams.mapper;
import static org.apache.beam.sdk.io.gcp.spanner.changestreams.util.TestStructMapper.recordsToStructWithJson;
import static org.apache.beam.sdk.io.gcp.spanner.changestreams.util.TestStructMapper.recordsToStructWithStrings;
+import static org.apache.beam.sdk.io.gcp.spanner.changestreams.util.TestStructMapper.recordsWithUnknownModTypeAndValueCaptureType;
import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -172,6 +173,33 @@ public class ChangeStreamRecordMapperTest {
mapper.toChangeStreamRecords(partition, jsonFieldsStruct, resultSetMetadata));
}
+ @Test
+ public void testMappingStructRowWithUnknownModTypeAndValueCaptureTypeToDataChangeRecord() {
+ final DataChangeRecord dataChangeRecord =
+ new DataChangeRecord(
+ "partitionToken",
+ Timestamp.ofTimeSecondsAndNanos(10L, 20),
+ "transactionId",
+ false,
+ "1",
+ "tableName",
+ Arrays.asList(
+ new ColumnType("column1", new TypeCode("type1"), true, 1L),
+ new ColumnType("column2", new TypeCode("type2"), false, 2L)),
+ Collections.singletonList(
+ new Mod("{\"column1\": \"value1\"}", null, "{\"column2\": \"newValue2\"}")),
+ ModType.UNKNOWN,
+ ValueCaptureType.UNKNOWN,
+ 10L,
+ 2L,
+ null);
+ final Struct struct = recordsWithUnknownModTypeAndValueCaptureType(dataChangeRecord);
+
+ assertEquals(
+ Collections.singletonList(dataChangeRecord),
+ mapper.toChangeStreamRecords(partition, struct, resultSetMetadata));
+ }
+
@Test
public void testMappingStructRowToHeartbeatRecord() {
final HeartbeatRecord heartbeatRecord =
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/util/TestStructMapper.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/util/TestStructMapper.java
index a3a434c6eed..98ee02216d2 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/util/TestStructMapper.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/util/TestStructMapper.java
@@ -109,15 +109,23 @@ public class TestStructMapper {
StructField.of("child_partitions_record", Type.array(CHILD_PARTITIONS_RECORD_TYPE)));
public static Struct recordsToStructWithJson(ChangeStreamRecord... records) {
- return recordsToStruct(true, records);
+ return recordsToStruct(false, false, true, records);
+ }
+
+ public static Struct recordsWithUnknownModTypeAndValueCaptureType(ChangeStreamRecord... records) {
+ return recordsToStruct(true, true, true, records);
}
// TODO: Remove when backend is fully migrated to JSON
public static Struct recordsToStructWithStrings(ChangeStreamRecord... records) {
- return recordsToStruct(false, records);
+ return recordsToStruct(false, false, false, records);
}
- private static Struct recordsToStruct(boolean useJsonFields, ChangeStreamRecord... records) {
+ private static Struct recordsToStruct(
+ boolean useUnknownModType,
+ boolean useUnknownValueCaptureType,
+ boolean useJsonFields,
+ ChangeStreamRecord... records) {
final Type streamRecordType =
useJsonFields ? STREAM_RECORD_JSON_TYPE : STREAM_RECORD_STRING_TYPE;
return Struct.newBuilder()
@@ -125,14 +133,25 @@ public class TestStructMapper {
Value.structArray(
streamRecordType,
Arrays.stream(records)
- .map(record -> TestStructMapper.streamRecordStructFrom(record, useJsonFields))
+ .map(
+ record ->
+ TestStructMapper.streamRecordStructFrom(
+ record,
+ useUnknownModType,
+ useUnknownValueCaptureType,
+ useJsonFields))
.collect(Collectors.toList())))
.build();
}
- private static Struct streamRecordStructFrom(ChangeStreamRecord record, boolean useJsonFields) {
+ private static Struct streamRecordStructFrom(
+ ChangeStreamRecord record,
+ boolean useUnknownModType,
+ boolean useUnknownValueCaptureType,
+ boolean useJsonFields) {
if (record instanceof DataChangeRecord) {
- return streamRecordStructFrom((DataChangeRecord) record, useJsonFields);
+ return streamRecordStructFrom(
+ (DataChangeRecord) record, useUnknownModType, useUnknownValueCaptureType, useJsonFields);
} else if (record instanceof HeartbeatRecord) {
return streamRecordStructFrom((HeartbeatRecord) record, useJsonFields);
} else if (record instanceof ChildPartitionsRecord) {
@@ -194,7 +213,11 @@ public class TestStructMapper {
return Struct.newBuilder().set("timestamp").to(record.getTimestamp()).build();
}
- private static Struct streamRecordStructFrom(DataChangeRecord record, boolean useJsonFields) {
+ private static Struct streamRecordStructFrom(
+ DataChangeRecord record,
+ boolean useUnknownModType,
+ boolean useUnknownValueCaptureType,
+ boolean useJsonFields) {
final Type dataChangeRecordType =
useJsonFields ? DATA_CHANGE_RECORD_JSON_TYPE : DATA_CHANGE_RECORD_STRING_TYPE;
return Struct.newBuilder()
@@ -202,7 +225,9 @@ public class TestStructMapper {
.to(
Value.structArray(
dataChangeRecordType,
- Collections.singletonList(recordStructFrom(record, useJsonFields))))
+ Collections.singletonList(
+ recordStructFrom(
+ record, useUnknownModType, useUnknownValueCaptureType, useJsonFields))))
.set("heartbeat_record")
.to(Value.structArray(HEARTBEAT_RECORD_TYPE, Collections.emptyList()))
.set("child_partitions_record")
@@ -210,21 +235,26 @@ public class TestStructMapper {
.build();
}
- private static Struct recordStructFrom(DataChangeRecord record, boolean useJsonFields) {
- final Type columnTypeType = useJsonFields ? COLUMN_TYPE_JSON_TYPE : COLUMN_TYPE_STRING_TYPE;
- final Type modType = useJsonFields ? MOD_JSON_TYPE : MOD_STRING_TYPE;
+ private static Struct recordStructFrom(
+ DataChangeRecord record,
+ boolean useUnknownModType,
+ boolean useUnknownValueCaptureType,
+ boolean useJsonFields) {
final Value columnTypes =
Value.structArray(
- columnTypeType,
+ useJsonFields ? COLUMN_TYPE_JSON_TYPE : COLUMN_TYPE_STRING_TYPE,
record.getRowType().stream()
.map(rowType -> TestStructMapper.columnTypeStructFrom(rowType, useJsonFields))
.collect(Collectors.toList()));
final Value mods =
Value.structArray(
- modType,
+ useJsonFields ? MOD_JSON_TYPE : MOD_STRING_TYPE,
record.getMods().stream()
.map(mod -> TestStructMapper.modStructFrom(mod, useJsonFields))
.collect(Collectors.toList()));
+ final String modType = useUnknownModType ? "NEW_MOD_TYPE" : record.getModType().name();
+ final String valueCaptureType =
+ useUnknownValueCaptureType ? "NEW_VALUE_CAPTURE_TYPE" : record.getValueCaptureType().name();
return Struct.newBuilder()
.set("commit_timestamp")
.to(record.getCommitTimestamp())
@@ -241,9 +271,9 @@ public class TestStructMapper {
.set("mods")
.to(mods)
.set("mod_type")
- .to(record.getModType().toString())
+ .to(modType)
.set("value_capture_type")
- .to(record.getValueCaptureType().toString())
+ .to(valueCaptureType)
.set("number_of_records_in_transaction")
.to(record.getNumberOfRecordsInTransaction())
.set("number_of_partitions_in_transaction")