You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by yh...@apache.org on 2023/02/07 23:04:08 UTC

[beam] branch master updated: Added Role-based access control integration tests for Spanner Change Streams (#25246)

This is an automated email from the ASF dual-hosted git repository.

yhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new a9e80d2981c Added Role-based access control integration tests for Spanner Change Streams (#25246)
a9e80d2981c is described below

commit a9e80d2981c8881581540d4b2bc06afb26ccd1a2
Author: Doug Judd <nu...@google.com>
AuthorDate: Tue Feb 7 15:04:01 2023 -0800

    Added Role-based access control integration tests for Spanner Change Streams (#25246)
---
 .../it/ChangeStreamTestPipelineOptions.java        |  8 ++-
 .../changestreams/it/IntegrationTestEnv.java       | 62 ++++++++++++++++++----
 .../changestreams/it/SpannerChangeStreamIT.java    | 40 ++++++++++++--
 3 files changed, 94 insertions(+), 16 deletions(-)

diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/it/ChangeStreamTestPipelineOptions.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/it/ChangeStreamTestPipelineOptions.java
index de18eeed74a..83fc9ebc791 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/it/ChangeStreamTestPipelineOptions.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/it/ChangeStreamTestPipelineOptions.java
@@ -37,8 +37,14 @@ public interface ChangeStreamTestPipelineOptions extends IOTestPipelineOptions,
   void setInstanceId(String value);
 
   @Description("Database ID prefix to write to in Spanner")
-  @Default.String("changestream")
+  @Default.String("cstest_primary")
   String getDatabaseId();
 
   void setDatabaseId(String value);
+
+  @Description("Metadata database ID prefix to write to in Spanner")
+  @Default.String("cstest_metadata")
+  String getMetadataDatabaseId();
+
+  void setMetadataDatabaseId(String value);
 }
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/it/IntegrationTestEnv.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/it/IntegrationTestEnv.java
index 75e06c64e6e..7b5aa5c97c1 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/it/IntegrationTestEnv.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/it/IntegrationTestEnv.java
@@ -24,6 +24,7 @@ import com.google.cloud.spanner.Dialect;
 import com.google.cloud.spanner.Spanner;
 import com.google.cloud.spanner.SpannerOptions;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Locale;
@@ -48,18 +49,21 @@ public class IntegrationTestEnv extends ExternalResource {
   private static final String METADATA_TABLE_NAME_PREFIX = "TestMetadata";
   private static final String SINGERS_TABLE_NAME_PREFIX = "Singers";
   private static final String CHANGE_STREAM_NAME_PREFIX = "SingersStream";
+  private static final String DATABASE_ROLE = "test_role";
   private List<String> changeStreams;
   private List<String> tables;
 
   private String projectId;
   private String instanceId;
   private String databaseId;
+  private String metadataDatabaseId;
   private String metadataTableName;
   private Spanner spanner;
   private final String host = "https://spanner.googleapis.com";
   private DatabaseAdminClient databaseAdminClient;
   private DatabaseClient databaseClient;
   private boolean isPostgres;
+  public boolean useSeparateMetadataDb;
 
   @Override
   protected void before() throws Throwable {
@@ -70,14 +74,13 @@ public class IntegrationTestEnv extends ExternalResource {
         Optional.ofNullable(options.getProjectId())
             .orElseGet(() -> options.as(GcpOptions.class).getProject());
     instanceId = options.getInstanceId();
-    databaseId = generateDatabaseName(options.getDatabaseId());
+    generateDatabaseIds(options);
     spanner =
         SpannerOptions.newBuilder().setProjectId(projectId).setHost(host).build().getService();
     databaseAdminClient = spanner.getDatabaseAdminClient();
     metadataTableName = generateTableName(METADATA_TABLE_NAME_PREFIX);
 
     recreateDatabase(databaseAdminClient, instanceId, databaseId, isPostgres);
-
     databaseClient = spanner.getDatabaseClient(DatabaseId.of(projectId, instanceId, databaseId));
 
     changeStreams = new ArrayList<>();
@@ -144,10 +147,17 @@ public class IntegrationTestEnv extends ExternalResource {
     } catch (Exception e) {
       LOG.error("Failed to drop database " + databaseId + ". Skipping...", e);
     }
-
+    if (useSeparateMetadataDb) {
+      databaseAdminClient.dropDatabase(instanceId, metadataDatabaseId);
+    }
     spanner.close();
   }
 
+  void createMetadataDatabase() throws ExecutionException, InterruptedException, TimeoutException {
+    recreateDatabase(databaseAdminClient, instanceId, metadataDatabaseId, isPostgres);
+    useSeparateMetadataDb = true;
+  }
+
   String createSingersTable() throws InterruptedException, ExecutionException, TimeoutException {
     final String tableName = generateTableName(SINGERS_TABLE_NAME_PREFIX);
     LOG.info("Creating table " + tableName);
@@ -168,7 +178,6 @@ public class IntegrationTestEnv extends ExternalResource {
                       + ")"),
               null)
           .get(TIMEOUT_MINUTES, TimeUnit.MINUTES);
-      tables.add(tableName);
     } else {
       databaseAdminClient
           .updateDatabaseDdl(
@@ -185,8 +194,8 @@ public class IntegrationTestEnv extends ExternalResource {
                       + " ) PRIMARY KEY (SingerId)"),
               null)
           .get(TIMEOUT_MINUTES, TimeUnit.MINUTES);
-      tables.add(tableName);
     }
+    tables.add(tableName);
     return tableName;
   }
 
@@ -214,10 +223,32 @@ public class IntegrationTestEnv extends ExternalResource {
           .get(TIMEOUT_MINUTES, TimeUnit.MINUTES);
     }
     changeStreams.add(changeStreamName);
-
     return changeStreamName;
   }
 
+  void createRoleAndGrantPrivileges(String table, String changeStream)
+      throws InterruptedException, ExecutionException, TimeoutException {
+    if (this.isPostgres) {
+      LOG.error("Database roles not supported with Postgres dialect.");
+      return;
+    }
+    databaseAdminClient
+        .updateDatabaseDdl(
+            instanceId,
+            databaseId,
+            Arrays.asList(
+                "CREATE ROLE " + DATABASE_ROLE,
+                "GRANT INSERT, UPDATE, DELETE ON TABLE " + table + " TO ROLE " + DATABASE_ROLE,
+                "GRANT SELECT ON CHANGE STREAM " + changeStream + " TO ROLE " + DATABASE_ROLE,
+                "GRANT EXECUTE ON TABLE FUNCTION READ_"
+                    + changeStream
+                    + " TO ROLE "
+                    + DATABASE_ROLE),
+            null)
+        .get(TIMEOUT_MINUTES, TimeUnit.MINUTES);
+    return;
+  }
+
   String getProjectId() {
     return projectId;
   }
@@ -230,6 +261,14 @@ public class IntegrationTestEnv extends ExternalResource {
     return databaseId;
   }
 
+  String getMetadataDatabaseId() {
+    return metadataDatabaseId;
+  }
+
+  String getDatabaseRole() {
+    return DATABASE_ROLE;
+  }
+
   String getMetadataTableName() {
     return metadataTableName;
   }
@@ -282,10 +321,13 @@ public class IntegrationTestEnv extends ExternalResource {
             MAX_CHANGE_STREAM_NAME_LENGTH - 1 - CHANGE_STREAM_NAME_PREFIX.length());
   }
 
