Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/11/29 02:41:29 UTC

[GitHub] [beam] nancyxu123 opened a new pull request, #24390: Apache beam spangres

nancyxu123 opened a new pull request, #24390:
URL: https://github.com/apache/beam/pull/24390

   Support Postgres-dialect databases in the SpannerIO change stream connector. The connector should be able to query in all four combinations: (1) both the metadata database and the change stream database are GSQL; (2) both are Postgres; (3) the metadata database is Postgres and the change stream database is GSQL; and (4) the metadata database is GSQL and the change stream database is Postgres.
   
   This PR first performs a lookup in SpannerIO to determine whether the change stream database and the metadata database use the Postgres dialect. It then issues different queries to the metadata table and to the change stream depending on each database's dialect, creates the metadata table with dialect-specific DDL, and maps received records into ChangeStreamRecords differently depending on whether the change stream lives in a Postgres database.
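   A minimal sketch of the per-database dispatch described above, assuming a local stand-in for `com.google.cloud.spanner.Dialect` (same constant names) and a hypothetical `MetadataDdl` helper. The DDL has a single illustrative column and is not the connector's real metadata schema; it only shows that the metadata table's DDL follows the metadata database's dialect, independently of the change stream database.

```java
/** Hypothetical helper: picks metadata-table DDL by the metadata database's dialect. */
final class MetadataDdl {
  // Local stand-in for com.google.cloud.spanner.Dialect (same constant names).
  enum Dialect { GOOGLE_STANDARD_SQL, POSTGRESQL }

  private MetadataDdl() {}

  static String createTable(String tableName, Dialect dialect) {
    if (dialect == Dialect.POSTGRESQL) {
      // Postgres dialect: quoted identifiers, PRIMARY KEY inside the column list.
      return "CREATE TABLE \"" + tableName + "\" (\"PartitionToken\" text NOT NULL,"
          + " PRIMARY KEY (\"PartitionToken\"))";
    }
    // GoogleSQL dialect: STRING(MAX) type, PRIMARY KEY after the column list.
    return "CREATE TABLE " + tableName + " (PartitionToken STRING(MAX) NOT NULL)"
        + " PRIMARY KEY (PartitionToken)";
  }

  public static void main(String[] args) {
    // The two dialects are looked up independently, so all four pairs can occur.
    // Only the metadata dialect affects this DDL; the change stream dialect is printed for context.
    for (Dialect metadata : Dialect.values()) {
      for (Dialect changeStream : Dialect.values()) {
        System.out.println(
            "metadata=" + metadata + ", changeStream=" + changeStream
                + " -> " + createTable("Metadata_db_1", metadata));
      }
    }
  }
}
```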


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] thiagotnunes commented on a diff in pull request #24390: [BEAM-12164] Support querying against Postgres for the SpannerIO change streams connector

Posted by GitBox <gi...@apache.org>.
thiagotnunes commented on code in PR #24390:
URL: https://github.com/apache/beam/pull/24390#discussion_r1035481624


##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/mapper/ChangeStreamRecordMapper.java:
##########
@@ -83,30 +90,41 @@ public class ChangeStreamRecordMapper {
   private static final String CHILD_PARTITIONS_COLUMN = "child_partitions";
   private static final String PARENT_PARTITION_TOKENS_COLUMN = "parent_partition_tokens";
   private static final String TOKEN_COLUMN = "token";
+  private final Dialect dialect;
+  private final JsonFormat.Printer printer;
+  private final JsonFormat.Parser parser;
 
-  ChangeStreamRecordMapper() {}
+  ChangeStreamRecordMapper(Dialect dialect) {
+    this.dialect = dialect;
+
+    this.printer =

Review Comment:
   I think not: we create a new mapper in the [Setup](https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/ReadChangeStreamPartitionDoFn.java#L186-L187) method, which is called once per SDF instance, so we (the implementers) won't call it concurrently. I don't know whether the backend (Windmill, in the case of Dataflow) will, but I would think not.
   
   It would be good to test this under load to verify that we don't get any errors.
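   A rough sketch of the load test suggested above: a generic harness that calls one shared function from several threads and counts exceptions or wrong results. `ConcurrencySmokeTest` and `countFailures` are hypothetical names; to use it against the connector you would pass a lambda wrapping the shared mapper's parsing call. A zero count only means no race was observed, not that the code is thread-safe.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;

/** Hypothetical load-test harness for a component shared across threads. */
final class ConcurrencySmokeTest {
  private ConcurrencySmokeTest() {}

  /** Calls fn(input) threads * callsPerThread times and counts exceptions or wrong results. */
  static <T, R> long countFailures(
      Function<T, R> fn, T input, R expected, int threads, int callsPerThread) {
    ExecutorService pool = Executors.newFixedThreadPool(threads);
    try {
      Callable<Long> task =
          () -> {
            long failures = 0;
            for (int i = 0; i < callsPerThread; i++) {
              try {
                if (!expected.equals(fn.apply(input))) {
                  failures++;
                }
              } catch (RuntimeException e) {
                failures++; // an exception under contention counts as a failure too
              }
            }
            return failures;
          };
      List<Future<Long>> results = new ArrayList<>();
      for (int t = 0; t < threads; t++) {
        results.add(pool.submit(task));
      }
      long total = 0;
      for (Future<Long> result : results) {
        total += result.get();
      }
      return total;
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("Interrupted while waiting for workers", e);
    } catch (ExecutionException e) {
      throw new IllegalStateException("Worker failed unexpectedly", e.getCause());
    } finally {
      pool.shutdownNow();
    }
  }

  public static void main(String[] args) {
    // Stand-in for the shared mapper: Integer.parseInt is stateless, so this prints 0.
    System.out.println(countFailures(Integer::parseInt, "42", 42, 8, 10_000));
  }
}
```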





[GitHub] [beam] thiagotnunes commented on a diff in pull request #24390: [BEAM-12164] Support querying against Postgres for the SpannerIO change streams connector

Posted by GitBox <gi...@apache.org>.
thiagotnunes commented on code in PR #24390:
URL: https://github.com/apache/beam/pull/24390#discussion_r1035363580


##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/NameGenerator.java:
##########
@@ -25,23 +25,30 @@
  */
 public class NameGenerator {
 
-  private static final String PARTITION_METADATA_TABLE_NAME_FORMAT =
-      "CDC_Partitions_Metadata_%s_%s";
+  private static final String PARTITION_METADATA_TABLE_NAME_FORMAT = "Metadata_%s_%s";
+  private static final int MAX_POSTGRES_TABLE_NAME_LENGTH = 63;

Review Comment:
   Since we only have one length, maybe drop the `POSTGRES`?



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataAdminDao.java:
##########
@@ -98,11 +100,13 @@ public class PartitionMetadataAdminDao {
       DatabaseAdminClient databaseAdminClient,
       String instanceId,
       String databaseId,
-      String tableName) {
+      String tableName,
+      Dialect metadataDatabaseDialect) {

Review Comment:
   nit: since this class is named `PartitionMetadataAdminDao` I think we can name this field / variable just `dialect`



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/NameGenerator.java:
##########
@@ -25,23 +25,30 @@
  */
 public class NameGenerator {
 
-  private static final String PARTITION_METADATA_TABLE_NAME_FORMAT =
-      "CDC_Partitions_Metadata_%s_%s";
+  private static final String PARTITION_METADATA_TABLE_NAME_FORMAT = "Metadata_%s_%s";
+  private static final int MAX_POSTGRES_TABLE_NAME_LENGTH = 63;
 
   /**
    * Generates an unique name for the partition metadata table in the form of {@code
-   * "CDC_Partitions_Metadata_<databaseId>_<uuid>"}.
+   * "Metadata_<databaseId>_<uuid>"}.
    *
    * @param databaseId The database id where the table will be created
    * @return the unique generated name of the partition metadata table
    */
   public static String generatePartitionMetadataTableName(String databaseId) {
     // Maximum Spanner table name length is 128 characters.
-    // There are 25 characters in the name format.
+    // There are 11 characters in the name format.
     // Maximum Spanner database ID length is 30 characters.
     // UUID always generates a String with 36 characters.
-    // 128 - (25 + 30 + 36) = 37 characters short of the limit
-    return String.format(PARTITION_METADATA_TABLE_NAME_FORMAT, databaseId, UUID.randomUUID())
-        .replaceAll("-", "_");
+    // For GoogleSQL, 128 - (11 + 30 + 36) = 51 characters short of the limit

Review Comment:
   Does this comment make sense still, now that we consolidated the size?
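   For reference, the length budget behind that comment as a quick calculation. The literal part of `Metadata_%s_%s` is 10 characters (`Metadata_` plus the second `_`); the 30-character database ID and 36-character UUID figures are the ones stated in the diff comments:

```latex
\begin{aligned}
\text{GoogleSQL (128 limit):}\quad & 128 - (10 + 30 + 36) = 52 \ \text{characters to spare} \\
\text{Postgres (63 limit):}\quad & 63 - (10 + 30 + 36) = -13 \ \text{(up to 13 characters over)} \\
\text{Longest database ID that fits in 63:}\quad & 63 - (10 + 36) = 17
\end{aligned}
```

   So with a single 63-character limit, a name only needs shortening when the database ID is longer than 17 characters.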



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/mapper/ChangeStreamRecordMapper.java:
##########
@@ -83,30 +90,35 @@ public class ChangeStreamRecordMapper {
   private static final String CHILD_PARTITIONS_COLUMN = "child_partitions";
   private static final String PARENT_PARTITION_TOKENS_COLUMN = "parent_partition_tokens";
   private static final String TOKEN_COLUMN = "token";
+  private final Dialect spannerChangeStreamDatabaseDialect;
 
-  ChangeStreamRecordMapper() {}
+  ChangeStreamRecordMapper(Dialect spannerChangeStreamDatabaseDialect) {

Review Comment:
   nit: rename variable to `dialect`



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/mapper/ChangeStreamRecordMapper.java:
##########
@@ -225,18 +246,50 @@ private Stream<ChangeStreamRecord> toChangeStreamRecord(
         Stream.concat(dataChangeRecords, heartbeatRecords), childPartitionsRecords);
   }
 
+  ChangeStreamRecord toChangeStreamRecordJson(
+      PartitionMetadata partition, String row, ChangeStreamResultSetMetadata resultSetMetadata) {
+    Value.Builder valueBuilder = Value.newBuilder();
+    try {
+      JsonFormat.parser().ignoringUnknownFields().merge(row, valueBuilder);

Review Comment:
   Do we need to create a new parser per record, or could we move this into the constructor of this class?
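   A minimal sketch of hoisting a configured parser into the constructor. It uses `java.time.format.DateTimeFormatter` as a stand-in for protobuf's `JsonFormat.Parser`, because `DateTimeFormatter` is documented as immutable and thread-safe; whether `JsonFormat.Parser` gives the same guarantee is the open question raised elsewhere in this thread. `CommitTimestampMapper` is a hypothetical name.

```java
import java.time.Instant;
import java.time.OffsetDateTime;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeFormatterBuilder;

/** Hypothetical mapper that configures its parser once and reuses it for every record. */
final class CommitTimestampMapper {
  // Built once per mapper instead of once per record (the analogue of calling
  // JsonFormat.parser().ignoringUnknownFields() in the constructor).
  private final DateTimeFormatter parser;

  CommitTimestampMapper() {
    this.parser =
        new DateTimeFormatterBuilder()
            .parseCaseInsensitive()
            .append(DateTimeFormatter.ISO_OFFSET_DATE_TIME)
            .toFormatter();
  }

  Instant toInstant(String commitTimestamp) {
    return OffsetDateTime.parse(commitTimestamp, parser).toInstant();
  }

  public static void main(String[] args) {
    CommitTimestampMapper mapper = new CommitTimestampMapper();
    // commit_timestamp value from the Postgres JSON sample in this thread.
    System.out.println(mapper.toInstant("2022-11-29T11:31:36.008927-08:00"));
  }
}
```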



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataDao.java:
##########
@@ -53,16 +54,19 @@ public class PartitionMetadataDao {
 
   private final String metadataTableName;
   private final DatabaseClient databaseClient;
+  private final Dialect metadataDatabaseDialect;
 
   /**
    * Constructs a partition metadata dao object given the generated name of the tables.
    *
    * @param metadataTableName the name of the partition metadata table
    * @param databaseClient the {@link DatabaseClient} to perform queries
    */
-  PartitionMetadataDao(String metadataTableName, DatabaseClient databaseClient) {
+  PartitionMetadataDao(
+      String metadataTableName, DatabaseClient databaseClient, Dialect metadataDatabaseDialect) {
     this.metadataTableName = metadataTableName;
     this.databaseClient = databaseClient;
+    this.metadataDatabaseDialect = metadataDatabaseDialect;

Review Comment:
   nit: since this class is the `PartitionMetadataDao` I think we can name this variable / field just `dialect`.



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/mapper/ChangeStreamRecordMapper.java:
##########
@@ -260,6 +312,65 @@ private DataChangeRecord toDataChangeRecord(
         changeStreamRecordMetadataFrom(partition, commitTimestamp, resultSetMetadata));
   }
 
+  private DataChangeRecord toDataChangeRecordJson(
+      PartitionMetadata partition, Value row, ChangeStreamResultSetMetadata resultSetMetadata) {
+    Value dataChangeRecordValue =
+        Optional.ofNullable(row.getStructValue().getFieldsMap().get(DATA_CHANGE_RECORD_COLUMN))
+            .orElseThrow(IllegalArgumentException::new);
+    Map<String, Value> valueMap = dataChangeRecordValue.getStructValue().getFieldsMap();
+    final String commitTimestamp =
+        Optional.ofNullable(valueMap.get(COMMIT_TIMESTAMP_COLUMN))
+            .orElseThrow(IllegalArgumentException::new)
+            .getStringValue();
+    return new DataChangeRecord(
+        partition.getPartitionToken(),
+        Timestamp.parseTimestamp(commitTimestamp),
+        Optional.ofNullable(valueMap.get(SERVER_TRANSACTION_ID_COLUMN))
+            .orElseThrow(IllegalArgumentException::new)

Review Comment:
   Thanks
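   The diff's `.orElseThrow(IllegalArgumentException::new)` throws an exception with no message, which makes a malformed record hard to diagnose. A minimal sketch of a hypothetical helper (`RecordFields.require` is not part of the connector) that names the missing field and lists the ones present:

```java
import java.util.Map;
import java.util.Optional;

/** Hypothetical helper for pulling required fields out of a parsed change record. */
final class RecordFields {
  private RecordFields() {}

  /** Returns the named field, or throws an exception that says which field is missing. */
  static <V> V require(Map<String, V> fields, String name) {
    return Optional.ofNullable(fields.get(name))
        .orElseThrow(
            () ->
                new IllegalArgumentException(
                    "Missing field '" + name + "' in change stream record; present: "
                        + fields.keySet()));
  }

  public static void main(String[] args) {
    Map<String, String> record = Map.of("commit_timestamp", "2022-11-29T11:31:36.008927-08:00");
    System.out.println(require(record, "commit_timestamp"));
    try {
      require(record, "server_transaction_id");
    } catch (IllegalArgumentException e) {
      System.out.println(e.getMessage());
    }
  }
}
```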



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/ChangeStreamDao.java:
##########
@@ -50,11 +52,13 @@ public class ChangeStreamDao {
       String changeStreamName,
       DatabaseClient databaseClient,
       RpcPriority rpcPriority,
-      String jobName) {
+      String jobName,
+      Dialect spannerChangeStreamDatabaseDialect) {

Review Comment:
   nit: since this is the `ChangeStreamDao` I think we can name this variable / field just `dialect`



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataAdminDao.java:
##########
@@ -190,4 +237,8 @@ public void deletePartitionMetadataTable() {
       throw SpannerExceptionFactory.propagateInterrupt(e);
     }
   }
+
+  private boolean isPostgres() {

Review Comment:
   nit: should we have a similar method on the `ChangeStreamDao`?
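   A minimal sketch of what a `ChangeStreamDao`-side `isPostgres()` could look like when the DAO stores a `Dialect`. The enum is a local stand-in for `com.google.cloud.spanner.Dialect`, `ChangeStreamQueries` is a hypothetical class, and the query strings are illustrative shapes of the GoogleSQL `READ_<stream>` and Postgres `spanner.read_json_<stream>` change stream functions; check them against the Spanner documentation rather than treating them as the connector's exact text.

```java
/** Hypothetical DAO fragment that keys its query text off a stored dialect. */
final class ChangeStreamQueries {
  // Local stand-in for com.google.cloud.spanner.Dialect (same constant names).
  enum Dialect { GOOGLE_STANDARD_SQL, POSTGRESQL }

  private final String changeStreamName;
  private final Dialect dialect;

  ChangeStreamQueries(String changeStreamName, Dialect dialect) {
    this.changeStreamName = changeStreamName;
    this.dialect = dialect;
  }

  private boolean isPostgres() {
    return dialect == Dialect.POSTGRESQL;
  }

  String changeStreamQuery() {
    if (isPostgres()) {
      // Positional parameters (assumed order): start timestamp, end timestamp,
      // partition token, heartbeat milliseconds; the last argument is passed as null.
      return "SELECT * FROM \"spanner\".\"read_json_" + changeStreamName
          + "\"($1, $2, $3, $4, null)";
    }
    // Named parameters are bound separately when the statement is built.
    return "SELECT * FROM READ_" + changeStreamName + "("
        + "start_timestamp => @startTimestamp, end_timestamp => @endTimestamp, "
        + "partition_token => @partitionToken, read_options => null, "
        + "heartbeat_milliseconds => @heartbeatMillis)";
  }

  public static void main(String[] args) {
    for (Dialect d : Dialect.values()) {
      System.out.println(d + ": " + new ChangeStreamQueries("SingersStream", d).changeStreamQuery());
    }
  }
}
```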





[GitHub] [beam] pabloem commented on a diff in pull request #24390: [BEAM-12164] Support querying against Postgres for the SpannerIO change streams connector

Posted by GitBox <gi...@apache.org>.
pabloem commented on code in PR #24390:
URL: https://github.com/apache/beam/pull/24390#discussion_r1036333059


##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/NameGenerator.java:
##########
@@ -25,23 +25,28 @@
  */
 public class NameGenerator {
 
-  private static final String PARTITION_METADATA_TABLE_NAME_FORMAT =
-      "CDC_Partitions_Metadata_%s_%s";
+  private static final String PARTITION_METADATA_TABLE_NAME_FORMAT = "Metadata_%s_%s";

Review Comment:
   Does this make the IO incompatible with pipeline updates? (Or maybe it already was?)





[GitHub] [beam] nancyxu123 commented on a diff in pull request #24390: [BEAM-12164] Support querying against Postgres for the SpannerIO change streams connector

Posted by GitBox <gi...@apache.org>.
nancyxu123 commented on code in PR #24390:
URL: https://github.com/apache/beam/pull/24390#discussion_r1035479345


##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/mapper/ChangeStreamRecordMapper.java:
##########
@@ -83,30 +90,41 @@ public class ChangeStreamRecordMapper {
   private static final String CHILD_PARTITIONS_COLUMN = "child_partitions";
   private static final String PARENT_PARTITION_TOKENS_COLUMN = "parent_partition_tokens";
   private static final String TOKEN_COLUMN = "token";
+  private final Dialect dialect;
+  private final JsonFormat.Printer printer;
+  private final JsonFormat.Parser parser;
 
-  ChangeStreamRecordMapper() {}
+  ChangeStreamRecordMapper(Dialect dialect) {
+    this.dialect = dialect;
+
+    this.printer =

Review Comment:
   Unfortunately, I don't see any documentation explicitly stating that this class is thread-safe. However, as you said, it doesn't seem like these objects will be accessed concurrently.





[GitHub] [beam] nancyxu123 commented on a diff in pull request #24390: [BEAM-12164] Support querying against Postgres for the SpannerIO change streams connector

Posted by GitBox <gi...@apache.org>.
nancyxu123 commented on code in PR #24390:
URL: https://github.com/apache/beam/pull/24390#discussion_r1035198093


##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/mapper/ChangeStreamRecordMapper.java:
##########
@@ -260,6 +312,65 @@ private DataChangeRecord toDataChangeRecord(
         changeStreamRecordMetadataFrom(partition, commitTimestamp, resultSetMetadata));
   }
 
+  private DataChangeRecord toDataChangeRecordJson(
+      PartitionMetadata partition, Value row, ChangeStreamResultSetMetadata resultSetMetadata) {
+    Value dataChangeRecordValue =
+        Optional.ofNullable(row.getStructValue().getFieldsMap().get(DATA_CHANGE_RECORD_COLUMN))
+            .orElseThrow(IllegalArgumentException::new);
+    Map<String, Value> valueMap = dataChangeRecordValue.getStructValue().getFieldsMap();
+    final String commitTimestamp =
+        Optional.ofNullable(valueMap.get(COMMIT_TIMESTAMP_COLUMN))
+            .orElseThrow(IllegalArgumentException::new)
+            .getStringValue();
+    return new DataChangeRecord(
+        partition.getPartitionToken(),
+        Timestamp.parseTimestamp(commitTimestamp),
+        Optional.ofNullable(valueMap.get(SERVER_TRANSACTION_ID_COLUMN))
+            .orElseThrow(IllegalArgumentException::new)

Review Comment:
   Yes, I double-checked.
   
   See the JSON string for a data change record:
   Jsonb: {"data_change_record":{"column_types":[{"is_primary_key":true,"name":"SingerId","ordinal_position":1,"type":{"code":"INT64"}},{"is_primary_key":false,"name":"FirstName","ordinal_position":2,"type":{"code":"STRING"}},{"is_primary_key":false,"name":"LastName","ordinal_position":3,"type":{"code":"STRING"}},{"is_primary_key":false,"name":"SingerInfo","ordinal_position":4,"type":{"code":"BYTES"}}],"commit_timestamp":"2022-11-29T11:31:36.008927-08:00","is_last_record_in_transaction_in_partition":true,"is_system_transaction":false,"mod_type":"INSERT","mods":[{"keys":{"SingerId":"2"},"new_values":{"FirstName":"First Name 2","LastName":"Last Name 2","SingerInfo":null},"old_values":{}}],"number_of_partitions_in_transaction":1,"number_of_records_in_transaction":1,"record_sequence":"00000000","server_transaction_id":"NTczMzU4MTQ3MzQyNzAyNjA0Ng==","table_name":"Singers_qqkl32uoQ1Rhk4ZDqW4KGy2f1oGyhLH6oiiXq71B5U24ZLbTylFANYz","transaction_tag":"","value_capture_type":"OLD_AND_NEW_VALUES"}}
   
   See the JSON string for a child partitions record:
   Jsonb: {"child_partitions_record":{"child_partitions":[{"parent_partition_tokens":[],"token":"__8BAYEGX9zUgAABgsBFg1NpbmdlcnNTdHJlYW1fODVxa29vZHBkM3ZOQUFvOAABhIEGCRQ28AANgoCDCMNkAAAAAAAAhASSRgwFhWcxM185NTE5OTgzAAH__4X_Be6gldXMFYb_Be6hGsHih4eAwGQBAf__"}],"record_sequence":"00000002","start_timestamp":"2022-11-29T11:31:35.951359-08:00"}}
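   Once Postgres JSON like the samples above is parsed, the record type can be picked from the single top-level key. A minimal sketch, assuming the JSON has already been parsed into a `Map` by some JSON library (none is shown); `RecordKinds` is hypothetical, and the `heartbeat_record` key is assumed by analogy because the samples only show the other two.

```java
import java.util.Map;

/** Hypothetical sketch: dispatching on the top-level key of a parsed JSON change record. */
final class RecordKinds {
  enum Kind { DATA_CHANGE, HEARTBEAT, CHILD_PARTITIONS }

  private RecordKinds() {}

  static Kind classify(Map<String, ?> record) {
    if (record.containsKey("data_change_record")) {
      return Kind.DATA_CHANGE;
    }
    if (record.containsKey("heartbeat_record")) { // key name assumed, not shown in the samples
      return Kind.HEARTBEAT;
    }
    if (record.containsKey("child_partitions_record")) {
      return Kind.CHILD_PARTITIONS;
    }
    throw new IllegalArgumentException("Unknown change stream record keys: " + record.keySet());
  }

  public static void main(String[] args) {
    // Top-level shapes follow the two samples above, with the nested objects elided.
    System.out.println(classify(Map.of("data_change_record", Map.of())));
    System.out.println(classify(Map.of("child_partitions_record", Map.of())));
  }
}
```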
   





[GitHub] [beam] pabloem commented on pull request #24390: [BEAM-12164] Support querying against Postgres for the SpannerIO change streams connector

Posted by GitBox <gi...@apache.org>.
pabloem commented on PR #24390:
URL: https://github.com/apache/beam/pull/24390#issuecomment-1332599345

   lgtm - I can merge once tests pass.




[GitHub] [beam] github-actions[bot] commented on pull request #24390: [BEAM-12164] Support querying against Postgres for the SpannerIO change streams connector

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #24390:
URL: https://github.com/apache/beam/pull/24390#issuecomment-1330032723

   Assigning reviewers. If you would like to opt out of this review, comment `assign to next reviewer`:
   
   R: @lukecwik for label java.
   R: @Abacn for label io.
   
   Available commands:
   - `stop reviewer notifications` - opt out of the automated review tooling
   - `remind me after tests pass` - tag the comment author after tests pass
   - `waiting on author` - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)
   
   The PR bot will only process comments in the main thread (not review comments).




[GitHub] [beam] pabloem commented on pull request #24390: [BEAM-12164] Support querying against Postgres for the SpannerIO change streams connector

Posted by GitBox <gi...@apache.org>.
pabloem commented on PR #24390:
URL: https://github.com/apache/beam/pull/24390#issuecomment-1333279008

   LGTM thanks!




[GitHub] [beam] pabloem merged pull request #24390: [BEAM-12164] Support querying against Postgres for the SpannerIO change streams connector

Posted by GitBox <gi...@apache.org>.
pabloem merged PR #24390:
URL: https://github.com/apache/beam/pull/24390




[GitHub] [beam] pabloem commented on pull request #24390: [BEAM-12164] Support querying against Postgres for the SpannerIO change streams connector

Posted by GitBox <gi...@apache.org>.
pabloem commented on PR #24390:
URL: https://github.com/apache/beam/pull/24390#issuecomment-1332606080

   Run Java_Examples_Dataflow_Java17 PreCommit




[GitHub] [beam] pabloem commented on pull request #24390: [BEAM-12164] Support querying against Postgres for the SpannerIO change streams connector

Posted by GitBox <gi...@apache.org>.
pabloem commented on PR #24390:
URL: https://github.com/apache/beam/pull/24390#issuecomment-1332667027

   FYI @kennknowles: we're working with @nancyxu123 to get this into 2.44.0. We'd love your help coordinating either cutting the release branch after this merges or cherry-picking it afterwards.




[GitHub] [beam] thiagotnunes commented on a diff in pull request #24390: [BEAM-12164] Support querying against Postgres for the SpannerIO change streams connector

Posted by GitBox <gi...@apache.org>.
thiagotnunes commented on code in PR #24390:
URL: https://github.com/apache/beam/pull/24390#discussion_r1035472413


##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/mapper/ChangeStreamRecordMapper.java:
##########
@@ -83,30 +90,41 @@ public class ChangeStreamRecordMapper {
   private static final String CHILD_PARTITIONS_COLUMN = "child_partitions";
   private static final String PARENT_PARTITION_TOKENS_COLUMN = "parent_partition_tokens";
   private static final String TOKEN_COLUMN = "token";
+  private final Dialect dialect;
+  private final JsonFormat.Printer printer;
+  private final JsonFormat.Parser parser;
 
-  ChangeStreamRecordMapper() {}
+  ChangeStreamRecordMapper(Dialect dialect) {
+    this.dialect = dialect;
+
+    this.printer =

Review Comment:
   I don't think we will call these concurrently, but do you know if they are thread-safe?





[GitHub] [beam] pabloem commented on a diff in pull request #24390: [BEAM-12164] Support querying against Postgres for the SpannerIO change streams connector

Posted by GitBox <gi...@apache.org>.
pabloem commented on code in PR #24390:
URL: https://github.com/apache/beam/pull/24390#discussion_r1036334262


##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/NameGenerator.java:
##########
@@ -25,23 +25,28 @@
  */
 public class NameGenerator {
 
-  private static final String PARTITION_METADATA_TABLE_NAME_FORMAT =
-      "CDC_Partitions_Metadata_%s_%s";
+  private static final String PARTITION_METADATA_TABLE_NAME_FORMAT = "Metadata_%s_%s";
+  private static final int MAX_TABLE_NAME_LENGTH = 63;
 
   /**
    * Generates an unique name for the partition metadata table in the form of {@code
-   * "CDC_Partitions_Metadata_<databaseId>_<uuid>"}.
+   * "Metadata_<databaseId>_<uuid>"}.
    *
    * @param databaseId The database id where the table will be created
    * @return the unique generated name of the partition metadata table
    */
   public static String generatePartitionMetadataTableName(String databaseId) {
-    // Maximum Spanner table name length is 128 characters.
-    // There are 25 characters in the name format.
+    // There are 11 characters in the name format.
     // Maximum Spanner database ID length is 30 characters.
     // UUID always generates a String with 36 characters.
-    // 128 - (25 + 30 + 36) = 37 characters short of the limit
-    return String.format(PARTITION_METADATA_TABLE_NAME_FORMAT, databaseId, UUID.randomUUID())
-        .replaceAll("-", "_");
+    // Since the Postgres table name length is 63, we may need to truncate the table name depending
+    // on the database length.
+    String fullString =
+        String.format(PARTITION_METADATA_TABLE_NAME_FORMAT, databaseId, UUID.randomUUID())
+            .replaceAll("-", "_");
+    if (fullString.length() < MAX_TABLE_NAME_LENGTH) {
+      return fullString;
+    }
+    return fullString.substring(0, MAX_TABLE_NAME_LENGTH);

Review Comment:
   I'm guessing this is not necessarily a problem, since very few metadata tables are ever created, but truncating the UUID makes collisions more likely (https://stackoverflow.com/questions/4564112/is-it-safe-to-turn-a-uuid-into-a-short-code-only-use-first-8-chars).
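   One way to address this, sketched under the assumption that a 63-character limit applies to both dialects: shorten the database ID instead of the UUID, so the full UUID (and its 122 random bits) always survives. The literal part of `Metadata_%s_%s` is 10 characters and a UUID string is 36, leaving 17 for the database ID. `MetadataTableNames` is a hypothetical class, not the code in this PR.

```java
import java.util.UUID;

/** Hypothetical variant of the name generator that never truncates the UUID. */
final class MetadataTableNames {
  // Postgres-dialect limit used in the diff; it also fits under GoogleSQL's 128.
  static final int MAX_TABLE_NAME_LENGTH = 63;
  private static final String FORMAT = "Metadata_%s_%s";
  // "Metadata_" plus the separating "_" is 10 characters; UUID.toString() is always 36.
  private static final int FIXED_LENGTH = 10 + 36;

  private MetadataTableNames() {}

  static String generate(String databaseId, UUID uuid) {
    int maxDatabaseIdLength = MAX_TABLE_NAME_LENGTH - FIXED_LENGTH; // 17
    String id =
        databaseId.length() > maxDatabaseIdLength
            ? databaseId.substring(0, maxDatabaseIdLength)
            : databaseId;
    // Hyphens (in the UUID and possibly the database ID) become underscores, as in the diff.
    return String.format(FORMAT, id, uuid).replace('-', '_');
  }

  public static void main(String[] args) {
    String name = generate("a-very-long-database-id", UUID.randomUUID());
    System.out.println(name + " (" + name.length() + " characters)");
  }
}
```

   Truncating the database ID can make two different databases share a prefix, but uniqueness comes from the intact UUID.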





[GitHub] [beam] thiagotnunes commented on a diff in pull request #24390: [BEAM-12164] Support querying against Postgres for the SpannerIO change streams connector

Posted by GitBox <gi...@apache.org>.
thiagotnunes commented on code in PR #24390:
URL: https://github.com/apache/beam/pull/24390#discussion_r1034257594


##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java:
##########
@@ -1696,6 +1706,27 @@ && getInclusiveStartAt().toSqlTimestamp().after(getInclusiveEndAt().toSqlTimesta
     }
   }
 
+  private static boolean isPostgres(SpannerConfig spannerConfig) {

Review Comment:
   You can use `databaseClient.getDialect()` and remove this method



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java:
##########
@@ -1646,7 +1654,9 @@ && getInclusiveStartAt().toSqlTimestamp().after(getInclusiveEndAt().toSqlTimesta
               partitionMetadataSpannerConfig,
               partitionMetadataTableName,
               rpcPriority,
-              input.getPipeline().getOptions().getJobName());
+              input.getPipeline().getOptions().getJobName(),
+              isSpannerChangeStreamDatabasePostgres,
+              isSpannerMetadataDatabasePostgres);

Review Comment:
   Same as above



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/mapper/MapperFactory.java:
##########
@@ -32,6 +32,11 @@ public class MapperFactory implements Serializable {
 
   private transient ChangeStreamRecordMapper changeStreamRecordMapperInstance;
   private transient PartitionMetadataMapper partitionMetadataMapperInstance;
+  private final boolean isPostgres;
+
+  public MapperFactory(boolean isPostgres) {

Review Comment:
   Could we use a `Dialect` instead of a `boolean`?



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java:
##########
@@ -1636,7 +1644,7 @@ && getInclusiveStartAt().toSqlTimestamp().after(getInclusiveEndAt().toSqlTimesta
           getInclusiveEndAt().compareTo(MAX_INCLUSIVE_END_AT) > 0
               ? MAX_INCLUSIVE_END_AT
               : getInclusiveEndAt();
-      final MapperFactory mapperFactory = new MapperFactory();
+      final MapperFactory mapperFactory = new MapperFactory(isSpannerChangeStreamDatabasePostgres);

Review Comment:
   I think you should give it a [Dialect](https://github.com/googleapis/java-spanner/blob/main/google-cloud-spanner/src/main/java/com/google/cloud/spanner/Dialect.java), which makes the class more extensible.
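   A minimal sketch of a serializable factory that takes a `Dialect` instead of a `boolean`, as suggested. All names are hypothetical stand-ins for `MapperFactory` and `ChangeStreamRecordMapper`; the point is that the enum field survives Java serialization while the `transient` mapper is rebuilt lazily after deserialization, and that supporting another dialect later would not require another flag.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

/** Hypothetical stand-in for MapperFactory, parameterized by a Dialect instead of a boolean. */
final class MapperFactorySketch implements Serializable {
  private static final long serialVersionUID = 1L;

  // Local stand-in for com.google.cloud.spanner.Dialect; enums serialize by name.
  enum Dialect { GOOGLE_STANDARD_SQL, POSTGRESQL }

  /** Hypothetical stand-in for ChangeStreamRecordMapper (deliberately not Serializable). */
  static final class RecordMapper {
    final Dialect dialect;

    RecordMapper(Dialect dialect) {
      this.dialect = dialect;
    }
  }

  private final Dialect dialect;
  // Not serialized; rebuilt on first use after the factory is deserialized.
  private transient RecordMapper mapperInstance;

  MapperFactorySketch(Dialect dialect) {
    this.dialect = dialect;
  }

  synchronized RecordMapper changeStreamRecordMapper() {
    if (mapperInstance == null) {
      mapperInstance = new RecordMapper(dialect);
    }
    return mapperInstance;
  }

  /** Java-serializes and deserializes the factory, roughly as happens when shipping a DoFn. */
  static MapperFactorySketch roundTrip(MapperFactorySketch factory) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
        out.writeObject(factory);
      }
      try (ObjectInputStream in =
          new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
        return (MapperFactorySketch) in.readObject();
      }
    } catch (IOException | ClassNotFoundException e) {
      throw new IllegalStateException(e);
    }
  }

  public static void main(String[] args) {
    MapperFactorySketch factory = new MapperFactorySketch(Dialect.POSTGRESQL);
    factory.changeStreamRecordMapper(); // populate the transient field before serializing
    System.out.println(roundTrip(factory).changeStreamRecordMapper().dialect);
  }
}
```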



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/ChangeStreamDao.java:
##########
@@ -50,11 +51,13 @@ public class ChangeStreamDao {
       String changeStreamName,
       DatabaseClient databaseClient,
       RpcPriority rpcPriority,
-      String jobName) {
+      String jobName,
+      boolean isPostgres) {

Review Comment:
   Could we use a Dialect instead of a boolean?



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataDao.java:
##########
@@ -53,16 +53,19 @@ public class PartitionMetadataDao {
 
   private final String metadataTableName;
   private final DatabaseClient databaseClient;
+  private final boolean isPostgres;
 
   /**
    * Constructs a partition metadata dao object given the generated name of the tables.
    *
    * @param metadataTableName the name of the partition metadata table
    * @param databaseClient the {@link DatabaseClient} to perform queries
    */
-  PartitionMetadataDao(String metadataTableName, DatabaseClient databaseClient) {
+  PartitionMetadataDao(
+      String metadataTableName, DatabaseClient databaseClient, boolean isPostgres) {
     this.metadataTableName = metadataTableName;
     this.databaseClient = databaseClient;
+    this.isPostgres = isPostgres;

Review Comment:
   Could we use a `Dialect` instead of a `boolean`?



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataAdminDao.java:
##########
@@ -98,11 +99,13 @@ public class PartitionMetadataAdminDao {
       DatabaseAdminClient databaseAdminClient,
       String instanceId,
       String databaseId,
-      String tableName) {
+      String tableName,
+      boolean isPostgres) {

Review Comment:
   Could we use a `Dialect` instead of a `boolean`?



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java:
##########
@@ -1628,6 +1627,15 @@ && getInclusiveStartAt().toSqlTimestamp().after(getInclusiveEndAt().toSqlTimesta
               .setInstanceId(StaticValueProvider.of(partitionMetadataInstanceId))
               .setDatabaseId(StaticValueProvider.of(partitionMetadataDatabaseId))
               .build();
+      final boolean isSpannerChangeStreamDatabasePostgres = isPostgres(changeStreamSpannerConfig);
+      final boolean isSpannerMetadataDatabasePostgres = isPostgres(partitionMetadataSpannerConfig);
+      LOG.info("The Spanner database is postgres: " + isSpannerChangeStreamDatabasePostgres);

Review Comment:
   Maybe log the database id + dialect here and below?
   
   ```
   DatabaseId changeStreamDatabaseId = DatabaseId.of(changeStreamSpannerConfig.getProjectId().get(), changeStreamSpannerConfig.getInstanceId().get(), changeStreamSpannerConfig.getDatabaseId().get());
   // getDialect(...) is a helper still to be written, e.g. one that calls DatabaseClient.getDialect().
   Dialect changeStreamDatabaseDialect = getDialect(changeStreamSpannerConfig);
   LOG.info("The Spanner database " + changeStreamDatabaseId + " has dialect " + changeStreamDatabaseDialect);
   ```



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java:
##########
@@ -1646,7 +1654,9 @@ && getInclusiveStartAt().toSqlTimestamp().after(getInclusiveEndAt().toSqlTimesta
               partitionMetadataSpannerConfig,
               partitionMetadataTableName,
               rpcPriority,
-              input.getPipeline().getOptions().getJobName());
+              input.getPipeline().getOptions().getJobName(),
+              isSpannerChangeStreamDatabasePostgres,

Review Comment:
   I think you should make it a [Dialect](https://github.com/googleapis/java-spanner/blob/main/google-cloud-spanner/src/main/java/com/google/cloud/spanner/Dialect.java), which makes the class more extensible.
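
   To illustrate the extensibility argument: a `switch` over the enum keeps each dialect-specific choice in one place, and a new dialect means a new case rather than another boolean. The sketch below picks the query-parameter syntax, since GoogleSQL uses named parameters such as `@startTimestamp` while PostgreSQL uses positional ones such as `$1`. The `Dialect` enum is a local stand-in and `parameterPlaceholder` is hypothetical:

   ```java
   // Stand-in for com.google.cloud.spanner.Dialect.
   enum Dialect {
     GOOGLE_STANDARD_SQL,
     POSTGRESQL
   }

   final class QueryParameters {
     private QueryParameters() {}

     /**
      * Returns the placeholder for a query parameter.
      *
      * @param name parameter name, used by GoogleSQL
      * @param position 1-based position, used by PostgreSQL
      */
     static String parameterPlaceholder(Dialect dialect, String name, int position) {
       switch (dialect) {
         case GOOGLE_STANDARD_SQL:
           return "@" + name;
         case POSTGRESQL:
           return "$" + position;
         default:
           // Fails loudly until a newly added dialect is handled.
           throw new IllegalArgumentException("Unsupported dialect: " + dialect);
       }
     }
   }
   ```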



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/mapper/ChangeStreamRecordMapper.java:
##########
@@ -83,30 +89,35 @@ public class ChangeStreamRecordMapper {
   private static final String CHILD_PARTITIONS_COLUMN = "child_partitions";
   private static final String PARENT_PARTITION_TOKENS_COLUMN = "parent_partition_tokens";
   private static final String TOKEN_COLUMN = "token";
+  private final boolean isPostgres;
 
-  ChangeStreamRecordMapper() {}
+  ChangeStreamRecordMapper(boolean isPostgres) {

Review Comment:
   Could we use a `Dialect` instead of a `boolean`?



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/mapper/ChangeStreamRecordMapper.java:
##########
@@ -197,13 +209,21 @@ public class ChangeStreamRecordMapper {
    * @return a {@link List} of {@link ChangeStreamRecord} subclasses
    */
   public List<ChangeStreamRecord> toChangeStreamRecords(
-      PartitionMetadata partition, Struct row, ChangeStreamResultSetMetadata resultSetMetadata) {
-    return row.getStructList(0).stream()
+      PartitionMetadata partition,
+      ChangeStreamResultSet row,

Review Comment:
   Since we are calling this method with a `ChangeStreamResultSet` now, this is no longer a row, but a `resultSet`. We make it a row by either calling `resultSet.getPgJsonb(0)` or `resultSet.getCurrentRowAsStruct()`. Could you change the variable name here?
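
   A sketch of the naming, assuming a hypothetical `ResultSetView` interface that models only the two accessors mentioned above (`getPgJsonb(0)` and `getCurrentRowAsStruct()`), with `String` and `List<String>` standing in for the JSONB value and the `Struct`; none of these types come from the PR or the client library:

   ```java
   import java.util.List;

   // Stand-in for com.google.cloud.spanner.Dialect.
   enum Dialect {
     GOOGLE_STANDARD_SQL,
     POSTGRESQL
   }

   // Hypothetical view of a change stream result set positioned on its current row.
   interface ResultSetView {
     String getPgJsonb(int columnIndex); // PostgreSQL: the row arrives as one JSONB column

     List<String> getCurrentRowAsStruct(); // GoogleSQL: the row arrives as a struct (simplified)
   }

   final class ResultSetRowMapper {
     private final Dialect dialect;

     ResultSetRowMapper(Dialect dialect) {
       this.dialect = dialect;
     }

     // The parameter is the result set; the row is what gets extracted from it.
     Object currentRow(ResultSetView resultSet) {
       switch (dialect) {
         case POSTGRESQL:
           return resultSet.getPgJsonb(0);
         case GOOGLE_STANDARD_SQL:
           return resultSet.getCurrentRowAsStruct();
         default:
           throw new IllegalArgumentException("Unsupported dialect: " + dialect);
       }
     }
   }
   ```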



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/NameGenerator.java:
##########
@@ -25,23 +25,32 @@
  */
 public class NameGenerator {
 
-  private static final String PARTITION_METADATA_TABLE_NAME_FORMAT =
-      "CDC_Partitions_Metadata_%s_%s";
+  private static final String PARTITION_METADATA_TABLE_NAME_FORMAT = "Metadata_%s_%s";
+  private static final int MAX_POSTGRES_TABLE_NAME_LENGTH = 63;
+  private static final int MAX_TABLE_NAME_LENGTH = 128;
 
   /**
    * Generates an unique name for the partition metadata table in the form of {@code
-   * "CDC_Partitions_Metadata_<databaseId>_<uuid>"}.
+   * "Metadata_<databaseId>_<uuid>"}.
    *
    * @param databaseId The database id where the table will be created
    * @return the unique generated name of the partition metadata table
    */
-  public static String generatePartitionMetadataTableName(String databaseId) {
+  public static String generatePartitionMetadataTableName(String databaseId, boolean isPostgres) {
     // Maximum Spanner table name length is 128 characters.
-    // There are 25 characters in the name format.
+    // There are 11 characters in the name format.
     // Maximum Spanner database ID length is 30 characters.
     // UUID always generates a String with 36 characters.
-    // 128 - (25 + 30 + 36) = 37 characters short of the limit
-    return String.format(PARTITION_METADATA_TABLE_NAME_FORMAT, databaseId, UUID.randomUUID())
-        .replaceAll("-", "_");
+    // For GoogleSQL, 128 - (11 + 30 + 36) = 51 characters short of the limit
+    // For Postgres, since the limit is 64, we may need to truncate the table name depending
+    // on the database length.
+    int maxTableNameLength = isPostgres ? MAX_POSTGRES_TABLE_NAME_LENGTH : MAX_TABLE_NAME_LENGTH;

Review Comment:
   Since the PG character limit (63) is lower than the GoogleSQL one (128), could we always use it instead (to avoid passing in a dialect here)? If you feel strongly that each dialect should use its own maximum, could you give this method a `Dialect` instead of a `boolean`?
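
   A sketch of the first option, applying the 63-character limit to every dialect. With the `Metadata_%s_%s` format (10 literal characters), a 30-character database ID and a 36-character UUID, the untruncated name is 10 + 30 + 36 = 76 characters: too long for PostgreSQL, well within GoogleSQL's 128. The class name is hypothetical and the body is illustrative, not the PR's implementation:

   ```java
   import java.util.UUID;

   final class NameGeneratorSketch {
     private static final String PARTITION_METADATA_TABLE_NAME_FORMAT = "Metadata_%s_%s";
     // PostgreSQL's identifier limit; it is below GoogleSQL's 128, so it is safe for both.
     private static final int MAX_TABLE_NAME_LENGTH = 63;

     private NameGeneratorSketch() {}

     static String generatePartitionMetadataTableName(String databaseId) {
       // Hyphens (from the database ID or the UUID) become underscores, as in the original.
       String fullName =
           String.format(PARTITION_METADATA_TABLE_NAME_FORMAT, databaseId, UUID.randomUUID())
               .replaceAll("-", "_");
       // At most 40 characters precede the UUID ("Metadata_" + 30 + "_"), so truncation
       // only drops trailing UUID characters and keeps at least 23 of them.
       return fullName.length() <= MAX_TABLE_NAME_LENGTH
           ? fullName
           : fullName.substring(0, MAX_TABLE_NAME_LENGTH);
     }
   }
   ```

   A truncated name is still very likely, though no longer guaranteed, to be unique.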



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/mapper/ChangeStreamRecordMapper.java:
##########
@@ -260,6 +312,65 @@ private DataChangeRecord toDataChangeRecord(
         changeStreamRecordMetadataFrom(partition, commitTimestamp, resultSetMetadata));
   }
 
+  private DataChangeRecord toDataChangeRecordJson(
+      PartitionMetadata partition, Value row, ChangeStreamResultSetMetadata resultSetMetadata) {
+    Value dataChangeRecordValue =
+        Optional.ofNullable(row.getStructValue().getFieldsMap().get(DATA_CHANGE_RECORD_COLUMN))
+            .orElseThrow(IllegalArgumentException::new);
+    Map<String, Value> valueMap = dataChangeRecordValue.getStructValue().getFieldsMap();
+    final String commitTimestamp =
+        Optional.ofNullable(valueMap.get(COMMIT_TIMESTAMP_COLUMN))
+            .orElseThrow(IllegalArgumentException::new)
+            .getStringValue();
+    return new DataChangeRecord(
+        partition.getPartitionToken(),
+        Timestamp.parseTimestamp(commitTimestamp),
+        Optional.ofNullable(valueMap.get(SERVER_TRANSACTION_ID_COLUMN))
+            .orElseThrow(IllegalArgumentException::new)

Review Comment:
   All the fields on which we call `orElseThrow` are mandatory, right? (Just double-checking.)
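
   If those fields are indeed mandatory, a small helper can keep the `orElseThrow` calls while giving each failure a message that names the missing field, which a bare `IllegalArgumentException::new` does not. A sketch, with a `Map` standing in for the protobuf `Value` fields map and `requireField` as a hypothetical name:

   ```java
   import java.util.Map;
   import java.util.Optional;

   final class RequiredFields {
     private RequiredFields() {}

     // Returns the value of a mandatory field, or fails with the field name in the message.
     static <T> T requireField(Map<String, T> fields, String fieldName) {
       return Optional.ofNullable(fields.get(fieldName))
           .orElseThrow(
               () -> new IllegalArgumentException("Missing mandatory field: " + fieldName));
     }
   }
   ```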



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/DaoFactory.java:
##########
@@ -63,7 +65,9 @@ public DaoFactory(
       SpannerConfig metadataSpannerConfig,
       String partitionMetadataTableName,
       RpcPriority rpcPriority,
-      String jobName) {
+      String jobName,
+      boolean isSpannerConfigPostgres,

Review Comment:
   Could we use a `Dialect` instead of booleans?
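
   Since the PR has to support all four combinations of change stream and metadata database dialects, two independent `Dialect` fields express that directly. A sketch, assuming a local stand-in `Dialect` enum; `DaoFactorySketch` is hypothetical and omits the factory's other constructor parameters:

   ```java
   import java.util.Objects;

   // Stand-in for com.google.cloud.spanner.Dialect.
   enum Dialect {
     GOOGLE_STANDARD_SQL,
     POSTGRESQL
   }

   // Hypothetical, trimmed-down factory: only the dialect-related state is shown.
   final class DaoFactorySketch {
     private final Dialect changeStreamDialect; // database that owns the change stream
     private final Dialect metadataDialect; // database that holds the partition metadata table

     DaoFactorySketch(Dialect changeStreamDialect, Dialect metadataDialect) {
       this.changeStreamDialect = Objects.requireNonNull(changeStreamDialect);
       this.metadataDialect = Objects.requireNonNull(metadataDialect);
     }

     Dialect getChangeStreamDialect() {
       return changeStreamDialect;
     }

     Dialect getMetadataDialect() {
       return metadataDialect;
     }
   }
   ```

   Each DAO can then receive only the dialect of the database it actually queries.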


