Posted to github@beam.apache.org by "thiagotnunes (via GitHub)" <gi...@apache.org> on 2023/02/03 20:45:31 UTC

[GitHub] [beam] thiagotnunes commented on a diff in pull request #25309: [BEAM-12164] Made sure state transitions were correct when updating partition state

thiagotnunes commented on code in PR #25309:
URL: https://github.com/apache/beam/pull/25309#discussion_r1096266289


##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataDao.java:
##########
@@ -390,11 +392,30 @@ public Void insert(PartitionMetadata row) {
      * @param partitionTokens the partitions' unique identifiers
      */
     public Void updateToScheduled(List<String> partitionTokens) {
-      final List<Mutation> mutations =
-          partitionTokens.stream()
-              .map(token -> createUpdateMetadataStateMutationFrom(token, State.SCHEDULED))
-              .collect(Collectors.toList());
-      transaction.buffer(mutations);
+      HashMap<String, State> tokenToState = new HashMap<>();
+      Statement statement = createPartitionQueryStatement(partitionTokens);
+      try (ResultSet resultSet = transaction.executeQuery(statement)) {
+        while (resultSet.next()) {
+          if (resultSet.getString(COLUMN_PARTITION_TOKEN) == null

Review Comment:
   Why would the partition token or state be `null`? I think both are actually `NOT NULL` columns.



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataDao.java:
##########
@@ -390,11 +392,30 @@ public Void insert(PartitionMetadata row) {
      * @param partitionTokens the partitions' unique identifiers
      */
     public Void updateToScheduled(List<String> partitionTokens) {
-      final List<Mutation> mutations =
-          partitionTokens.stream()
-              .map(token -> createUpdateMetadataStateMutationFrom(token, State.SCHEDULED))
-              .collect(Collectors.toList());
-      transaction.buffer(mutations);
+      HashMap<String, State> tokenToState = new HashMap<>();
+      Statement statement = createPartitionQueryStatement(partitionTokens);
+      try (ResultSet resultSet = transaction.executeQuery(statement)) {
+        while (resultSet.next()) {
+          if (resultSet.getString(COLUMN_PARTITION_TOKEN) == null
+              || resultSet.getString(COLUMN_STATE) == null) {
+            continue;
+          }
+          tokenToState.put(
+              resultSet.getString(COLUMN_PARTITION_TOKEN),
+              State.valueOf(resultSet.getString(COLUMN_STATE)));
+        }
+      }
+
+      for (String partitionToken : partitionTokens) {

Review Comment:
   Instead of doing this programmatically, could we run a query along the lines of `SELECT * FROM metadata_table WHERE state = 'CREATED' AND partition_token IN UNNEST(@partitionTokens)`?
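
   To make the suggestion concrete, here is a minimal sketch of the filter that such a query would apply, written as plain Java over an in-memory map so it runs without a Spanner connection. The class `CreatedPartitionFilter`, its nested `State` enum, the `QUERY` text, and the `metadata_table` / `state` / `partition_token` names are all illustrative assumptions, not the actual DAO code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of PartitionMetadataDao.
final class CreatedPartitionFilter {
  // Assumed stand-in for the partition state enum.
  enum State { CREATED, SCHEDULED, RUNNING, FINISHED }

  // Assumed query text; table and column names are placeholders.
  static final String QUERY =
      "SELECT partition_token FROM metadata_table"
          + " WHERE state = 'CREATED'"
          + " AND partition_token IN UNNEST(@partitionTokens)";

  // In-memory equivalent of QUERY: keeps only the requested tokens whose
  // stored state is CREATED. Tokens with no row are skipped.
  static List<String> selectCreated(Map<String, State> table, List<String> partitionTokens) {
    List<String> created = new ArrayList<>();
    for (String token : partitionTokens) {
      if (table.get(token) == State.CREATED) {
        created.add(token);
      }
    }
    return created;
  }
}
```

   With the filtering pushed into the query, the caller would only need to buffer a `SCHEDULED` mutation for each returned token, and the separate token-to-state map would no longer be required.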



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataDao.java:
##########
@@ -404,6 +425,18 @@ public Void updateToScheduled(List<String> partitionTokens) {
      * @param partitionToken the partition unique identifier
      */
     public Void updateToRunning(String partitionToken) {
+      Statement statement =
+          createPartitionQueryStatement(Collections.singletonList(partitionToken));
+      try (ResultSet resultSet = transaction.executeQuery(statement)) {
+        if (resultSet.next()) {
+          if (resultSet.getString(COLUMN_STATE) == null

Review Comment:
   I don't think the state can be `NULL`.
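
   If the state column is indeed never `NULL`, the check before each update reduces to "is the row in the expected previous state?". A minimal sketch of that guard follows, assuming a linear lifecycle of CREATED, SCHEDULED, RUNNING, FINISHED; the class `PartitionStateTransitions` and its nested `State` enum are hypothetical and only illustrate the idea.

```java
import java.util.EnumMap;
import java.util.Map;

// Hypothetical helper, not part of PartitionMetadataDao.
final class PartitionStateTransitions {
  // Assumed stand-in for the partition state enum.
  enum State { CREATED, SCHEDULED, RUNNING, FINISHED }

  // For each target state, the single state a partition must currently be in.
  // The linear order is an assumption made for this sketch.
  private static final Map<State, State> REQUIRED_PREVIOUS = new EnumMap<>(State.class);

  static {
    REQUIRED_PREVIOUS.put(State.SCHEDULED, State.CREATED);
    REQUIRED_PREVIOUS.put(State.RUNNING, State.SCHEDULED);
    REQUIRED_PREVIOUS.put(State.FINISHED, State.RUNNING);
  }

  // Returns true only when moving from current to target is allowed.
  // A null current means no row was found for the partition, which is
  // different from a NULL state column.
  static boolean canTransition(State current, State target) {
    return current != null && REQUIRED_PREVIOUS.get(target) == current;
  }
}
```

   Under these assumptions, `updateToRunning` would buffer its mutation only when `canTransition(current, State.RUNNING)` returns true, with no separate null check on the state column.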