-  private String generateDatabaseName(String prefix) {
-    return prefix
-        + "_"
-        + RandomStringUtils.randomAlphanumeric(MAX_DATABASE_NAME_LENGTH - 1 - prefix.length())
+  private void generateDatabaseIds(ChangeStreamTestPipelineOptions options) {
+    int prefixLength =
+        Math.max(options.getDatabaseId().length(), options.getMetadataDatabaseId().length());
+    String suffix =
+        RandomStringUtils.randomAlphanumeric(MAX_DATABASE_NAME_LENGTH - 1 - prefixLength)
             .toLowerCase(Locale.ROOT);
+    databaseId = options.getDatabaseId() + "_" + suffix;
+    metadataDatabaseId = options.getMetadataDatabaseId() + "_" + suffix;
   }
 }
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/it/SpannerChangeStreamIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/it/SpannerChangeStreamIT.java
index de837a173bc..42e4eff41b5 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/it/SpannerChangeStreamIT.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/it/SpannerChangeStreamIT.java
@@ -20,6 +20,7 @@ package org.apache.beam.sdk.io.gcp.spanner.changestreams.it;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+import static org.junit.Assume.assumeTrue;
 
 import com.google.cloud.Timestamp;
 import com.google.cloud.spanner.DatabaseClient;
@@ -38,10 +39,12 @@ import java.util.Set;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import java.util.stream.StreamSupport;
+import org.apache.beam.runners.direct.DirectRunner;
 import org.apache.beam.sdk.io.gcp.spanner.SpannerConfig;
 import org.apache.beam.sdk.io.gcp.spanner.SpannerIO;
 import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.DataChangeRecord;
 import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.Mod;
