Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/11/29 03:23:34 UTC

[GitHub] [beam] thiagotnunes commented on a diff in pull request #24390: [BEAM-12164] Support querying against Postgres for the SpannerIO change streams connector

thiagotnunes commented on code in PR #24390:
URL: https://github.com/apache/beam/pull/24390#discussion_r1034257594


##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java:
##########
@@ -1696,6 +1706,27 @@ && getInclusiveStartAt().toSqlTimestamp().after(getInclusiveEndAt().toSqlTimesta
     }
   }
 
+  private static boolean isPostgres(SpannerConfig spannerConfig) {

Review Comment:
   You can use `databaseClient.getDialect()` instead and remove this method.



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java:
##########
@@ -1646,7 +1654,9 @@ && getInclusiveStartAt().toSqlTimestamp().after(getInclusiveEndAt().toSqlTimesta
               partitionMetadataSpannerConfig,
               partitionMetadataTableName,
               rpcPriority,
-              input.getPipeline().getOptions().getJobName());
+              input.getPipeline().getOptions().getJobName(),
+              isSpannerChangeStreamDatabasePostgres,
+              isSpannerMetadataDatabasePostgres);

Review Comment:
   Same as above



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/mapper/MapperFactory.java:
##########
@@ -32,6 +32,11 @@ public class MapperFactory implements Serializable {
 
   private transient ChangeStreamRecordMapper changeStreamRecordMapperInstance;
   private transient PartitionMetadataMapper partitionMetadataMapperInstance;
+  private final boolean isPostgres;
+
+  public MapperFactory(boolean isPostgres) {

Review Comment:
   Could we use a `Dialect` instead of a `boolean`?



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java:
##########
@@ -1636,7 +1644,7 @@ && getInclusiveStartAt().toSqlTimestamp().after(getInclusiveEndAt().toSqlTimesta
           getInclusiveEndAt().compareTo(MAX_INCLUSIVE_END_AT) > 0
               ? MAX_INCLUSIVE_END_AT
               : getInclusiveEndAt();
-      final MapperFactory mapperFactory = new MapperFactory();
+      final MapperFactory mapperFactory = new MapperFactory(isSpannerChangeStreamDatabasePostgres);

Review Comment:
   I think you should give it a [Dialect](https://github.com/googleapis/java-spanner/blob/main/google-cloud-spanner/src/main/java/com/google/cloud/spanner/Dialect.java), which makes the class more extensible.
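A minimal sketch of what a `Dialect`-based `MapperFactory` could look like. So that it compiles without the Spanner client, it declares a local `Dialect` enum with the same two constants as `com.google.cloud.spanner.Dialect`. The one-field `ChangeStreamRecordMapper` is a hypothetical simplification, not the connector's class.

```java
import java.io.Serializable;

// Local stand-in for com.google.cloud.spanner.Dialect (same constant names).
enum Dialect {
  GOOGLE_STANDARD_SQL,
  POSTGRESQL
}

// Hypothetical, heavily simplified mapper that only remembers its dialect.
final class ChangeStreamRecordMapper {
  private final Dialect dialect;

  ChangeStreamRecordMapper(Dialect dialect) {
    this.dialect = dialect;
  }

  Dialect dialect() {
    return dialect;
  }
}

final class MapperFactory implements Serializable {
  private static final long serialVersionUID = 1L;

  // Enums are Serializable, so the dialect travels with the factory to workers.
  private final Dialect dialect;
  // Transient: rebuilt lazily after deserialization.
  private transient ChangeStreamRecordMapper changeStreamRecordMapperInstance;

  MapperFactory(Dialect dialect) {
    this.dialect = dialect;
  }

  synchronized ChangeStreamRecordMapper changeStreamRecordMapper() {
    if (changeStreamRecordMapperInstance == null) {
      changeStreamRecordMapperInstance = new ChangeStreamRecordMapper(dialect);
    }
    return changeStreamRecordMapperInstance;
  }
}
```

Because the factory stores an enum rather than a flag, supporting another dialect later means handling one more constant where the mapper is built, not adding another constructor parameter.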



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/ChangeStreamDao.java:
##########
@@ -50,11 +51,13 @@ public class ChangeStreamDao {
       String changeStreamName,
       DatabaseClient databaseClient,
       RpcPriority rpcPriority,
-      String jobName) {
+      String jobName,
+      boolean isPostgres) {

Review Comment:
   Could we use a `Dialect` instead of a `boolean`?
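For `ChangeStreamDao`, one place the dialect matters is the change stream read query. The sketch below assumes the query shapes documented for Spanner change streams (`READ_<stream>(...)` with named arguments in GoogleSQL, `"spanner"."read_json_<stream>"(...)` with positional parameters in PostgreSQL); treat the exact strings as assumptions to check against the Spanner documentation. `Dialect` is a local stand-in and `ChangeStreamQueries` is hypothetical.

```java
// Local stand-in for com.google.cloud.spanner.Dialect.
enum Dialect {
  GOOGLE_STANDARD_SQL,
  POSTGRESQL
}

// Hypothetical helper: builds the change stream read query for a dialect.
final class ChangeStreamQueries {
  private ChangeStreamQueries() {}

  static String readQuery(String changeStreamName, Dialect dialect) {
    switch (dialect) {
      case POSTGRESQL:
        // Positional parameters: start timestamp, end timestamp, partition token,
        // heartbeat milliseconds; the trailing null is the read options argument.
        return "SELECT * FROM \"spanner\".\"read_json_"
            + changeStreamName
            + "\"($1, $2, $3, $4, null)";
      case GOOGLE_STANDARD_SQL:
        // Named arguments bound as @-parameters.
        return "SELECT * FROM READ_"
            + changeStreamName
            + "(start_timestamp => @startTimestamp, end_timestamp => @endTimestamp,"
            + " partition_token => @partitionToken, read_options => null,"
            + " heartbeat_milliseconds => @heartbeatMillis)";
      default:
        throw new IllegalArgumentException("Unsupported dialect: " + dialect);
    }
  }
}
```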



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataDao.java:
##########
@@ -53,16 +53,19 @@ public class PartitionMetadataDao {
 
   private final String metadataTableName;
   private final DatabaseClient databaseClient;
+  private final boolean isPostgres;
 
   /**
    * Constructs a partition metadata dao object given the generated name of the tables.
    *
    * @param metadataTableName the name of the partition metadata table
    * @param databaseClient the {@link DatabaseClient} to perform queries
    */
-  PartitionMetadataDao(String metadataTableName, DatabaseClient databaseClient) {
+  PartitionMetadataDao(
+      String metadataTableName, DatabaseClient databaseClient, boolean isPostgres) {
     this.metadataTableName = metadataTableName;
     this.databaseClient = databaseClient;
+    this.isPostgres = isPostgres;

Review Comment:
   Could we use a `Dialect` instead of a `boolean`?
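In `PartitionMetadataDao`, the dialect mostly changes how identifiers are quoted and how query parameters are written: GoogleSQL uses backticks and named `@param` parameters, while the PostgreSQL dialect uses double quotes and positional `$1`, `$2`, ... parameters. A minimal sketch, with a hypothetical `MetadataQueries` helper, a local `Dialect` stand-in, and an illustrative `PartitionToken` column:

```java
// Local stand-in for com.google.cloud.spanner.Dialect.
enum Dialect {
  GOOGLE_STANDARD_SQL,
  POSTGRESQL
}

// Hypothetical helper for dialect-aware metadata queries.
final class MetadataQueries {
  private MetadataQueries() {}

  // Backticks for GoogleSQL, double quotes for PostgreSQL.
  static String quote(String identifier, Dialect dialect) {
    return dialect == Dialect.POSTGRESQL
        ? "\"" + identifier + "\""
        : "`" + identifier + "`";
  }

  // Selects one partition row; the parameter placeholder depends on the dialect.
  static String selectByToken(String tableName, Dialect dialect) {
    String param = dialect == Dialect.POSTGRESQL ? "$1" : "@partition";
    return "SELECT * FROM " + quote(tableName, dialect)
        + " WHERE " + quote("PartitionToken", dialect) + " = " + param;
  }
}
```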



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataAdminDao.java:
##########
@@ -98,11 +99,13 @@ public class PartitionMetadataAdminDao {
       DatabaseAdminClient databaseAdminClient,
       String instanceId,
       String databaseId,
-      String tableName) {
+      String tableName,
+      boolean isPostgres) {

Review Comment:
   Could we use a `Dialect` instead of a `boolean`?
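Likewise, `PartitionMetadataAdminDao` needs dialect-specific DDL. The sketch below shows the main differences for a cut-down, two-column table (the real partition metadata table has more columns): GoogleSQL types with a trailing `PRIMARY KEY` clause, versus PostgreSQL types (`text`, `timestamptz`) with a `PRIMARY KEY` constraint inside the column list. `MetadataTableDdl` and the column names are illustrative assumptions.

```java
// Local stand-in for com.google.cloud.spanner.Dialect.
enum Dialect {
  GOOGLE_STANDARD_SQL,
  POSTGRESQL
}

// Hypothetical DDL builder for a simplified metadata table.
final class MetadataTableDdl {
  private MetadataTableDdl() {}

  static String createTable(String tableName, Dialect dialect) {
    if (dialect == Dialect.POSTGRESQL) {
      // PostgreSQL dialect: quoted identifiers, PG types, key inside the column list.
      return "CREATE TABLE \"" + tableName + "\" ("
          + "\"PartitionToken\" text NOT NULL, "
          + "\"CreatedAt\" timestamptz NOT NULL, "
          + "PRIMARY KEY (\"PartitionToken\"))";
    }
    // GoogleSQL: Spanner types, key declared after the column list.
    return "CREATE TABLE " + tableName + " ("
        + "PartitionToken STRING(MAX) NOT NULL, "
        + "CreatedAt TIMESTAMP NOT NULL"
        + ") PRIMARY KEY (PartitionToken)";
  }
}
```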



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java:
##########
@@ -1628,6 +1627,15 @@ && getInclusiveStartAt().toSqlTimestamp().after(getInclusiveEndAt().toSqlTimesta
               .setInstanceId(StaticValueProvider.of(partitionMetadataInstanceId))
               .setDatabaseId(StaticValueProvider.of(partitionMetadataDatabaseId))
               .build();
+      final boolean isSpannerChangeStreamDatabasePostgres = isPostgres(changeStreamSpannerConfig);
+      final boolean isSpannerMetadataDatabasePostgres = isPostgres(partitionMetadataSpannerConfig);
+      LOG.info("The Spanner database is postgres: " + isSpannerChangeStreamDatabasePostgres);

Review Comment:
   Maybe log the database id + dialect here and below?
   
   ```
   DatabaseId changeStreamDatabaseId = DatabaseId.of(changeStreamSpannerConfig.getProjectId().get(), changeStreamSpannerConfig.getInstanceId().get(), changeStreamSpannerConfig.getDatabaseId().get());
   Dialect changeStreamDatabaseDialect = getDialect(changeStreamSpannerConfig); // e.g. via databaseClient.getDialect()
   LOG.info("The Spanner database " + changeStreamDatabaseId + " has dialect " + changeStreamDatabaseDialect);
   ```



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java:
##########
@@ -1646,7 +1654,9 @@ && getInclusiveStartAt().toSqlTimestamp().after(getInclusiveEndAt().toSqlTimesta
               partitionMetadataSpannerConfig,
               partitionMetadataTableName,
               rpcPriority,
-              input.getPipeline().getOptions().getJobName());
+              input.getPipeline().getOptions().getJobName(),
+              isSpannerChangeStreamDatabasePostgres,

Review Comment:
   I think you should give it a [Dialect](https://github.com/googleapis/java-spanner/blob/main/google-cloud-spanner/src/main/java/com/google/cloud/spanner/Dialect.java), which makes the class more extensible.



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/mapper/ChangeStreamRecordMapper.java:
##########
@@ -83,30 +89,35 @@ public class ChangeStreamRecordMapper {
   private static final String CHILD_PARTITIONS_COLUMN = "child_partitions";
   private static final String PARENT_PARTITION_TOKENS_COLUMN = "parent_partition_tokens";
   private static final String TOKEN_COLUMN = "token";
+  private final boolean isPostgres;
 
-  ChangeStreamRecordMapper() {}
+  ChangeStreamRecordMapper(boolean isPostgres) {

Review Comment:
   Could we use a `Dialect` instead of a `boolean`?



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/mapper/ChangeStreamRecordMapper.java:
##########
@@ -197,13 +209,21 @@ public class ChangeStreamRecordMapper {
    * @return a {@link List} of {@link ChangeStreamRecord} subclasses
    */
   public List<ChangeStreamRecord> toChangeStreamRecords(
-      PartitionMetadata partition, Struct row, ChangeStreamResultSetMetadata resultSetMetadata) {
-    return row.getStructList(0).stream()
+      PartitionMetadata partition,
+      ChangeStreamResultSet row,

Review Comment:
   Since we are calling this method with a `ChangeStreamResultSet` now, this is no longer a row, but a `resultSet`. We make it a row by either calling `resultSet.getPgJsonb(0)` or `resultSet.getCurrentRowAsStruct()`. Could you change the variable name here?
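A small sketch of the naming distinction: the method receives a result set and derives the current row from it according to the dialect. `ResultSetView` is a hypothetical stand-in exposing only the two accessors mentioned above, and `Dialect` is a local mirror of the Spanner enum.

```java
// Local stand-in for com.google.cloud.spanner.Dialect.
enum Dialect {
  GOOGLE_STANDARD_SQL,
  POSTGRESQL
}

// Hypothetical view of ChangeStreamResultSet with just the two accessors discussed.
interface ResultSetView {
  String getPgJsonb(int columnIndex);

  Object getCurrentRowAsStruct();
}

final class RowExtractor {
  private RowExtractor() {}

  // The parameter is a result set; the "row" is what we derive from it.
  static Object currentRow(ResultSetView resultSet, Dialect dialect) {
    return dialect == Dialect.POSTGRESQL
        ? resultSet.getPgJsonb(0) // PostgreSQL: JSONB in the first column
        : resultSet.getCurrentRowAsStruct(); // GoogleSQL: the row as a Struct
  }
}
```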



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/NameGenerator.java:
##########
@@ -25,23 +25,32 @@
  */
 public class NameGenerator {
 
-  private static final String PARTITION_METADATA_TABLE_NAME_FORMAT =
-      "CDC_Partitions_Metadata_%s_%s";
+  private static final String PARTITION_METADATA_TABLE_NAME_FORMAT = "Metadata_%s_%s";
+  private static final int MAX_POSTGRES_TABLE_NAME_LENGTH = 63;
+  private static final int MAX_TABLE_NAME_LENGTH = 128;
 
   /**
    * Generates an unique name for the partition metadata table in the form of {@code
-   * "CDC_Partitions_Metadata_<databaseId>_<uuid>"}.
+   * "Metadata_<databaseId>_<uuid>"}.
    *
    * @param databaseId The database id where the table will be created
    * @return the unique generated name of the partition metadata table
    */
-  public static String generatePartitionMetadataTableName(String databaseId) {
+  public static String generatePartitionMetadataTableName(String databaseId, boolean isPostgres) {
     // Maximum Spanner table name length is 128 characters.
-    // There are 25 characters in the name format.
+    // There are 10 characters in the name format.
     // Maximum Spanner database ID length is 30 characters.
     // UUID always generates a String with 36 characters.
-    // 128 - (25 + 30 + 36) = 37 characters short of the limit
-    return String.format(PARTITION_METADATA_TABLE_NAME_FORMAT, databaseId, UUID.randomUUID())
-        .replaceAll("-", "_");
+    // For GoogleSQL, 128 - (10 + 30 + 36) = 52 characters short of the limit
+    // For Postgres, since the limit is 63, we may need to truncate the table name depending
+    // on the database ID length.
+    int maxTableNameLength = isPostgres ? MAX_POSTGRES_TABLE_NAME_LENGTH : MAX_TABLE_NAME_LENGTH;

Review Comment:
   Since the PG character limit is lower than GoogleSQL's, could we always use it instead (to avoid passing in a dialect here)? If you feel strongly that we should use the max for GoogleSQL, could you give this method a `Dialect` instead of a `boolean`?
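A sketch of the first option, assuming the 63-character PostgreSQL limit from the diff is applied to every dialect so no `Dialect` or `boolean` needs to be passed. Spanner database IDs are ASCII, so counting characters here is the same as counting bytes. The class below is a simplified stand-in for the real `NameGenerator`.

```java
import java.util.UUID;

final class NameGenerator {
  private static final String PARTITION_METADATA_TABLE_NAME_FORMAT = "Metadata_%s_%s";
  // Assumption: use the lower (PostgreSQL) limit for both dialects.
  private static final int MAX_TABLE_NAME_LENGTH = 63;

  private NameGenerator() {}

  static String generatePartitionMetadataTableName(String databaseId) {
    // Hyphens in the UUID (and the database ID) become underscores.
    String name =
        String.format(PARTITION_METADATA_TABLE_NAME_FORMAT, databaseId, UUID.randomUUID())
            .replaceAll("-", "_");
    // Truncate from the end, which only ever cuts into the UUID suffix.
    return name.length() <= MAX_TABLE_NAME_LENGTH
        ? name
        : name.substring(0, MAX_TABLE_NAME_LENGTH);
  }
}
```

With a 30-character database ID, truncation keeps 23 of the 36 UUID characters. That reduces the randomness in the name but does not remove it; it is worth revisiting if the format string ever grows.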



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/mapper/ChangeStreamRecordMapper.java:
##########
@@ -260,6 +312,65 @@ private DataChangeRecord toDataChangeRecord(
         changeStreamRecordMetadataFrom(partition, commitTimestamp, resultSetMetadata));
   }
 
+  private DataChangeRecord toDataChangeRecordJson(
+      PartitionMetadata partition, Value row, ChangeStreamResultSetMetadata resultSetMetadata) {
+    Value dataChangeRecordValue =
+        Optional.ofNullable(row.getStructValue().getFieldsMap().get(DATA_CHANGE_RECORD_COLUMN))
+            .orElseThrow(IllegalArgumentException::new);
+    Map<String, Value> valueMap = dataChangeRecordValue.getStructValue().getFieldsMap();
+    final String commitTimestamp =
+        Optional.ofNullable(valueMap.get(COMMIT_TIMESTAMP_COLUMN))
+            .orElseThrow(IllegalArgumentException::new)
+            .getStringValue();
+    return new DataChangeRecord(
+        partition.getPartitionToken(),
+        Timestamp.parseTimestamp(commitTimestamp),
+        Optional.ofNullable(valueMap.get(SERVER_TRANSACTION_ID_COLUMN))
+            .orElseThrow(IllegalArgumentException::new)

Review Comment:
   All the fields on which we call `orElseThrow` are mandatory, right (just double-checking)?
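If those fields are indeed mandatory, a small helper can keep the `orElseThrow` calls but report which field was missing, which `IllegalArgumentException::new` does not. A minimal sketch over a plain `Map` (the diff reads a protobuf `Value` map; `RequiredFields` is hypothetical):

```java
import java.util.Map;
import java.util.Optional;

final class RequiredFields {
  private RequiredFields() {}

  // Returns the value for a mandatory field, or fails with the field's name.
  static <V> V requireField(Map<String, V> fields, String name) {
    return Optional.ofNullable(fields.get(name))
        .orElseThrow(() -> new IllegalArgumentException("Missing mandatory field: " + name));
  }
}
```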



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/DaoFactory.java:
##########
@@ -63,7 +65,9 @@ public DaoFactory(
       SpannerConfig metadataSpannerConfig,
       String partitionMetadataTableName,
       RpcPriority rpcPriority,
-      String jobName) {
+      String jobName,
+      boolean isSpannerConfigPostgres,

Review Comment:
   Could we use a `Dialect` instead of booleans?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
