You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@beam.apache.org by "thiagotnunes (via GitHub)" <gi...@apache.org> on 2023/02/07 22:16:50 UTC

[GitHub] [beam] thiagotnunes commented on a diff in pull request #25311: [BEAM-12164] Enforced only positive state transitions from CREATED -> SCHEDULED -> RUNNING -> FINISHED for the change stream metadata table.

thiagotnunes commented on code in PR #25311:
URL: https://github.com/apache/beam/pull/25311#discussion_r1099323297


##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataDao.java:
##########
@@ -494,6 +533,42 @@ private Mutation createInsertMetadataMutationFrom(PartitionMetadata partitionMet
           .build();
     }
 
+    private Statement getPartitionsMatchingState(List<String> partitionTokens, State state) {
+      Statement statement;
+      if (this.dialect == Dialect.POSTGRESQL) {
+        StringBuilder sqlStringBuilder =
+            new StringBuilder("SELECT * FROM \"" + metadataTableName + "\"");

Review Comment:
   nit: instead of using `+`, we should use `append` for building all parts of the `StringBuilder`



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamAction.java:
##########
@@ -233,7 +233,7 @@ public ProcessContinuation run(
       LOG.debug("[{}] Finishing partition", token);
       partitionMetadataDao.updateToFinished(token);
       metrics.decActivePartitionReadCounter();
-      LOG.info("[{}] Partition finished", token);
+      LOG.info("[{}] After attempting to finish the partition", token);

Review Comment:
   Is there a scenario where we couldn't finish it?



##########
sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataDaoTest.java:
##########
@@ -149,8 +138,101 @@ public void testInTransactionContextInsert() {
         mutationValueMap.get(PartitionMetadataAdminDao.COLUMN_WATERMARK).getTimestamp());
   }
 
+  @Test
+  public void testInTransactionContextCannotUpdateToRunning() {
+    ArgumentCaptor<ImmutableList<Mutation>> mutations =
+        ArgumentCaptor.forClass(ImmutableList.class);
+    ResultSet resultSet = mock(ResultSet.class);
+    when(transaction.executeQuery(any())).thenReturn(resultSet);
+    when(resultSet.next()).thenReturn(false);
+
+    doNothing().when(transaction).buffer(mutations.capture());
+    assertNull(inTransactionContext.updateToRunning(PARTITION_TOKEN));
+    // assertEquals(0, mutations.getValue().size());

Review Comment:
   Remove this commented-out assertion?



##########
sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataDaoTest.java:
##########
@@ -149,8 +138,101 @@ public void testInTransactionContextInsert() {
         mutationValueMap.get(PartitionMetadataAdminDao.COLUMN_WATERMARK).getTimestamp());
   }
 
+  @Test
+  public void testInTransactionContextCannotUpdateToRunning() {
+    ArgumentCaptor<ImmutableList<Mutation>> mutations =
+        ArgumentCaptor.forClass(ImmutableList.class);
+    ResultSet resultSet = mock(ResultSet.class);
+    when(transaction.executeQuery(any())).thenReturn(resultSet);
+    when(resultSet.next()).thenReturn(false);
+
+    doNothing().when(transaction).buffer(mutations.capture());
+    assertNull(inTransactionContext.updateToRunning(PARTITION_TOKEN));
+    // assertEquals(0, mutations.getValue().size());
+    verify(transaction, times(0)).buffer(mutations.capture());
+  }
+
+  @Test
+  public void testInTransactionContextUpdateToRunning() {
+    ResultSet resultSet = mock(ResultSet.class);
+    when(transaction.executeQuery(any())).thenReturn(resultSet);
+    when(resultSet.next()).thenReturn(true);
+    when(resultSet.getString(any())).thenReturn(State.SCHEDULED.toString());
+    when(resultSet.getCurrentRowAsStruct()).thenReturn(Struct.newBuilder().build());
+
+    ArgumentCaptor<ImmutableList<Mutation>> mutations =
+        ArgumentCaptor.forClass(ImmutableList.class);
+    doNothing().when(transaction).buffer(mutations.capture());
+    assertNull(inTransactionContext.updateToRunning(PARTITION_TOKEN));
+    assertEquals(1, mutations.getValue().size());
+    Map<String, Value> mutationValueMap = mutations.getValue().iterator().next().asMap();
+    assertEquals(
+        PARTITION_TOKEN,
+        mutationValueMap.get(PartitionMetadataAdminDao.COLUMN_PARTITION_TOKEN).getString());
+    assertEquals(
+        PartitionMetadata.State.RUNNING.toString(),
+        mutationValueMap.get(PartitionMetadataAdminDao.COLUMN_STATE).getString());
+  }
+
+  @Test
+  public void testInTransactionContextCannotUpdateToScheduled() {
+    System.out.println("Cannot update to scheduled");
+    ResultSet resultSet = mock(ResultSet.class);
+    when(transaction.executeQuery(any())).thenReturn(resultSet);
+    when(resultSet.next()).thenReturn(false);
+
+    ArgumentCaptor<ImmutableList<Mutation>> mutations =
+        ArgumentCaptor.forClass(ImmutableList.class);
+    doNothing().when(transaction).buffer(mutations.capture());
+    assertNull(inTransactionContext.updateToScheduled(Collections.singletonList(PARTITION_TOKEN)));
+    verify(transaction, times(0)).buffer(mutations.capture());
+  }
+
+  @Test
+  public void testInTransactionContextUpdateToScheduled() {
+    System.out.println(" update to scheduled");
+    ResultSet resultSet = mock(ResultSet.class);
+    when(transaction.executeQuery(any())).thenReturn(resultSet);
+    when(resultSet.next()).thenReturn(true).thenReturn(false);
+    when(resultSet.getString(any())).thenReturn(PARTITION_TOKEN);
+    when(resultSet.getCurrentRowAsStruct()).thenReturn(Struct.newBuilder().build());
+
+    ArgumentCaptor<ImmutableList<Mutation>> mutations =
+        ArgumentCaptor.forClass(ImmutableList.class);
+    doNothing().when(transaction).buffer(mutations.capture());
+    assertNull(inTransactionContext.updateToScheduled(Collections.singletonList(PARTITION_TOKEN)));
+    assertEquals(1, mutations.getValue().size());
+    Map<String, Value> mutationValueMap = mutations.getValue().iterator().next().asMap();
+    assertEquals(
+        PARTITION_TOKEN,
+        mutationValueMap.get(PartitionMetadataAdminDao.COLUMN_PARTITION_TOKEN).getString());
+    assertEquals(
+        PartitionMetadata.State.SCHEDULED.toString(),
+        mutationValueMap.get(PartitionMetadataAdminDao.COLUMN_STATE).getString());
+  }
+
+  @Test
+  public void testInTransactionContextCannotUpdateToFinished() {
+    System.out.println("Cannot update to finished");
+    ResultSet resultSet = mock(ResultSet.class);
+    when(transaction.executeQuery(any())).thenReturn(resultSet);
+    when(resultSet.next()).thenReturn(false);
+
+    ArgumentCaptor<ImmutableList<Mutation>> mutations =
+        ArgumentCaptor.forClass(ImmutableList.class);
+    assertNull(inTransactionContext.updateToFinished(PARTITION_TOKEN));
+    verify(transaction, times(0)).buffer(mutations.capture());
+  }
+
   @Test
   public void testInTransactionContextUpdateToFinished() {
+    System.out.println("update to scheduled");

Review Comment:
   Please remove the `println`



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataDao.java:
##########
@@ -494,6 +533,42 @@ private Mutation createInsertMetadataMutationFrom(PartitionMetadata partitionMet
           .build();
     }
 
+    private Statement getPartitionsMatchingState(List<String> partitionTokens, State state) {
+      Statement statement;
+      if (this.dialect == Dialect.POSTGRESQL) {
+        StringBuilder sqlStringBuilder =
+            new StringBuilder("SELECT * FROM \"" + metadataTableName + "\"");
+        sqlStringBuilder.append(" WHERE \"");
+        sqlStringBuilder.append(COLUMN_STATE + "\" = " + "'" + state.toString() + "'");
+        if (!partitionTokens.isEmpty()) {
+          sqlStringBuilder.append(" AND \"");
+          sqlStringBuilder.append(COLUMN_PARTITION_TOKEN);
+          sqlStringBuilder.append("\"");
+          sqlStringBuilder.append(" = ANY (Array[");
+          sqlStringBuilder.append(
+              partitionTokens.stream().map(s -> "'" + s + "'").collect(Collectors.joining(",")));

Review Comment:
   Just double-checking that we can't do this with a parameterized query in PG. It would save us a little time in the backend.



##########
sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataDaoTest.java:
##########
@@ -149,8 +138,101 @@ public void testInTransactionContextInsert() {
         mutationValueMap.get(PartitionMetadataAdminDao.COLUMN_WATERMARK).getTimestamp());
   }
 
+  @Test
+  public void testInTransactionContextCannotUpdateToRunning() {
+    ArgumentCaptor<ImmutableList<Mutation>> mutations =
+        ArgumentCaptor.forClass(ImmutableList.class);
+    ResultSet resultSet = mock(ResultSet.class);
+    when(transaction.executeQuery(any())).thenReturn(resultSet);
+    when(resultSet.next()).thenReturn(false);
+
+    doNothing().when(transaction).buffer(mutations.capture());
+    assertNull(inTransactionContext.updateToRunning(PARTITION_TOKEN));
+    // assertEquals(0, mutations.getValue().size());
+    verify(transaction, times(0)).buffer(mutations.capture());
+  }
+
+  @Test
+  public void testInTransactionContextUpdateToRunning() {
+    ResultSet resultSet = mock(ResultSet.class);
+    when(transaction.executeQuery(any())).thenReturn(resultSet);
+    when(resultSet.next()).thenReturn(true);
+    when(resultSet.getString(any())).thenReturn(State.SCHEDULED.toString());
+    when(resultSet.getCurrentRowAsStruct()).thenReturn(Struct.newBuilder().build());
+
+    ArgumentCaptor<ImmutableList<Mutation>> mutations =
+        ArgumentCaptor.forClass(ImmutableList.class);
+    doNothing().when(transaction).buffer(mutations.capture());
+    assertNull(inTransactionContext.updateToRunning(PARTITION_TOKEN));
+    assertEquals(1, mutations.getValue().size());
+    Map<String, Value> mutationValueMap = mutations.getValue().iterator().next().asMap();
+    assertEquals(
+        PARTITION_TOKEN,
+        mutationValueMap.get(PartitionMetadataAdminDao.COLUMN_PARTITION_TOKEN).getString());
+    assertEquals(
+        PartitionMetadata.State.RUNNING.toString(),
+        mutationValueMap.get(PartitionMetadataAdminDao.COLUMN_STATE).getString());
+  }
+
+  @Test
+  public void testInTransactionContextCannotUpdateToScheduled() {
+    System.out.println("Cannot update to scheduled");
+    ResultSet resultSet = mock(ResultSet.class);
+    when(transaction.executeQuery(any())).thenReturn(resultSet);
+    when(resultSet.next()).thenReturn(false);
+
+    ArgumentCaptor<ImmutableList<Mutation>> mutations =
+        ArgumentCaptor.forClass(ImmutableList.class);
+    doNothing().when(transaction).buffer(mutations.capture());
+    assertNull(inTransactionContext.updateToScheduled(Collections.singletonList(PARTITION_TOKEN)));
+    verify(transaction, times(0)).buffer(mutations.capture());
+  }
+
+  @Test
+  public void testInTransactionContextUpdateToScheduled() {
+    System.out.println(" update to scheduled");

Review Comment:
   Please remove the `println`



##########
sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataDaoTest.java:
##########
@@ -149,8 +138,101 @@ public void testInTransactionContextInsert() {
         mutationValueMap.get(PartitionMetadataAdminDao.COLUMN_WATERMARK).getTimestamp());
   }
 
+  @Test
+  public void testInTransactionContextCannotUpdateToRunning() {
+    ArgumentCaptor<ImmutableList<Mutation>> mutations =
+        ArgumentCaptor.forClass(ImmutableList.class);
+    ResultSet resultSet = mock(ResultSet.class);
+    when(transaction.executeQuery(any())).thenReturn(resultSet);
+    when(resultSet.next()).thenReturn(false);
+
+    doNothing().when(transaction).buffer(mutations.capture());
+    assertNull(inTransactionContext.updateToRunning(PARTITION_TOKEN));
+    // assertEquals(0, mutations.getValue().size());
+    verify(transaction, times(0)).buffer(mutations.capture());
+  }
+
+  @Test
+  public void testInTransactionContextUpdateToRunning() {
+    ResultSet resultSet = mock(ResultSet.class);
+    when(transaction.executeQuery(any())).thenReturn(resultSet);
+    when(resultSet.next()).thenReturn(true);
+    when(resultSet.getString(any())).thenReturn(State.SCHEDULED.toString());
+    when(resultSet.getCurrentRowAsStruct()).thenReturn(Struct.newBuilder().build());
+
+    ArgumentCaptor<ImmutableList<Mutation>> mutations =
+        ArgumentCaptor.forClass(ImmutableList.class);
+    doNothing().when(transaction).buffer(mutations.capture());
+    assertNull(inTransactionContext.updateToRunning(PARTITION_TOKEN));
+    assertEquals(1, mutations.getValue().size());
+    Map<String, Value> mutationValueMap = mutations.getValue().iterator().next().asMap();
+    assertEquals(
+        PARTITION_TOKEN,
+        mutationValueMap.get(PartitionMetadataAdminDao.COLUMN_PARTITION_TOKEN).getString());
+    assertEquals(
+        PartitionMetadata.State.RUNNING.toString(),
+        mutationValueMap.get(PartitionMetadataAdminDao.COLUMN_STATE).getString());
+  }
+
+  @Test
+  public void testInTransactionContextCannotUpdateToScheduled() {
+    System.out.println("Cannot update to scheduled");
+    ResultSet resultSet = mock(ResultSet.class);
+    when(transaction.executeQuery(any())).thenReturn(resultSet);
+    when(resultSet.next()).thenReturn(false);
+
+    ArgumentCaptor<ImmutableList<Mutation>> mutations =
+        ArgumentCaptor.forClass(ImmutableList.class);
+    doNothing().when(transaction).buffer(mutations.capture());
+    assertNull(inTransactionContext.updateToScheduled(Collections.singletonList(PARTITION_TOKEN)));
+    verify(transaction, times(0)).buffer(mutations.capture());
+  }
+
+  @Test
+  public void testInTransactionContextUpdateToScheduled() {
+    System.out.println(" update to scheduled");
+    ResultSet resultSet = mock(ResultSet.class);
+    when(transaction.executeQuery(any())).thenReturn(resultSet);
+    when(resultSet.next()).thenReturn(true).thenReturn(false);
+    when(resultSet.getString(any())).thenReturn(PARTITION_TOKEN);
+    when(resultSet.getCurrentRowAsStruct()).thenReturn(Struct.newBuilder().build());
+
+    ArgumentCaptor<ImmutableList<Mutation>> mutations =
+        ArgumentCaptor.forClass(ImmutableList.class);
+    doNothing().when(transaction).buffer(mutations.capture());
+    assertNull(inTransactionContext.updateToScheduled(Collections.singletonList(PARTITION_TOKEN)));
+    assertEquals(1, mutations.getValue().size());
+    Map<String, Value> mutationValueMap = mutations.getValue().iterator().next().asMap();
+    assertEquals(
+        PARTITION_TOKEN,
+        mutationValueMap.get(PartitionMetadataAdminDao.COLUMN_PARTITION_TOKEN).getString());
+    assertEquals(
+        PartitionMetadata.State.SCHEDULED.toString(),
+        mutationValueMap.get(PartitionMetadataAdminDao.COLUMN_STATE).getString());
+  }
+
+  @Test
+  public void testInTransactionContextCannotUpdateToFinished() {
+    System.out.println("Cannot update to finished");

Review Comment:
   Please remove the `println`



##########
sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataDaoTest.java:
##########
@@ -149,8 +138,101 @@ public void testInTransactionContextInsert() {
         mutationValueMap.get(PartitionMetadataAdminDao.COLUMN_WATERMARK).getTimestamp());
   }
 
+  @Test
+  public void testInTransactionContextCannotUpdateToRunning() {
+    ArgumentCaptor<ImmutableList<Mutation>> mutations =
+        ArgumentCaptor.forClass(ImmutableList.class);
+    ResultSet resultSet = mock(ResultSet.class);
+    when(transaction.executeQuery(any())).thenReturn(resultSet);
+    when(resultSet.next()).thenReturn(false);
+
+    doNothing().when(transaction).buffer(mutations.capture());
+    assertNull(inTransactionContext.updateToRunning(PARTITION_TOKEN));
+    // assertEquals(0, mutations.getValue().size());
+    verify(transaction, times(0)).buffer(mutations.capture());
+  }
+
+  @Test
+  public void testInTransactionContextUpdateToRunning() {
+    ResultSet resultSet = mock(ResultSet.class);
+    when(transaction.executeQuery(any())).thenReturn(resultSet);
+    when(resultSet.next()).thenReturn(true);
+    when(resultSet.getString(any())).thenReturn(State.SCHEDULED.toString());
+    when(resultSet.getCurrentRowAsStruct()).thenReturn(Struct.newBuilder().build());
+
+    ArgumentCaptor<ImmutableList<Mutation>> mutations =
+        ArgumentCaptor.forClass(ImmutableList.class);
+    doNothing().when(transaction).buffer(mutations.capture());
+    assertNull(inTransactionContext.updateToRunning(PARTITION_TOKEN));
+    assertEquals(1, mutations.getValue().size());
+    Map<String, Value> mutationValueMap = mutations.getValue().iterator().next().asMap();
+    assertEquals(
+        PARTITION_TOKEN,
+        mutationValueMap.get(PartitionMetadataAdminDao.COLUMN_PARTITION_TOKEN).getString());
+    assertEquals(
+        PartitionMetadata.State.RUNNING.toString(),
+        mutationValueMap.get(PartitionMetadataAdminDao.COLUMN_STATE).getString());
+  }
+
+  @Test
+  public void testInTransactionContextCannotUpdateToScheduled() {
+    System.out.println("Cannot update to scheduled");

Review Comment:
   Please remove the `println`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org