You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by pa...@apache.org on 2023/02/07 22:17:54 UTC

[beam] branch master updated: [BEAM-12164] Enforced only positive state transitions from CREATED -> SCHEDULED -> RUNNING -> FINISHED for the change stream metadata table. (#25311)

This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new bf5114bb2fc [BEAM-12164] Enforced only positive state transitions from CREATED -> SCHEDULED -> RUNNING -> FINISHED for the change stream metadata table. (#25311)
bf5114bb2fc is described below

commit bf5114bb2fc128d647b9b722dd902dedc160f7ed
Author: nancyxu123 <na...@gmail.com>
AuthorDate: Tue Feb 7 14:17:46 2023 -0800

    [BEAM-12164] Enforced only positive state transitions from CREATED -> SCHEDULED -> RUNNING -> FINISHED for the change stream metadata table. (#25311)
    
    * Made sure state transitions were correct when updating partition state
    
    * Metadata Table changes
    
    * Committed
    
    * Fixed failing unit tests
    
    * Spotless apply
    
    ---------
    
    Co-authored-by: Nancy Xu <na...@google.com>
---
 .../action/DetectNewPartitionsAction.java          |   2 +-
 .../action/QueryChangeStreamAction.java            |   2 +-
 .../changestreams/dao/PartitionMetadataDao.java    |  85 +++++++++++++++-
 .../dao/PartitionMetadataDaoTest.java              | 108 ++++++++++++++++++---
 4 files changed, 177 insertions(+), 20 deletions(-)

diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/DetectNewPartitionsAction.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/DetectNewPartitionsAction.java
index 348e23e366e..934210250f5 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/DetectNewPartitionsAction.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/DetectNewPartitionsAction.java
@@ -179,7 +179,7 @@ public class DetectNewPartitionsAction {
           partition.toBuilder().setScheduledAt(scheduledAt).build();
 
       LOG.info(
-          "[{}] Scheduled partition at {} with start time {} and end time {}",
+          "[{}] Outputting partition at {} with start time {} and end time {}",
           updatedPartition.getPartitionToken(),
           updatedPartition.getScheduledAt(),
           updatedPartition.getStartTimestamp(),
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamAction.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamAction.java
index 5fd39a6b13f..6fb60f21882 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamAction.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamAction.java
@@ -233,7 +233,7 @@ public class QueryChangeStreamAction {
       LOG.debug("[{}] Finishing partition", token);
       partitionMetadataDao.updateToFinished(token);
       metrics.decActivePartitionReadCounter();
-      LOG.info("[{}] Partition finished", token);
+      LOG.info("[{}] After attempting to finish the partition", token);
     }
     return ProcessContinuation.stop();
   }
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataDao.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataDao.java
index f6f32c8023f..6dc0e7a580d 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataDao.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataDao.java
@@ -39,7 +39,10 @@ import com.google.cloud.spanner.Struct;
 import com.google.cloud.spanner.TransactionContext;
 import com.google.cloud.spanner.TransactionRunner;
 import com.google.cloud.spanner.Value;
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.function.Function;
@@ -48,6 +51,8 @@ import javax.annotation.Nullable;
 import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata;
 import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata.State;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /** Data access object for the Connector metadata tables. */
 public class PartitionMetadataDao {
@@ -349,6 +354,7 @@ public class PartitionMetadataDao {
 
   /** Represents the execution of a read / write transaction in Cloud Spanner. */
   public static class InTransactionContext {
+    private static final Logger LOG = LoggerFactory.getLogger(InTransactionContext.class);
 
     private final String metadataTableName;
     private final TransactionContext transaction;
@@ -390,11 +396,25 @@ public class PartitionMetadataDao {
      * @param partitionTokens the partitions' unique identifiers
      */
     public Void updateToScheduled(List<String> partitionTokens) {
-      final List<Mutation> mutations =
-          partitionTokens.stream()
-              .map(token -> createUpdateMetadataStateMutationFrom(token, State.SCHEDULED))
-              .collect(Collectors.toList());
-      transaction.buffer(mutations);
+      HashSet<String> tokens = new HashSet<>();
+      Statement statement = getPartitionsMatchingState(partitionTokens, State.CREATED);
+      try (ResultSet resultSet = transaction.executeQuery(statement)) {
+        while (resultSet.next()) {
+          tokens.add(resultSet.getString(COLUMN_PARTITION_TOKEN));
+        }
+      }
+
+      for (String partitionToken : partitionTokens) {
+        if (!tokens.contains(partitionToken)) {
+          LOG.info("[{}] Did not update to be SCHEDULED", partitionToken);
+          continue;
+        }
+
+        LOG.info("[{}] Successfully updating to be SCHEDULED", partitionToken);
+        transaction.buffer(
+            ImmutableList.of(
+                createUpdateMetadataStateMutationFrom(partitionToken, State.SCHEDULED)));
+      }
       return null;
     }
 
@@ -404,6 +424,16 @@ public class PartitionMetadataDao {
      * @param partitionToken the partition unique identifier
      */
     public Void updateToRunning(String partitionToken) {
+      Statement statement =
+          getPartitionsMatchingState(Collections.singletonList(partitionToken), State.SCHEDULED);
+
+      try (ResultSet resultSet = transaction.executeQuery(statement)) {
+        if (!resultSet.next()) {
+          LOG.info("[{}] Did not update to be RUNNING", partitionToken);
+          return null;
+        }
+      }
+      LOG.info("[{}] Successfully updating to be RUNNING", partitionToken);
       transaction.buffer(
           ImmutableList.of(createUpdateMetadataStateMutationFrom(partitionToken, State.RUNNING)));
       return null;
@@ -415,6 +445,15 @@ public class PartitionMetadataDao {
      * @param partitionToken the partition unique identifier
      */
     public Void updateToFinished(String partitionToken) {
+      Statement statement =
+          getPartitionsMatchingState(Collections.singletonList(partitionToken), State.RUNNING);
+      try (ResultSet resultSet = transaction.executeQuery(statement)) {
+        if (!resultSet.next()) {
+          LOG.info("[{}] Did not update to be FINISHED", partitionToken);
+          return null;
+        }
+      }
+      LOG.info("[{}] Successfully updating to be FINISHED", partitionToken);
       transaction.buffer(
           ImmutableList.of(createUpdateMetadataStateMutationFrom(partitionToken, State.FINISHED)));
       return null;
@@ -494,6 +533,42 @@ public class PartitionMetadataDao {
           .build();
     }
 
+    private Statement getPartitionsMatchingState(List<String> partitionTokens, State state) {
+      Statement statement;
+      if (this.dialect == Dialect.POSTGRESQL) {
+        StringBuilder sqlStringBuilder =
+            new StringBuilder("SELECT * FROM \"" + metadataTableName + "\"");
+        sqlStringBuilder.append(" WHERE \"");
+        sqlStringBuilder.append(COLUMN_STATE + "\" = " + "'" + state.toString() + "'");
+        if (!partitionTokens.isEmpty()) {
+          sqlStringBuilder.append(" AND \"");
+          sqlStringBuilder.append(COLUMN_PARTITION_TOKEN);
+          sqlStringBuilder.append("\"");
+          sqlStringBuilder.append(" = ANY (Array[");
+          sqlStringBuilder.append(
+              partitionTokens.stream().map(s -> "'" + s + "'").collect(Collectors.joining(",")));
+          sqlStringBuilder.append("])");
+        }
+        statement = Statement.newBuilder(sqlStringBuilder.toString()).build();
+      } else {
+        statement =
+            Statement.newBuilder(
+                    "SELECT * FROM "
+                        + metadataTableName
+                        + " WHERE "
+                        + COLUMN_PARTITION_TOKEN
+                        + " IN UNNEST(@partitionTokens) AND "
+                        + COLUMN_STATE
+                        + " = @state")
+                .bind("partitionTokens")
+                .to(Value.stringArray(new ArrayList<>(partitionTokens)))
+                .bind("state")
+                .to(state.toString())
+                .build();
+      }
+      return statement;
+    }
+
     private Mutation createUpdateMetadataStateMutationFrom(String partitionToken, State state) {
       final String timestampColumn = stateToTimestampColumn.get(state);
       if (timestampColumn == null) {
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataDaoTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataDaoTest.java
index dd3621d1c80..c286ab84aeb 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataDaoTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataDaoTest.java
@@ -36,8 +36,10 @@ import com.google.cloud.spanner.Struct;
 import com.google.cloud.spanner.TransactionContext;
 import com.google.cloud.spanner.TransactionRunner;
 import com.google.cloud.spanner.Value;
+import java.util.Collections;
 import java.util.Map;
 import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata.State;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets;
 import org.junit.Before;
@@ -105,19 +107,6 @@ public class PartitionMetadataDaoTest {
     assertEquals(Timestamp.ofTimeMicroseconds(1L), commitTimestamp);
   }
 
-  @Test
-  public void testUpdateToFinished() {
-    when(databaseClient.readWriteTransaction()).thenReturn(readWriteTransactionRunner);
-    when(readWriteTransactionRunner.run(any())).thenReturn(null);
-    when(readWriteTransactionRunner.getCommitTimestamp())
-        .thenReturn(Timestamp.ofTimeMicroseconds(1L));
-    Timestamp commitTimestamp = partitionMetadataDao.updateToFinished(PARTITION_TOKEN);
-    verify(databaseClient, times(1)).readWriteTransaction();
-    verify(readWriteTransactionRunner, times(1)).run(any());
-    verify(readWriteTransactionRunner, times(1)).getCommitTimestamp();
-    assertEquals(Timestamp.ofTimeMicroseconds(1L), commitTimestamp);
-  }
-
   @Test
   public void testInTransactionContextInsert() {
     ArgumentCaptor<ImmutableList<Mutation>> mutations =
@@ -149,8 +138,101 @@ public class PartitionMetadataDaoTest {
         mutationValueMap.get(PartitionMetadataAdminDao.COLUMN_WATERMARK).getTimestamp());
   }
 
+  @Test
+  public void testInTransactionContextCannotUpdateToRunning() {
+    ArgumentCaptor<ImmutableList<Mutation>> mutations =
+        ArgumentCaptor.forClass(ImmutableList.class);
+    ResultSet resultSet = mock(ResultSet.class);
+    when(transaction.executeQuery(any())).thenReturn(resultSet);
+    when(resultSet.next()).thenReturn(false);
+
+    doNothing().when(transaction).buffer(mutations.capture());
+    assertNull(inTransactionContext.updateToRunning(PARTITION_TOKEN));
+    // assertEquals(0, mutations.getValue().size());
+    verify(transaction, times(0)).buffer(mutations.capture());
+  }
+
+  @Test
+  public void testInTransactionContextUpdateToRunning() {
+    ResultSet resultSet = mock(ResultSet.class);
+    when(transaction.executeQuery(any())).thenReturn(resultSet);
+    when(resultSet.next()).thenReturn(true);
+    when(resultSet.getString(any())).thenReturn(State.SCHEDULED.toString());
+    when(resultSet.getCurrentRowAsStruct()).thenReturn(Struct.newBuilder().build());
+
+    ArgumentCaptor<ImmutableList<Mutation>> mutations =
+        ArgumentCaptor.forClass(ImmutableList.class);
+    doNothing().when(transaction).buffer(mutations.capture());
+    assertNull(inTransactionContext.updateToRunning(PARTITION_TOKEN));
+    assertEquals(1, mutations.getValue().size());
+    Map<String, Value> mutationValueMap = mutations.getValue().iterator().next().asMap();
+    assertEquals(
+        PARTITION_TOKEN,
+        mutationValueMap.get(PartitionMetadataAdminDao.COLUMN_PARTITION_TOKEN).getString());
+    assertEquals(
+        PartitionMetadata.State.RUNNING.toString(),
+        mutationValueMap.get(PartitionMetadataAdminDao.COLUMN_STATE).getString());
+  }
+
+  @Test
+  public void testInTransactionContextCannotUpdateToScheduled() {
+    System.out.println("Cannot update to scheduled");
+    ResultSet resultSet = mock(ResultSet.class);
+    when(transaction.executeQuery(any())).thenReturn(resultSet);
+    when(resultSet.next()).thenReturn(false);
+
+    ArgumentCaptor<ImmutableList<Mutation>> mutations =
+        ArgumentCaptor.forClass(ImmutableList.class);
+    doNothing().when(transaction).buffer(mutations.capture());
+    assertNull(inTransactionContext.updateToScheduled(Collections.singletonList(PARTITION_TOKEN)));
+    verify(transaction, times(0)).buffer(mutations.capture());
+  }
+
+  @Test
+  public void testInTransactionContextUpdateToScheduled() {
+    System.out.println(" update to scheduled");
+    ResultSet resultSet = mock(ResultSet.class);
+    when(transaction.executeQuery(any())).thenReturn(resultSet);
+    when(resultSet.next()).thenReturn(true).thenReturn(false);
+    when(resultSet.getString(any())).thenReturn(PARTITION_TOKEN);
+    when(resultSet.getCurrentRowAsStruct()).thenReturn(Struct.newBuilder().build());
+
+    ArgumentCaptor<ImmutableList<Mutation>> mutations =
+        ArgumentCaptor.forClass(ImmutableList.class);
+    doNothing().when(transaction).buffer(mutations.capture());
+    assertNull(inTransactionContext.updateToScheduled(Collections.singletonList(PARTITION_TOKEN)));
+    assertEquals(1, mutations.getValue().size());
+    Map<String, Value> mutationValueMap = mutations.getValue().iterator().next().asMap();
+    assertEquals(
+        PARTITION_TOKEN,
+        mutationValueMap.get(PartitionMetadataAdminDao.COLUMN_PARTITION_TOKEN).getString());
+    assertEquals(
+        PartitionMetadata.State.SCHEDULED.toString(),
+        mutationValueMap.get(PartitionMetadataAdminDao.COLUMN_STATE).getString());
+  }
+
+  @Test
+  public void testInTransactionContextCannotUpdateToFinished() {
+    System.out.println("Cannot update to finished");
+    ResultSet resultSet = mock(ResultSet.class);
+    when(transaction.executeQuery(any())).thenReturn(resultSet);
+    when(resultSet.next()).thenReturn(false);
+
+    ArgumentCaptor<ImmutableList<Mutation>> mutations =
+        ArgumentCaptor.forClass(ImmutableList.class);
+    assertNull(inTransactionContext.updateToFinished(PARTITION_TOKEN));
+    verify(transaction, times(0)).buffer(mutations.capture());
+  }
+
   @Test
   public void testInTransactionContextUpdateToFinished() {
+    System.out.println("update to scheduled");
+    ResultSet resultSet = mock(ResultSet.class);
+    when(transaction.executeQuery(any())).thenReturn(resultSet);
+    when(resultSet.next()).thenReturn(true).thenReturn(false);
+    when(resultSet.getString(any())).thenReturn(State.RUNNING.toString());
+    when(resultSet.getCurrentRowAsStruct()).thenReturn(Struct.newBuilder().build());
+
     ArgumentCaptor<ImmutableList<Mutation>> mutations =
         ArgumentCaptor.forClass(ImmutableList.class);
     doNothing().when(transaction).buffer(mutations.capture());