You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@beam.apache.org by "nancyxu123 (via GitHub)" <gi...@apache.org> on 2023/02/04 01:11:13 UTC

[GitHub] [beam] nancyxu123 opened a new pull request, #25311: Clean metadata changes

nancyxu123 opened a new pull request, #25311:
URL: https://github.com/apache/beam/pull/25311

   **Please** add a meaningful description for your change here
   
   ------------------------
   
   Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
   
    - [ ] Mention the appropriate issue in your description (for example: `addresses #123`), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, comment `fixes #<ISSUE NUMBER>` instead.
    - [ ] Update `CHANGES.md` with noteworthy changes.
    - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   See the [Contributor Guide](https://beam.apache.org/contribute) for more tips on [how to make review process smoother](https://beam.apache.org/contribute/get-started-contributing/#make-the-reviewers-job-easier).
   
   To check the build health, please visit [https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md](https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md)
   
   GitHub Actions Tests Status (on master branch)
   ------------------------------------------------------------------------------------------------
   [![Build python source distribution and wheels](https://github.com/apache/beam/workflows/Build%20python%20source%20distribution%20and%20wheels/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Build+python+source+distribution+and+wheels%22+branch%3Amaster+event%3Aschedule)
   [![Python tests](https://github.com/apache/beam/workflows/Python%20tests/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Python+Tests%22+branch%3Amaster+event%3Aschedule)
   [![Java tests](https://github.com/apache/beam/workflows/Java%20Tests/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Java+Tests%22+branch%3Amaster+event%3Aschedule)
   [![Go tests](https://github.com/apache/beam/workflows/Go%20tests/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Go+tests%22+branch%3Amaster+event%3Aschedule)
   
   See [CI.md](https://github.com/apache/beam/blob/master/CI.md) for more information about GitHub Actions CI.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] github-actions[bot] commented on pull request #25311: Clean metadata changes

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on PR #25311:
URL: https://github.com/apache/beam/pull/25311#issuecomment-1416624303

   Assigning reviewers. If you would like to opt out of this review, comment `assign to next reviewer`:
   
   R: @robertwb for label java.
   R: @chamikaramj for label io.
   
   Available commands:
   - `stop reviewer notifications` - opt out of the automated review tooling
   - `remind me after tests pass` - tag the comment author after tests pass
   - `waiting on author` - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)
   
   The PR bot will only process comments in the main thread (not review comments).


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] pabloem commented on pull request #25311: [BEAM-12164] Enforced only positive state transitions from CREATED -> SCHEDULED -> RUNNING -> FINISHED for the change stream metadata table.

Posted by "pabloem (via GitHub)" <gi...@apache.org>.
pabloem commented on PR #25311:
URL: https://github.com/apache/beam/pull/25311#issuecomment-1421538057

   thanks! LGTM


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] thiagotnunes commented on a diff in pull request #25311: [BEAM-12164] Enforced only positive state transitions from CREATED -> SCHEDULED -> RUNNING -> FINISHED for the change stream metadata table.

Posted by "thiagotnunes (via GitHub)" <gi...@apache.org>.
thiagotnunes commented on code in PR #25311:
URL: https://github.com/apache/beam/pull/25311#discussion_r1099323297


##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataDao.java:
##########
@@ -494,6 +533,42 @@ private Mutation createInsertMetadataMutationFrom(PartitionMetadata partitionMet
           .build();
     }
 
+    private Statement getPartitionsMatchingState(List<String> partitionTokens, State state) {
+      Statement statement;
+      if (this.dialect == Dialect.POSTGRESQL) {
+        StringBuilder sqlStringBuilder =
+            new StringBuilder("SELECT * FROM \"" + metadataTableName + "\"");

Review Comment:
   nit: instead of using `+`, we should use `append` for building all parts of the `StringBuilder`



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamAction.java:
##########
@@ -233,7 +233,7 @@ public ProcessContinuation run(
       LOG.debug("[{}] Finishing partition", token);
       partitionMetadataDao.updateToFinished(token);
       metrics.decActivePartitionReadCounter();
-      LOG.info("[{}] Partition finished", token);
+      LOG.info("[{}] After attempting to finish the partition", token);

Review Comment:
   Is there a scenario where we couldn't finish it?



##########
sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataDaoTest.java:
##########
@@ -149,8 +138,101 @@ public void testInTransactionContextInsert() {
         mutationValueMap.get(PartitionMetadataAdminDao.COLUMN_WATERMARK).getTimestamp());
   }
 
+  @Test
+  public void testInTransactionContextCannotUpdateToRunning() {
+    ArgumentCaptor<ImmutableList<Mutation>> mutations =
+        ArgumentCaptor.forClass(ImmutableList.class);
+    ResultSet resultSet = mock(ResultSet.class);
+    when(transaction.executeQuery(any())).thenReturn(resultSet);
+    when(resultSet.next()).thenReturn(false);
+
+    doNothing().when(transaction).buffer(mutations.capture());
+    assertNull(inTransactionContext.updateToRunning(PARTITION_TOKEN));
+    // assertEquals(0, mutations.getValue().size());

Review Comment:
   Remove?



##########
sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataDaoTest.java:
##########
@@ -149,8 +138,101 @@ public void testInTransactionContextInsert() {
         mutationValueMap.get(PartitionMetadataAdminDao.COLUMN_WATERMARK).getTimestamp());
   }
 
+  @Test
+  public void testInTransactionContextCannotUpdateToRunning() {
+    ArgumentCaptor<ImmutableList<Mutation>> mutations =
+        ArgumentCaptor.forClass(ImmutableList.class);
+    ResultSet resultSet = mock(ResultSet.class);
+    when(transaction.executeQuery(any())).thenReturn(resultSet);
+    when(resultSet.next()).thenReturn(false);
+
+    doNothing().when(transaction).buffer(mutations.capture());
+    assertNull(inTransactionContext.updateToRunning(PARTITION_TOKEN));
+    // assertEquals(0, mutations.getValue().size());
+    verify(transaction, times(0)).buffer(mutations.capture());
+  }
+
+  @Test
+  public void testInTransactionContextUpdateToRunning() {
+    ResultSet resultSet = mock(ResultSet.class);
+    when(transaction.executeQuery(any())).thenReturn(resultSet);
+    when(resultSet.next()).thenReturn(true);
+    when(resultSet.getString(any())).thenReturn(State.SCHEDULED.toString());
+    when(resultSet.getCurrentRowAsStruct()).thenReturn(Struct.newBuilder().build());
+
+    ArgumentCaptor<ImmutableList<Mutation>> mutations =
+        ArgumentCaptor.forClass(ImmutableList.class);
+    doNothing().when(transaction).buffer(mutations.capture());
+    assertNull(inTransactionContext.updateToRunning(PARTITION_TOKEN));
+    assertEquals(1, mutations.getValue().size());
+    Map<String, Value> mutationValueMap = mutations.getValue().iterator().next().asMap();
+    assertEquals(
+        PARTITION_TOKEN,
+        mutationValueMap.get(PartitionMetadataAdminDao.COLUMN_PARTITION_TOKEN).getString());
+    assertEquals(
+        PartitionMetadata.State.RUNNING.toString(),
+        mutationValueMap.get(PartitionMetadataAdminDao.COLUMN_STATE).getString());
+  }
+
+  @Test
+  public void testInTransactionContextCannotUpdateToScheduled() {
+    System.out.println("Cannot update to scheduled");
+    ResultSet resultSet = mock(ResultSet.class);
+    when(transaction.executeQuery(any())).thenReturn(resultSet);
+    when(resultSet.next()).thenReturn(false);
+
+    ArgumentCaptor<ImmutableList<Mutation>> mutations =
+        ArgumentCaptor.forClass(ImmutableList.class);
+    doNothing().when(transaction).buffer(mutations.capture());
+    assertNull(inTransactionContext.updateToScheduled(Collections.singletonList(PARTITION_TOKEN)));
+    verify(transaction, times(0)).buffer(mutations.capture());
+  }
+
+  @Test
+  public void testInTransactionContextUpdateToScheduled() {
+    System.out.println(" update to scheduled");
+    ResultSet resultSet = mock(ResultSet.class);
+    when(transaction.executeQuery(any())).thenReturn(resultSet);
+    when(resultSet.next()).thenReturn(true).thenReturn(false);
+    when(resultSet.getString(any())).thenReturn(PARTITION_TOKEN);
+    when(resultSet.getCurrentRowAsStruct()).thenReturn(Struct.newBuilder().build());
+
+    ArgumentCaptor<ImmutableList<Mutation>> mutations =
+        ArgumentCaptor.forClass(ImmutableList.class);
+    doNothing().when(transaction).buffer(mutations.capture());
+    assertNull(inTransactionContext.updateToScheduled(Collections.singletonList(PARTITION_TOKEN)));
+    assertEquals(1, mutations.getValue().size());
+    Map<String, Value> mutationValueMap = mutations.getValue().iterator().next().asMap();
+    assertEquals(
+        PARTITION_TOKEN,
+        mutationValueMap.get(PartitionMetadataAdminDao.COLUMN_PARTITION_TOKEN).getString());
+    assertEquals(
+        PartitionMetadata.State.SCHEDULED.toString(),
+        mutationValueMap.get(PartitionMetadataAdminDao.COLUMN_STATE).getString());
+  }
+
+  @Test
+  public void testInTransactionContextCannotUpdateToFinished() {
+    System.out.println("Cannot update to finished");
+    ResultSet resultSet = mock(ResultSet.class);
+    when(transaction.executeQuery(any())).thenReturn(resultSet);
+    when(resultSet.next()).thenReturn(false);
+
+    ArgumentCaptor<ImmutableList<Mutation>> mutations =
+        ArgumentCaptor.forClass(ImmutableList.class);
+    assertNull(inTransactionContext.updateToFinished(PARTITION_TOKEN));
+    verify(transaction, times(0)).buffer(mutations.capture());
+  }
+
   @Test
   public void testInTransactionContextUpdateToFinished() {
+    System.out.println("update to scheduled");

Review Comment:
   Please remove the `println`



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataDao.java:
##########
@@ -494,6 +533,42 @@ private Mutation createInsertMetadataMutationFrom(PartitionMetadata partitionMet
           .build();
     }
 
+    private Statement getPartitionsMatchingState(List<String> partitionTokens, State state) {
+      Statement statement;
+      if (this.dialect == Dialect.POSTGRESQL) {
+        StringBuilder sqlStringBuilder =
+            new StringBuilder("SELECT * FROM \"" + metadataTableName + "\"");
+        sqlStringBuilder.append(" WHERE \"");
+        sqlStringBuilder.append(COLUMN_STATE + "\" = " + "'" + state.toString() + "'");
+        if (!partitionTokens.isEmpty()) {
+          sqlStringBuilder.append(" AND \"");
+          sqlStringBuilder.append(COLUMN_PARTITION_TOKEN);
+          sqlStringBuilder.append("\"");
+          sqlStringBuilder.append(" = ANY (Array[");
+          sqlStringBuilder.append(
+              partitionTokens.stream().map(s -> "'" + s + "'").collect(Collectors.joining(",")));

Review Comment:
   Just double checking that we can't do this with a parameterized queries in PG. It would save us a little time in the backend.



##########
sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataDaoTest.java:
##########
@@ -149,8 +138,101 @@ public void testInTransactionContextInsert() {
         mutationValueMap.get(PartitionMetadataAdminDao.COLUMN_WATERMARK).getTimestamp());
   }
 
+  @Test
+  public void testInTransactionContextCannotUpdateToRunning() {
+    ArgumentCaptor<ImmutableList<Mutation>> mutations =
+        ArgumentCaptor.forClass(ImmutableList.class);
+    ResultSet resultSet = mock(ResultSet.class);
+    when(transaction.executeQuery(any())).thenReturn(resultSet);
+    when(resultSet.next()).thenReturn(false);
+
+    doNothing().when(transaction).buffer(mutations.capture());
+    assertNull(inTransactionContext.updateToRunning(PARTITION_TOKEN));
+    // assertEquals(0, mutations.getValue().size());
+    verify(transaction, times(0)).buffer(mutations.capture());
+  }
+
+  @Test
+  public void testInTransactionContextUpdateToRunning() {
+    ResultSet resultSet = mock(ResultSet.class);
+    when(transaction.executeQuery(any())).thenReturn(resultSet);
+    when(resultSet.next()).thenReturn(true);
+    when(resultSet.getString(any())).thenReturn(State.SCHEDULED.toString());
+    when(resultSet.getCurrentRowAsStruct()).thenReturn(Struct.newBuilder().build());
+
+    ArgumentCaptor<ImmutableList<Mutation>> mutations =
+        ArgumentCaptor.forClass(ImmutableList.class);
+    doNothing().when(transaction).buffer(mutations.capture());
+    assertNull(inTransactionContext.updateToRunning(PARTITION_TOKEN));
+    assertEquals(1, mutations.getValue().size());
+    Map<String, Value> mutationValueMap = mutations.getValue().iterator().next().asMap();
+    assertEquals(
+        PARTITION_TOKEN,
+        mutationValueMap.get(PartitionMetadataAdminDao.COLUMN_PARTITION_TOKEN).getString());
+    assertEquals(
+        PartitionMetadata.State.RUNNING.toString(),
+        mutationValueMap.get(PartitionMetadataAdminDao.COLUMN_STATE).getString());
+  }
+
+  @Test
+  public void testInTransactionContextCannotUpdateToScheduled() {
+    System.out.println("Cannot update to scheduled");
+    ResultSet resultSet = mock(ResultSet.class);
+    when(transaction.executeQuery(any())).thenReturn(resultSet);
+    when(resultSet.next()).thenReturn(false);
+
+    ArgumentCaptor<ImmutableList<Mutation>> mutations =
+        ArgumentCaptor.forClass(ImmutableList.class);
+    doNothing().when(transaction).buffer(mutations.capture());
+    assertNull(inTransactionContext.updateToScheduled(Collections.singletonList(PARTITION_TOKEN)));
+    verify(transaction, times(0)).buffer(mutations.capture());
+  }
+
+  @Test
+  public void testInTransactionContextUpdateToScheduled() {
+    System.out.println(" update to scheduled");

Review Comment:
   Please remove the `println`



##########
sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataDaoTest.java:
##########
@@ -149,8 +138,101 @@ public void testInTransactionContextInsert() {
         mutationValueMap.get(PartitionMetadataAdminDao.COLUMN_WATERMARK).getTimestamp());
   }
 
+  @Test
+  public void testInTransactionContextCannotUpdateToRunning() {
+    ArgumentCaptor<ImmutableList<Mutation>> mutations =
+        ArgumentCaptor.forClass(ImmutableList.class);
+    ResultSet resultSet = mock(ResultSet.class);
+    when(transaction.executeQuery(any())).thenReturn(resultSet);
+    when(resultSet.next()).thenReturn(false);
+
+    doNothing().when(transaction).buffer(mutations.capture());
+    assertNull(inTransactionContext.updateToRunning(PARTITION_TOKEN));
+    // assertEquals(0, mutations.getValue().size());
+    verify(transaction, times(0)).buffer(mutations.capture());
+  }
+
+  @Test
+  public void testInTransactionContextUpdateToRunning() {
+    ResultSet resultSet = mock(ResultSet.class);
+    when(transaction.executeQuery(any())).thenReturn(resultSet);
+    when(resultSet.next()).thenReturn(true);
+    when(resultSet.getString(any())).thenReturn(State.SCHEDULED.toString());
+    when(resultSet.getCurrentRowAsStruct()).thenReturn(Struct.newBuilder().build());
+
+    ArgumentCaptor<ImmutableList<Mutation>> mutations =
+        ArgumentCaptor.forClass(ImmutableList.class);
+    doNothing().when(transaction).buffer(mutations.capture());
+    assertNull(inTransactionContext.updateToRunning(PARTITION_TOKEN));
+    assertEquals(1, mutations.getValue().size());
+    Map<String, Value> mutationValueMap = mutations.getValue().iterator().next().asMap();
+    assertEquals(
+        PARTITION_TOKEN,
+        mutationValueMap.get(PartitionMetadataAdminDao.COLUMN_PARTITION_TOKEN).getString());
+    assertEquals(
+        PartitionMetadata.State.RUNNING.toString(),
+        mutationValueMap.get(PartitionMetadataAdminDao.COLUMN_STATE).getString());
+  }
+
+  @Test
+  public void testInTransactionContextCannotUpdateToScheduled() {
+    System.out.println("Cannot update to scheduled");
+    ResultSet resultSet = mock(ResultSet.class);
+    when(transaction.executeQuery(any())).thenReturn(resultSet);
+    when(resultSet.next()).thenReturn(false);
+
+    ArgumentCaptor<ImmutableList<Mutation>> mutations =
+        ArgumentCaptor.forClass(ImmutableList.class);
+    doNothing().when(transaction).buffer(mutations.capture());
+    assertNull(inTransactionContext.updateToScheduled(Collections.singletonList(PARTITION_TOKEN)));
+    verify(transaction, times(0)).buffer(mutations.capture());
+  }
+
+  @Test
+  public void testInTransactionContextUpdateToScheduled() {
+    System.out.println(" update to scheduled");
+    ResultSet resultSet = mock(ResultSet.class);
+    when(transaction.executeQuery(any())).thenReturn(resultSet);
+    when(resultSet.next()).thenReturn(true).thenReturn(false);
+    when(resultSet.getString(any())).thenReturn(PARTITION_TOKEN);
+    when(resultSet.getCurrentRowAsStruct()).thenReturn(Struct.newBuilder().build());
+
+    ArgumentCaptor<ImmutableList<Mutation>> mutations =
+        ArgumentCaptor.forClass(ImmutableList.class);
+    doNothing().when(transaction).buffer(mutations.capture());
+    assertNull(inTransactionContext.updateToScheduled(Collections.singletonList(PARTITION_TOKEN)));
+    assertEquals(1, mutations.getValue().size());
+    Map<String, Value> mutationValueMap = mutations.getValue().iterator().next().asMap();
+    assertEquals(
+        PARTITION_TOKEN,
+        mutationValueMap.get(PartitionMetadataAdminDao.COLUMN_PARTITION_TOKEN).getString());
+    assertEquals(
+        PartitionMetadata.State.SCHEDULED.toString(),
+        mutationValueMap.get(PartitionMetadataAdminDao.COLUMN_STATE).getString());
+  }
+
+  @Test
+  public void testInTransactionContextCannotUpdateToFinished() {
+    System.out.println("Cannot update to finished");

Review Comment:
   Please remove the `println`



##########
sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataDaoTest.java:
##########
@@ -149,8 +138,101 @@ public void testInTransactionContextInsert() {
         mutationValueMap.get(PartitionMetadataAdminDao.COLUMN_WATERMARK).getTimestamp());
   }
 
+  @Test
+  public void testInTransactionContextCannotUpdateToRunning() {
+    ArgumentCaptor<ImmutableList<Mutation>> mutations =
+        ArgumentCaptor.forClass(ImmutableList.class);
+    ResultSet resultSet = mock(ResultSet.class);
+    when(transaction.executeQuery(any())).thenReturn(resultSet);
+    when(resultSet.next()).thenReturn(false);
+
+    doNothing().when(transaction).buffer(mutations.capture());
+    assertNull(inTransactionContext.updateToRunning(PARTITION_TOKEN));
+    // assertEquals(0, mutations.getValue().size());
+    verify(transaction, times(0)).buffer(mutations.capture());
+  }
+
+  @Test
+  public void testInTransactionContextUpdateToRunning() {
+    ResultSet resultSet = mock(ResultSet.class);
+    when(transaction.executeQuery(any())).thenReturn(resultSet);
+    when(resultSet.next()).thenReturn(true);
+    when(resultSet.getString(any())).thenReturn(State.SCHEDULED.toString());
+    when(resultSet.getCurrentRowAsStruct()).thenReturn(Struct.newBuilder().build());
+
+    ArgumentCaptor<ImmutableList<Mutation>> mutations =
+        ArgumentCaptor.forClass(ImmutableList.class);
+    doNothing().when(transaction).buffer(mutations.capture());
+    assertNull(inTransactionContext.updateToRunning(PARTITION_TOKEN));
+    assertEquals(1, mutations.getValue().size());
+    Map<String, Value> mutationValueMap = mutations.getValue().iterator().next().asMap();
+    assertEquals(
+        PARTITION_TOKEN,
+        mutationValueMap.get(PartitionMetadataAdminDao.COLUMN_PARTITION_TOKEN).getString());
+    assertEquals(
+        PartitionMetadata.State.RUNNING.toString(),
+        mutationValueMap.get(PartitionMetadataAdminDao.COLUMN_STATE).getString());
+  }
+
+  @Test
+  public void testInTransactionContextCannotUpdateToScheduled() {
+    System.out.println("Cannot update to scheduled");

Review Comment:
   Please remove the `println`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] pabloem merged pull request #25311: [BEAM-12164] Enforced only positive state transitions from CREATED -> SCHEDULED -> RUNNING -> FINISHED for the change stream metadata table.

Posted by "pabloem (via GitHub)" <gi...@apache.org>.
pabloem merged PR #25311:
URL: https://github.com/apache/beam/pull/25311


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org