+import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.DoFn;
@@ -56,6 +59,7 @@ import org.junit.BeforeClass;
 import org.junit.ClassRule;
 import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.ExpectedException;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 
@@ -64,8 +68,12 @@ import org.junit.runners.JUnit4;
 public class SpannerChangeStreamIT {
 
   @ClassRule public static final IntegrationTestEnv ENV = new IntegrationTestEnv();
+
   @Rule public final transient TestPipeline pipeline = TestPipeline.create();
 
+  /** Rule for exception testing. */
+  @Rule public ExpectedException exception = ExpectedException.none();
+
   private static String instanceId;
   private static String projectId;
   private static String databaseId;
@@ -83,6 +91,8 @@ public class SpannerChangeStreamIT {
     changeStreamTableName = ENV.createSingersTable();
     changeStreamName = ENV.createChangeStreamFor(changeStreamTableName);
     databaseClient = ENV.getDatabaseClient();
+    ENV.createMetadataDatabase();
+    ENV.createRoleAndGrantPrivileges(changeStreamTableName, changeStreamName);
   }
 
   @Before
@@ -93,6 +103,23 @@ public class SpannerChangeStreamIT {
 
   @Test
   public void testReadSpannerChangeStream() {
+    testReadSpannerChangeStreamImpl(pipeline, null);
+  }
+
+  @Test
+  public void testReadSpannerChangeStreamWithAuthorizedRole() {
+    testReadSpannerChangeStreamImpl(pipeline, ENV.getDatabaseRole());
+  }
+
+  @Test
+  public void testReadSpannerChangeStreamWithUnauthorizedRole() {
+    assumeTrue(pipeline.getOptions().getRunner() == DirectRunner.class);
+    exception.expect(SpannerException.class);
+    exception.expectMessage("Role not found: bad_role.");
+    testReadSpannerChangeStreamImpl(pipeline.enableAbandonedNodeEnforcement(false), "bad_role");
+  }
+
+  public void testReadSpannerChangeStreamImpl(TestPipeline testPipeline, String role) {
     // Defines how many rows are going to be inserted / updated / deleted in the test
     final int numRows = 5;
     // Inserts numRows rows and uses the first commit timestamp as the startAt for reading the
@@ -106,19 +133,22 @@ public class SpannerChangeStreamIT {
     final Pair<Timestamp, Timestamp> deleteTimestamps = deleteRows(numRows);
     final Timestamp endAt = deleteTimestamps.getRight();
 
-    final SpannerConfig spannerConfig =
+    SpannerConfig spannerConfig =
         SpannerConfig.create()
             .withProjectId(projectId)
             .withInstanceId(instanceId)
             .withDatabaseId(databaseId);
+    if (role != null) {
+      spannerConfig = spannerConfig.withDatabaseRole(StaticValueProvider.of(role));
+    }
 
     final PCollection<String> tokens =
-        pipeline
+        testPipeline
             .apply(
                 SpannerIO.readChangeStream()
                     .withSpannerConfig(spannerConfig)
                     .withChangeStreamName(changeStreamName)
-                    .withMetadataDatabase(databaseId)
+                    .withMetadataDatabase(ENV.getMetadataDatabaseId())
                     .withMetadataTable(metadataTableName)
                     .withInclusiveStartAt(startAt)
                     .withInclusiveEndAt(endAt))
@@ -143,7 +173,7 @@ public class SpannerChangeStreamIT {
             "DELETE,3,Updated First Name 3,Updated Last Name 3,null,null",
             "DELETE,4,Updated First Name 4,Updated Last Name 4,null,null",
             "DELETE,5,Updated First Name 5,Updated Last Name 5,null,null");
-    pipeline.run().waitUntilFinish();
+    testPipeline.run().waitUntilFinish();
 
     assertMetadataTableHasBeenDropped();
   }
@@ -176,7 +206,7 @@ public class SpannerChangeStreamIT {
                 SpannerIO.readChangeStream()
                     .withSpannerConfig(spannerConfig)
                     .withChangeStreamName(changeStreamName)
-                    .withMetadataDatabase(databaseId)
+                    .withMetadataDatabase(ENV.getMetadataDatabaseId())
                     .withMetadataTable(metadataTableName)
                     .withInclusiveStartAt(startAt)
                     .withInclusiveEndAt(endAt))