You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by pa...@apache.org on 2023/02/07 22:17:54 UTC
[beam] branch master updated: [BEAM-12164] Enforced only positive state transitions from CREATED -> SCHEDULED -> RUNNING -> FINISHED for the change stream metadata table. (#25311)
This is an automated email from the ASF dual-hosted git repository.
pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new bf5114bb2fc [BEAM-12164] Enforced only positive state transitions from CREATED -> SCHEDULED -> RUNNING -> FINISHED for the change stream metadata table. (#25311)
bf5114bb2fc is described below
commit bf5114bb2fc128d647b9b722dd902dedc160f7ed
Author: nancyxu123 <na...@gmail.com>
AuthorDate: Tue Feb 7 14:17:46 2023 -0800
[BEAM-12164] Enforced only positive state transitions from CREATED -> SCHEDULED -> RUNNING -> FINISHED for the change stream metadata table. (#25311)
* Made sure state transitions were correct when updating partition state
* Metadata Table changes
* Committed
* Fixed failing unittests
* Spotless apply
---------
Co-authored-by: Nancy Xu <na...@google.com>
---
.../action/DetectNewPartitionsAction.java | 2 +-
.../action/QueryChangeStreamAction.java | 2 +-
.../changestreams/dao/PartitionMetadataDao.java | 85 +++++++++++++++-
.../dao/PartitionMetadataDaoTest.java | 108 ++++++++++++++++++---
4 files changed, 177 insertions(+), 20 deletions(-)
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/DetectNewPartitionsAction.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/DetectNewPartitionsAction.java
index 348e23e366e..934210250f5 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/DetectNewPartitionsAction.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/DetectNewPartitionsAction.java
@@ -179,7 +179,7 @@ public class DetectNewPartitionsAction {
partition.toBuilder().setScheduledAt(scheduledAt).build();
LOG.info(
- "[{}] Scheduled partition at {} with start time {} and end time {}",
+ "[{}] Outputting partition at {} with start time {} and end time {}",
updatedPartition.getPartitionToken(),
updatedPartition.getScheduledAt(),
updatedPartition.getStartTimestamp(),
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamAction.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamAction.java
index 5fd39a6b13f..6fb60f21882 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamAction.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamAction.java
@@ -233,7 +233,7 @@ public class QueryChangeStreamAction {
LOG.debug("[{}] Finishing partition", token);
partitionMetadataDao.updateToFinished(token);
metrics.decActivePartitionReadCounter();
- LOG.info("[{}] Partition finished", token);
+ LOG.info("[{}] After attempting to finish the partition", token);
}
return ProcessContinuation.stop();
}
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataDao.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataDao.java
index f6f32c8023f..6dc0e7a580d 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataDao.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataDao.java
@@ -39,7 +39,10 @@ import com.google.cloud.spanner.Struct;
import com.google.cloud.spanner.TransactionContext;
import com.google.cloud.spanner.TransactionRunner;
import com.google.cloud.spanner.Value;
+import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
@@ -48,6 +51,8 @@ import javax.annotation.Nullable;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata.State;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/** Data access object for the Connector metadata tables. */
public class PartitionMetadataDao {
@@ -349,6 +354,7 @@ public class PartitionMetadataDao {
/** Represents the execution of a read / write transaction in Cloud Spanner. */
public static class InTransactionContext {
+ private static final Logger LOG = LoggerFactory.getLogger(InTransactionContext.class);
private final String metadataTableName;
private final TransactionContext transaction;
@@ -390,11 +396,25 @@ public class PartitionMetadataDao {
* @param partitionTokens the partitions' unique identifiers
*/
public Void updateToScheduled(List<String> partitionTokens) {
- final List<Mutation> mutations =
- partitionTokens.stream()
- .map(token -> createUpdateMetadataStateMutationFrom(token, State.SCHEDULED))
- .collect(Collectors.toList());
- transaction.buffer(mutations);
+ HashSet<String> tokens = new HashSet<>();
+ Statement statement = getPartitionsMatchingState(partitionTokens, State.CREATED);
+ try (ResultSet resultSet = transaction.executeQuery(statement)) {
+ while (resultSet.next()) {
+ tokens.add(resultSet.getString(COLUMN_PARTITION_TOKEN));
+ }
+ }
+
+ for (String partitionToken : partitionTokens) {
+ if (!tokens.contains(partitionToken)) {
+ LOG.info("[{}] Did not update to be SCHEDULED", partitionToken);
+ continue;
+ }
+
+ LOG.info("[{}] Successfully updating to be SCHEDULED", partitionToken);
+ transaction.buffer(
+ ImmutableList.of(
+ createUpdateMetadataStateMutationFrom(partitionToken, State.SCHEDULED)));
+ }
return null;
}
@@ -404,6 +424,16 @@ public class PartitionMetadataDao {
* @param partitionToken the partition unique identifier
*/
public Void updateToRunning(String partitionToken) {
+ Statement statement =
+ getPartitionsMatchingState(Collections.singletonList(partitionToken), State.SCHEDULED);
+
+ try (ResultSet resultSet = transaction.executeQuery(statement)) {
+ if (!resultSet.next()) {
+ LOG.info("[{}] Did not update to be RUNNING", partitionToken);
+ return null;
+ }
+ }
+ LOG.info("[{}] Successfully updating to be RUNNING", partitionToken);
transaction.buffer(
ImmutableList.of(createUpdateMetadataStateMutationFrom(partitionToken, State.RUNNING)));
return null;
@@ -415,6 +445,15 @@ public class PartitionMetadataDao {
* @param partitionToken the partition unique identifier
*/
public Void updateToFinished(String partitionToken) {
+ Statement statement =
+ getPartitionsMatchingState(Collections.singletonList(partitionToken), State.RUNNING);
+ try (ResultSet resultSet = transaction.executeQuery(statement)) {
+ if (!resultSet.next()) {
+ LOG.info("[{}] Did not update to be FINISHED", partitionToken);
+ return null;
+ }
+ }
+ LOG.info("[{}] Successfully updating to be FINISHED", partitionToken);
transaction.buffer(
ImmutableList.of(createUpdateMetadataStateMutationFrom(partitionToken, State.FINISHED)));
return null;
@@ -494,6 +533,42 @@ public class PartitionMetadataDao {
.build();
}
+ private Statement getPartitionsMatchingState(List<String> partitionTokens, State state) {
+ Statement statement;
+ if (this.dialect == Dialect.POSTGRESQL) {
+ StringBuilder sqlStringBuilder =
+ new StringBuilder("SELECT * FROM \"" + metadataTableName + "\"");
+ sqlStringBuilder.append(" WHERE \"");
+ sqlStringBuilder.append(COLUMN_STATE + "\" = " + "'" + state.toString() + "'");
+ if (!partitionTokens.isEmpty()) {
+ sqlStringBuilder.append(" AND \"");
+ sqlStringBuilder.append(COLUMN_PARTITION_TOKEN);
+ sqlStringBuilder.append("\"");
+ sqlStringBuilder.append(" = ANY (Array[");
+ sqlStringBuilder.append(
+ partitionTokens.stream().map(s -> "'" + s + "'").collect(Collectors.joining(",")));
+ sqlStringBuilder.append("])");
+ }
+ statement = Statement.newBuilder(sqlStringBuilder.toString()).build();
+ } else {
+ statement =
+ Statement.newBuilder(
+ "SELECT * FROM "
+ + metadataTableName
+ + " WHERE "
+ + COLUMN_PARTITION_TOKEN
+ + " IN UNNEST(@partitionTokens) AND "
+ + COLUMN_STATE
+ + " = @state")
+ .bind("partitionTokens")
+ .to(Value.stringArray(new ArrayList<>(partitionTokens)))
+ .bind("state")
+ .to(state.toString())
+ .build();
+ }
+ return statement;
+ }
+
private Mutation createUpdateMetadataStateMutationFrom(String partitionToken, State state) {
final String timestampColumn = stateToTimestampColumn.get(state);
if (timestampColumn == null) {
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataDaoTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataDaoTest.java
index dd3621d1c80..c286ab84aeb 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataDaoTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataDaoTest.java
@@ -36,8 +36,10 @@ import com.google.cloud.spanner.Struct;
import com.google.cloud.spanner.TransactionContext;
import com.google.cloud.spanner.TransactionRunner;
import com.google.cloud.spanner.Value;
+import java.util.Collections;
import java.util.Map;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata.State;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets;
import org.junit.Before;
@@ -105,19 +107,6 @@ public class PartitionMetadataDaoTest {
assertEquals(Timestamp.ofTimeMicroseconds(1L), commitTimestamp);
}
- @Test
- public void testUpdateToFinished() {
- when(databaseClient.readWriteTransaction()).thenReturn(readWriteTransactionRunner);
- when(readWriteTransactionRunner.run(any())).thenReturn(null);
- when(readWriteTransactionRunner.getCommitTimestamp())
- .thenReturn(Timestamp.ofTimeMicroseconds(1L));
- Timestamp commitTimestamp = partitionMetadataDao.updateToFinished(PARTITION_TOKEN);
- verify(databaseClient, times(1)).readWriteTransaction();
- verify(readWriteTransactionRunner, times(1)).run(any());
- verify(readWriteTransactionRunner, times(1)).getCommitTimestamp();
- assertEquals(Timestamp.ofTimeMicroseconds(1L), commitTimestamp);
- }
-
@Test
public void testInTransactionContextInsert() {
ArgumentCaptor<ImmutableList<Mutation>> mutations =
@@ -149,8 +138,101 @@ public class PartitionMetadataDaoTest {
mutationValueMap.get(PartitionMetadataAdminDao.COLUMN_WATERMARK).getTimestamp());
}
+ @Test
+ public void testInTransactionContextCannotUpdateToRunning() {
+ ArgumentCaptor<ImmutableList<Mutation>> mutations =
+ ArgumentCaptor.forClass(ImmutableList.class);
+ ResultSet resultSet = mock(ResultSet.class);
+ when(transaction.executeQuery(any())).thenReturn(resultSet);
+ when(resultSet.next()).thenReturn(false);
+
+ doNothing().when(transaction).buffer(mutations.capture());
+ assertNull(inTransactionContext.updateToRunning(PARTITION_TOKEN));
+ // assertEquals(0, mutations.getValue().size());
+ verify(transaction, times(0)).buffer(mutations.capture());
+ }
+
+ @Test
+ public void testInTransactionContextUpdateToRunning() {
+ ResultSet resultSet = mock(ResultSet.class);
+ when(transaction.executeQuery(any())).thenReturn(resultSet);
+ when(resultSet.next()).thenReturn(true);
+ when(resultSet.getString(any())).thenReturn(State.SCHEDULED.toString());
+ when(resultSet.getCurrentRowAsStruct()).thenReturn(Struct.newBuilder().build());
+
+ ArgumentCaptor<ImmutableList<Mutation>> mutations =
+ ArgumentCaptor.forClass(ImmutableList.class);
+ doNothing().when(transaction).buffer(mutations.capture());
+ assertNull(inTransactionContext.updateToRunning(PARTITION_TOKEN));
+ assertEquals(1, mutations.getValue().size());
+ Map<String, Value> mutationValueMap = mutations.getValue().iterator().next().asMap();
+ assertEquals(
+ PARTITION_TOKEN,
+ mutationValueMap.get(PartitionMetadataAdminDao.COLUMN_PARTITION_TOKEN).getString());
+ assertEquals(
+ PartitionMetadata.State.RUNNING.toString(),
+ mutationValueMap.get(PartitionMetadataAdminDao.COLUMN_STATE).getString());
+ }
+
+ @Test
+ public void testInTransactionContextCannotUpdateToScheduled() {
+ System.out.println("Cannot update to scheduled");
+ ResultSet resultSet = mock(ResultSet.class);
+ when(transaction.executeQuery(any())).thenReturn(resultSet);
+ when(resultSet.next()).thenReturn(false);
+
+ ArgumentCaptor<ImmutableList<Mutation>> mutations =
+ ArgumentCaptor.forClass(ImmutableList.class);
+ doNothing().when(transaction).buffer(mutations.capture());
+ assertNull(inTransactionContext.updateToScheduled(Collections.singletonList(PARTITION_TOKEN)));
+ verify(transaction, times(0)).buffer(mutations.capture());
+ }
+
+ @Test
+ public void testInTransactionContextUpdateToScheduled() {
+ System.out.println(" update to scheduled");
+ ResultSet resultSet = mock(ResultSet.class);
+ when(transaction.executeQuery(any())).thenReturn(resultSet);
+ when(resultSet.next()).thenReturn(true).thenReturn(false);
+ when(resultSet.getString(any())).thenReturn(PARTITION_TOKEN);
+ when(resultSet.getCurrentRowAsStruct()).thenReturn(Struct.newBuilder().build());
+
+ ArgumentCaptor<ImmutableList<Mutation>> mutations =
+ ArgumentCaptor.forClass(ImmutableList.class);
+ doNothing().when(transaction).buffer(mutations.capture());
+ assertNull(inTransactionContext.updateToScheduled(Collections.singletonList(PARTITION_TOKEN)));
+ assertEquals(1, mutations.getValue().size());
+ Map<String, Value> mutationValueMap = mutations.getValue().iterator().next().asMap();
+ assertEquals(
+ PARTITION_TOKEN,
+ mutationValueMap.get(PartitionMetadataAdminDao.COLUMN_PARTITION_TOKEN).getString());
+ assertEquals(
+ PartitionMetadata.State.SCHEDULED.toString(),
+ mutationValueMap.get(PartitionMetadataAdminDao.COLUMN_STATE).getString());
+ }
+
+ @Test
+ public void testInTransactionContextCannotUpdateToFinished() {
+ System.out.println("Cannot update to finished");
+ ResultSet resultSet = mock(ResultSet.class);
+ when(transaction.executeQuery(any())).thenReturn(resultSet);
+ when(resultSet.next()).thenReturn(false);
+
+ ArgumentCaptor<ImmutableList<Mutation>> mutations =
+ ArgumentCaptor.forClass(ImmutableList.class);
+ assertNull(inTransactionContext.updateToFinished(PARTITION_TOKEN));
+ verify(transaction, times(0)).buffer(mutations.capture());
+ }
+
@Test
public void testInTransactionContextUpdateToFinished() {
+ System.out.println("update to scheduled");
+ ResultSet resultSet = mock(ResultSet.class);
+ when(transaction.executeQuery(any())).thenReturn(resultSet);
+ when(resultSet.next()).thenReturn(true).thenReturn(false);
+ when(resultSet.getString(any())).thenReturn(State.RUNNING.toString());
+ when(resultSet.getCurrentRowAsStruct()).thenReturn(Struct.newBuilder().build());
+
ArgumentCaptor<ImmutableList<Mutation>> mutations =
ArgumentCaptor.forClass(ImmutableList.class);
doNothing().when(transaction).buffer(mutations.capture());