You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@beam.apache.org by "pranavbhandari24 (via GitHub)" <gi...@apache.org> on 2023/04/26 20:22:25 UTC

[GitHub] [beam] pranavbhandari24 opened a new pull request, #26444: Integration test framework

pranavbhandari24 opened a new pull request, #26444:
URL: https://github.com/apache/beam/pull/26444

   Integration / Load test utilities.
   ------------------------
   
   Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
   
    - [ ] Mention the appropriate issue in your description (for example: `addresses #123`), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, comment `fixes #<ISSUE NUMBER>` instead.
    - [ ] Update `CHANGES.md` with noteworthy changes.
    - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   See the [Contributor Guide](https://beam.apache.org/contribute) for more tips on [how to make review process smoother](https://beam.apache.org/contribute/get-started-contributing/#make-the-reviewers-job-easier).
   
   To check the build health, please visit [https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md](https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md)
   
   GitHub Actions Tests Status (on master branch)
   ------------------------------------------------------------------------------------------------
   [![Build python source distribution and wheels](https://github.com/apache/beam/workflows/Build%20python%20source%20distribution%20and%20wheels/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Build+python+source+distribution+and+wheels%22+branch%3Amaster+event%3Aschedule)
   [![Python tests](https://github.com/apache/beam/workflows/Python%20tests/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Python+Tests%22+branch%3Amaster+event%3Aschedule)
   [![Java tests](https://github.com/apache/beam/workflows/Java%20Tests/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Java+Tests%22+branch%3Amaster+event%3Aschedule)
   [![Go tests](https://github.com/apache/beam/workflows/Go%20tests/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Go+tests%22+branch%3Amaster+event%3Aschedule)
   
   See [CI.md](https://github.com/apache/beam/blob/master/CI.md) for more information about GitHub Actions CI.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] pranavbhandari24 commented on a diff in pull request #26444: Integration/Load test framework

Posted by "pranavbhandari24 (via GitHub)" <gi...@apache.org>.
pranavbhandari24 commented on code in PR #26444:
URL: https://github.com/apache/beam/pull/26444#discussion_r1196696735


##########
it/common/src/main/java/org/apache/beam/it/common/ResourceManager.java:
##########
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.it.common;
+
+/** Common interface across resource managers. */
+public interface ResourceManager {

Review Comment:
   Changed the resource managers to match `Closable` and `AutoCloseable` style in https://github.com/GoogleCloudPlatform/DataflowTemplates/pull/681. However, some resource managers require an intermediate interface (e.g JDBC) so I have left them as is.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] pranavbhandari24 commented on a diff in pull request #26444: Integration/Load test framework

Posted by "pranavbhandari24 (via GitHub)" <gi...@apache.org>.
pranavbhandari24 commented on code in PR #26444:
URL: https://github.com/apache/beam/pull/26444#discussion_r1258920903


##########
it/common/src/main/java/org/apache/beam/it/common/AbstractPipelineLauncher.java:
##########
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.it.common;

Review Comment:
   I agree. I moved some of the files in common into separate modules. We can rename the `common` module to something like `core` if that's better. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] kennknowles commented on a diff in pull request #26444: Integration/Load test framework

Posted by "kennknowles (via GitHub)" <gi...@apache.org>.
kennknowles commented on code in PR #26444:
URL: https://github.com/apache/beam/pull/26444#discussion_r1261208309


##########
it/cassandra/src/main/java/org/apache/beam/it/cassandra/CassandraResourceManager.java:
##########
@@ -0,0 +1,310 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.it.cassandra;
+
+import static org.apache.beam.it.cassandra.CassandraResourceManagerUtils.generateKeyspaceName;
+
+import com.datastax.oss.driver.api.core.CqlSession;
+import com.datastax.oss.driver.api.core.DriverTimeoutException;
+import com.datastax.oss.driver.api.core.cql.ResultSet;
+import com.datastax.oss.driver.api.core.cql.Row;
+import com.datastax.oss.driver.api.core.cql.SimpleStatement;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
+import dev.failsafe.Failsafe;
+import dev.failsafe.RetryPolicy;
+import java.net.InetSocketAddress;
+import java.time.Duration;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.it.common.ResourceManager;
+import org.apache.beam.it.testcontainers.TestContainerResourceManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.CassandraContainer;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.utility.DockerImageName;
+
+/**
+ * Client for managing Cassandra resources.
+ *
+ * <p>The class supports one database and multiple collections per database object. A database is
+ * created when the first collection is created if one has not been created already.
+ *
+ * <p>The database name is formed using testId. The database name will be "{testId}-{ISO8601 time,
+ * microsecond precision}", with additional formatting.
+ *
+ * <p>The class is thread-safe.
+ */
+public class CassandraResourceManager extends TestContainerResourceManager<GenericContainer<?>>

Review Comment:
   Could we merge the framework and the Cassandra LT in separate pull requests? I want to get this in ASAP but 25k lines with a lot of error checking disabled is a bit of a problem.



##########
it/common/gradle.properties:
##########
@@ -0,0 +1,18 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+enableCheckerFramework=false

Review Comment:
   remove



##########
it/cassandra/gradle.properties:
##########
@@ -0,0 +1,18 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+enableCheckerFramework=false

Review Comment:
   remove



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] kennknowles commented on pull request #26444: Integration/Load test framework

Posted by "kennknowles (via GitHub)" <gi...@apache.org>.
kennknowles commented on PR #26444:
URL: https://github.com/apache/beam/pull/26444#issuecomment-1535066903

   waiting on author


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] pranavbhandari24 commented on pull request #26444: Integration/Load test framework

Posted by "pranavbhandari24 (via GitHub)" <gi...@apache.org>.
pranavbhandari24 commented on PR #26444:
URL: https://github.com/apache/beam/pull/26444#issuecomment-1629745315

   R: @Abacn, @johnjcasey 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] damccorm commented on pull request #26444: Integration/Load test framework

Posted by "damccorm (via GitHub)" <gi...@apache.org>.
damccorm commented on PR #26444:
URL: https://github.com/apache/beam/pull/26444#issuecomment-1568535196

   stop reviewer notifications


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] github-actions[bot] commented on pull request #26444: Integration/Load test framework

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on PR #26444:
URL: https://github.com/apache/beam/pull/26444#issuecomment-1568537065

   Stopping reviewer notifications for this pull request: requested by reviewer


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] pranavbhandari24 commented on a diff in pull request #26444: Integration/Load test framework

Posted by "pranavbhandari24 (via GitHub)" <gi...@apache.org>.
pranavbhandari24 commented on code in PR #26444:
URL: https://github.com/apache/beam/pull/26444#discussion_r1258922268


##########
it/mongodb/src/test/java/org/apache/beam/it/mongodb/DefaultMongoDBResourceManagerTest.java:
##########
@@ -0,0 +1,310 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.it.mongodb;
+
+import static com.google.common.truth.Truth.assertThat;
+import static org.junit.Assert.assertThrows;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import com.mongodb.MongoBulkWriteException;
+import com.mongodb.client.MongoClient;
+import com.mongodb.client.MongoCollection;
+import com.mongodb.client.MongoCursor;
+import com.mongodb.client.MongoDatabase;
+import com.mongodb.client.MongoIterable;
+import java.io.IOException;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.bson.Document;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.Answers;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnit;
+import org.mockito.junit.MockitoRule;
+import org.testcontainers.containers.MongoDBContainer;
+
+/** Unit tests for {@link DefaultMongoDBResourceManager}. */
+@RunWith(JUnit4.class)
+public class DefaultMongoDBResourceManagerTest {
+
+  @Rule public final MockitoRule mockito = MockitoJUnit.rule();
+
+  @Mock(answer = Answers.RETURNS_DEEP_STUBS)
+  private MongoIterable<String> collectionIterable;
+
+  @Mock private MongoClient mongoClient;
+  @Mock private MongoDatabase database;
+  @Mock private MongoCollection<Document> collection;
+  @Mock private MongoCursor<String> collectionNames;
+  @Mock private MongoDBContainer container;

Review Comment:
   Thanks, I agree. I have reduced the amount of mocks in test classes.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] pranavbhandari24 commented on a diff in pull request #26444: Integration/Load test framework

Posted by "pranavbhandari24 (via GitHub)" <gi...@apache.org>.
pranavbhandari24 commented on code in PR #26444:
URL: https://github.com/apache/beam/pull/26444#discussion_r1197059980


##########
it/cassandra/src/main/java/org/apache/beam/it/cassandra/CassandraResourceManagerUtils.java:
##########
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.it.cassandra;
+
+import static org.apache.beam.it.common.utils.ResourceManagerUtils.generateResourceId;
+
+import com.google.re2j.Pattern;

Review Comment:
   Changed to use java Patterns in https://github.com/GoogleCloudPlatform/DataflowTemplates/pull/710.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] kennknowles commented on a diff in pull request #26444: Integration/Load test framework

Posted by "kennknowles (via GitHub)" <gi...@apache.org>.
kennknowles commented on code in PR #26444:
URL: https://github.com/apache/beam/pull/26444#discussion_r1178401857


##########
it/cassandra/src/main/java/org/apache/beam/it/cassandra/matchers/CassandraAsserts.java:
##########
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.it.cassandra.matchers;
+
+import static org.apache.beam.it.common.matchers.TemplateAsserts.assertThatRecords;
+
+import com.datastax.oss.driver.api.core.cql.ColumnDefinition;
+import com.datastax.oss.driver.api.core.cql.Row;
+import com.datastax.oss.driver.api.core.type.DataTypes;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.beam.it.common.matchers.RecordsSubject;
+
+public class CassandraAsserts {
+
+  /**
+   * Convert Cassandra {@link com.datastax.oss.driver.api.core.cql.Row} list to a list of maps.
+   *
+   * @param rows Rows to parse.
+   * @return List of maps to use in {@link RecordsSubject}.
+   */
+  @SuppressWarnings("argument")

Review Comment:
   why suppressed?



##########
it/cassandra/src/main/java/org/apache/beam/it/cassandra/DefaultCassandraResourceManager.java:
##########
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.it.cassandra;
+
+import com.datastax.oss.driver.api.core.CqlSession;
+import com.datastax.oss.driver.api.core.DriverTimeoutException;
+import com.datastax.oss.driver.api.core.cql.ResultSet;
+import com.datastax.oss.driver.api.core.cql.Row;
+import com.datastax.oss.driver.api.core.cql.SimpleStatement;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.List;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.beam.it.common.testcontainers.TestContainerResourceManager;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.CassandraContainer;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.utility.DockerImageName;
+
+/**
+ * Default class for implementation of {@link CassandraResourceManager} interface.
+ *
+ * <p>The class supports one database and multiple collections per database object. A database is
+ * created when the first collection is created if one has not been created already.
+ *
+ * <p>The database name is formed using testId. The database name will be "{testId}-{ISO8601 time,
+ * microsecond precision}", with additional formatting.
+ *
+ * <p>The class is thread-safe.
+ */
+public class DefaultCassandraResourceManager
+    extends TestContainerResourceManager<GenericContainer<?>> implements CassandraResourceManager {
+
+  private static final Logger LOG = LoggerFactory.getLogger(DefaultCassandraResourceManager.class);
+
+  private static final String DEFAULT_CASSANDRA_CONTAINER_NAME = "cassandra";
+
+  // A list of available Cassandra Docker image tags can be found at
+  // https://hub.docker.com/_/cassandra/tags
+  private static final String DEFAULT_CASSANDRA_CONTAINER_TAG = "4.1.0";
+
+  // 27017 is the default port that Cassandra is configured to listen on
+  private static final int CASSANDRA_INTERNAL_PORT = 9042;
+
+  private final CqlSession cassandraClient;
+  private final String keyspaceName;
+  private final boolean usingStaticDatabase;
+
+  private DefaultCassandraResourceManager(Builder builder) {
+    this(
+        /* cassandraClient= */ null,
+        new CassandraContainer<>(
+            DockerImageName.parse(builder.containerImageName).withTag(builder.containerImageTag)),
+        builder);
+  }
+
+  @SuppressWarnings({"rawtypes", "method.invocation"})
+  @VisibleForTesting
+  DefaultCassandraResourceManager(
+      @Nullable CqlSession cassandraClient, CassandraContainer container, Builder builder) {
+    super(container, builder);
+
+    this.usingStaticDatabase = builder.keyspaceName != null;
+    this.keyspaceName =
+        usingStaticDatabase
+            ? builder.keyspaceName
+            : CassandraResourceManagerUtils.generateKeyspaceName(builder.testId);
+    this.cassandraClient =
+        cassandraClient == null
+            ? CqlSession.builder()
+                .addContactPoint(
+                    new InetSocketAddress(this.getHost(), this.getPort(CASSANDRA_INTERNAL_PORT)))
+                .withLocalDatacenter("datacenter1")
+                .build()
+            : cassandraClient;
+
+    if (!usingStaticDatabase) {
+      this.cassandraClient.execute(
+          String.format(
+              "CREATE KEYSPACE IF NOT EXISTS %s WITH replication = {'class':'SimpleStrategy', 'replication_factor':1}",
+              this.keyspaceName));
+    }
+  }
+
+  public static Builder builder(String testId) throws IOException {
+    return new Builder(testId);
+  }
+
+  @Override
+  public int getPort() {
+    return super.getPort(CASSANDRA_INTERNAL_PORT);
+  }
+
+  @Override
+  public synchronized String getKeyspaceName() {
+    return keyspaceName;
+  }
+
+  @Override
+  public synchronized ResultSet executeStatement(String statement) {
+    LOG.info("Executing statement: {}", statement);
+
+    try {
+      return cassandraClient.execute(
+          SimpleStatement.newInstance(statement).setKeyspace(this.keyspaceName));
+    } catch (Exception e) {
+      throw new CassandraResourceManagerException("Error reading collection.", e);
+    }
+  }
+
+  /**
+   * Inserts the given Document into a table.
+   *
+   * <p>A database will be created here, if one does not already exist.
+   *
+   * @param tableName The name of the table to insert the document into.
+   * @param document The document to insert into the table.
+   * @return A boolean indicating whether the Document was inserted successfully.
+   */
+  public synchronized boolean insertDocument(String tableName, Map<String, Object> document) {
+    return insertDocuments(tableName, ImmutableList.of(document));
+  }
+
+  @Override
+  public synchronized boolean insertDocuments(
+      String tableName, List<Map<String, Object>> documents) {
+    LOG.info(
+        "Attempting to write {} documents to {}.{}.", documents.size(), keyspaceName, tableName);
+
+    try {
+      for (Map<String, Object> document : documents) {
+        executeStatement(createInsertStatement(tableName, document));
+      }
+    } catch (Exception e) {
+      throw new CassandraResourceManagerException("Error inserting documents.", e);
+    }
+
+    LOG.info("Successfully wrote {} documents to {}.{}", documents.size(), keyspaceName, tableName);
+
+    return true;
+  }
+
+  @Override
+  public synchronized Iterable<Row> readTable(String tableName) {
+    LOG.info("Reading all documents from {}.{}", keyspaceName, tableName);
+
+    Iterable<Row> documents;
+    try {
+      ResultSet resultSet = executeStatement(String.format("SELECT * FROM %s", tableName));
+      documents = resultSet.all();
+    } catch (Exception e) {
+      throw new CassandraResourceManagerException("Error reading table.", e);
+    }
+
+    LOG.info("Successfully loaded documents from {}.{}", keyspaceName, tableName);
+
+    return documents;
+  }
+
+  @Override
+  public synchronized void cleanupAll() {
+    LOG.info("Attempting to cleanup Cassandra manager.");
+
+    boolean producedError = false;
+
+    // First, delete the database if it was not given as a static argument
+    if (!usingStaticDatabase) {
+      try {
+        executeStatement(String.format("DROP KEYSPACE IF EXISTS %s", this.keyspaceName));
+      } catch (Exception e) {
+        LOG.error("Failed to drop Cassandra keyspace {}.", keyspaceName, e);
+
+        // Only bubble exception if the cause is not timeout, as it will be dropped with container.
+        if (e.getCause() == null || !(e.getCause() instanceof DriverTimeoutException)) {
+          producedError = true;
+        }
+      }
+    }
+
+    // Next, try to close the Cassandra client connection
+    try {
+      cassandraClient.close();
+    } catch (Exception e) {
+      LOG.error("Failed to delete Cassandra client.", e);
+      producedError = true;
+    }
+
+    // Throw Exception at the end if there were any errors
+    if (producedError) {
+      throw new CassandraResourceManagerException(
+          "Failed to delete resources. Check above for errors.");
+    }
+
+    super.cleanupAll();
+
+    LOG.info("Cassandra manager successfully cleaned up.");
+  }
+
+  private String createInsertStatement(String tableName, Map<String, Object> map) {
+    StringBuilder columns = new StringBuilder();
+    StringBuilder values = new StringBuilder();
+
+    for (Map.Entry<String, Object> entry : map.entrySet()) {
+      columns.append(entry.getKey()).append(", ");
+
+      // add quotes around strings
+      if (entry.getValue() instanceof String) {
+        values.append("'").append(entry.getValue()).append("'");
+      } else {
+        values.append(entry.getValue());
+      }
+      values.append(", ");
+    }
+
+    // Remove trailing comma and space
+    columns.delete(columns.length() - 2, columns.length());
+    values.delete(values.length() - 2, values.length());
+
+    return String.format("INSERT INTO %s (%s) VALUES (%s)", tableName, columns, values);
+  }
+
+  /** Builder for {@link DefaultCassandraResourceManager}. */
+  public static final class Builder
+      extends TestContainerResourceManager.Builder<DefaultCassandraResourceManager> {
+
+    @SuppressWarnings("initialization.fields.uninitialized")

Review Comment:
   why suppressed?



##########
it/cassandra/src/main/java/org/apache/beam/it/cassandra/DefaultCassandraResourceManager.java:
##########
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.it.cassandra;
+
+import com.datastax.oss.driver.api.core.CqlSession;
+import com.datastax.oss.driver.api.core.DriverTimeoutException;
+import com.datastax.oss.driver.api.core.cql.ResultSet;
+import com.datastax.oss.driver.api.core.cql.Row;
+import com.datastax.oss.driver.api.core.cql.SimpleStatement;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.List;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.beam.it.common.testcontainers.TestContainerResourceManager;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.CassandraContainer;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.utility.DockerImageName;
+
+/**
+ * Default class for implementation of {@link CassandraResourceManager} interface.
+ *
+ * <p>The class supports one database and multiple collections per database object. A database is
+ * created when the first collection is created if one has not been created already.
+ *
+ * <p>The database name is formed using testId. The database name will be "{testId}-{ISO8601 time,
+ * microsecond precision}", with additional formatting.
+ *
+ * <p>The class is thread-safe.
+ */
+public class DefaultCassandraResourceManager
+    extends TestContainerResourceManager<GenericContainer<?>> implements CassandraResourceManager {
+
+  private static final Logger LOG = LoggerFactory.getLogger(DefaultCassandraResourceManager.class);
+
+  private static final String DEFAULT_CASSANDRA_CONTAINER_NAME = "cassandra";
+
+  // A list of available Cassandra Docker image tags can be found at
+  // https://hub.docker.com/_/cassandra/tags
+  private static final String DEFAULT_CASSANDRA_CONTAINER_TAG = "4.1.0";
+
+  // 27017 is the default port that Cassandra is configured to listen on
+  private static final int CASSANDRA_INTERNAL_PORT = 9042;
+
+  private final CqlSession cassandraClient;
+  private final String keyspaceName;
+  private final boolean usingStaticDatabase;
+
+  private DefaultCassandraResourceManager(Builder builder) {
+    this(
+        /* cassandraClient= */ null,
+        new CassandraContainer<>(
+            DockerImageName.parse(builder.containerImageName).withTag(builder.containerImageTag)),
+        builder);
+  }
+
+  @SuppressWarnings({"rawtypes", "method.invocation"})

Review Comment:
   why suppressed?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] pranavbhandari24 commented on a diff in pull request #26444: Integration/Load test framework

Posted by "pranavbhandari24 (via GitHub)" <gi...@apache.org>.
pranavbhandari24 commented on code in PR #26444:
URL: https://github.com/apache/beam/pull/26444#discussion_r1258918296


##########
it/cassandra/src/main/java/org/apache/beam/it/cassandra/matchers/CassandraAsserts.java:
##########
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.it.cassandra.matchers;
+
+import static org.apache.beam.it.common.matchers.TemplateAsserts.assertThatRecords;
+
+import com.datastax.oss.driver.api.core.cql.ColumnDefinition;
+import com.datastax.oss.driver.api.core.cql.Row;
+import com.datastax.oss.driver.api.core.type.DataTypes;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.beam.it.common.matchers.RecordsSubject;
+
+public class CassandraAsserts {
+
+  /**
+   * Convert Cassandra {@link com.datastax.oss.driver.api.core.cql.Row} list to a list of maps.
+   *
+   * @param rows Rows to parse.
+   * @return List of maps to use in {@link RecordsSubject}.
+   */
+  @SuppressWarnings("argument")

Review Comment:
   I have disabled checker framework on all `it` submodules. As suggested, I created an issue to enable checker framework on it submodules after the code drop.



##########
it/cassandra/src/main/java/org/apache/beam/it/cassandra/matchers/CassandraAsserts.java:
##########
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.it.cassandra.matchers;
+
+import static org.apache.beam.it.common.matchers.TemplateAsserts.assertThatRecords;
+
+import com.datastax.oss.driver.api.core.cql.ColumnDefinition;
+import com.datastax.oss.driver.api.core.cql.Row;
+import com.datastax.oss.driver.api.core.type.DataTypes;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.beam.it.common.matchers.RecordsSubject;
+
+public class CassandraAsserts {
+
+  /**
+   * Convert Cassandra {@link com.datastax.oss.driver.api.core.cql.Row} list to a list of maps.
+   *
+   * @param rows Rows to parse.
+   * @return List of maps to use in {@link RecordsSubject}.
+   */
+  @SuppressWarnings("argument")

Review Comment:
   https://github.com/apache/beam/issues/27438



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] pranavbhandari24 commented on pull request #26444: Integration/Load test framework

Posted by "pranavbhandari24 (via GitHub)" <gi...@apache.org>.
pranavbhandari24 commented on PR #26444:
URL: https://github.com/apache/beam/pull/26444#issuecomment-1629742809

   @kennknowles I have addressed your comments. The PR is ready for another pass.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] pranavbhandari24 closed pull request #26444: Integration/Load test framework

Posted by "pranavbhandari24 (via GitHub)" <gi...@apache.org>.
pranavbhandari24 closed pull request #26444: Integration/Load test framework
URL: https://github.com/apache/beam/pull/26444


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] pranavbhandari24 commented on a diff in pull request #26444: Integration/Load test framework

Posted by "pranavbhandari24 (via GitHub)" <gi...@apache.org>.
pranavbhandari24 commented on code in PR #26444:
URL: https://github.com/apache/beam/pull/26444#discussion_r1196709490


##########
it/common/src/main/java/org/apache/beam/it/common/PipelineLauncher.java:
##########
@@ -0,0 +1,422 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.it.common;
+
+import static org.apache.beam.it.common.utils.PipelineUtils.createJobName;
+
+import com.google.api.services.dataflow.model.Job;
+import com.google.auto.value.AutoValue;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet;
+
+/** Client for working with Cloud Dataflow. */
+public interface PipelineLauncher {
+  /** Enum representing Apache Beam SDKs. */
+  enum Sdk {
+    JAVA("JAVA"),
+    PYTHON("PYTHON"),
+    GO("GO");
+
+    private final String text;
+
+    Sdk(String text) {
+      this.text = text;
+    }
+
+    @Override
+    public String toString() {
+      return text;
+    }
+  }
+
+  /** Enum representing known Dataflow job states. */
+  enum JobState {
+    UNKNOWN("JOB_STATE_UNKNOWN"),
+    STOPPED("JOB_STATE_STOPPED"),
+    RUNNING("JOB_STATE_RUNNING"),
+    DONE("JOB_STATE_DONE"),
+    FAILED("JOB_STATE_FAILED"),
+    CANCELLED("JOB_STATE_CANCELLED"),
+    UPDATED("JOB_STATE_UPDATED"),
+    DRAINING("JOB_STATE_DRAINING"),
+    DRAINED("JOB_STATE_DRAINED"),
+    PENDING("JOB_STATE_PENDING"),
+    CANCELLING("JOB_STATE_CANCELLING"),
+    QUEUED("JOB_STATE_QUEUED"),
+    RESOURCE_CLEANING_UP("JOB_STATE_RESOURCE_CLEANING_UP");
+
+    private static final String DATAFLOW_PREFIX = "JOB_STATE_";
+
+    /** States that indicate the job is getting ready to run. */
+    public static final ImmutableSet<JobState> PENDING_STATES = ImmutableSet.of(PENDING, QUEUED);

Review Comment:
   I couldn't find it in the DataflowRunner code base, and I'm not sure if it makes sense to add this in that code base if it is only going to be used here?
   
   Either way, I'd like to move the framework to beam from the Templates repo before making such changes to make the migration simpler. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] pranavbhandari24 commented on a diff in pull request #26444: Integration/Load test framework

Posted by "pranavbhandari24 (via GitHub)" <gi...@apache.org>.
pranavbhandari24 commented on code in PR #26444:
URL: https://github.com/apache/beam/pull/26444#discussion_r1196710759


##########
it/common/src/main/java/org/apache/beam/it/common/matchers/LaunchInfoSubject.java:
##########
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.it.common.matchers;
+
+import com.google.common.truth.FailureMetadata;
+import com.google.common.truth.Subject;
+import org.apache.beam.it.common.PipelineLauncher.JobState;
+import org.apache.beam.it.common.PipelineLauncher.LaunchInfo;
+
+/**
+ * Subject that has assertion operations for {@link LaunchInfo}, which has the information for a
+ * recently launched pipeline.
+ */
+public final class LaunchInfoSubject extends Subject {
+
+  private final LaunchInfo actual;
+
+  private LaunchInfoSubject(FailureMetadata metadata, LaunchInfo actual) {
+    super(metadata, actual);

Review Comment:
   Yeah, it is built into Truth. Inheritance is the recommended way of using `Subject`. See https://truth.dev/extension.html



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] github-actions[bot] commented on pull request #26444: Integration/Load test framework

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on PR #26444:
URL: https://github.com/apache/beam/pull/26444#issuecomment-1565382734

   Reminder, please take a look at this pr: @kennknowles @damccorm 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] pranavbhandari24 commented on a diff in pull request #26444: Integration/Load test framework

Posted by "pranavbhandari24 (via GitHub)" <gi...@apache.org>.
pranavbhandari24 commented on code in PR #26444:
URL: https://github.com/apache/beam/pull/26444#discussion_r1197060254


##########
it/common/src/main/java/org/apache/beam/it/common/AbstractPipelineLauncher.java:
##########
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.it.common;
+
+import static org.apache.beam.it.common.logging.LogStrings.formatForLogging;
+import static org.apache.beam.it.common.utils.RetryUtil.clientRetryPolicy;
+
+import com.google.api.client.util.ArrayMap;
+import com.google.api.services.dataflow.Dataflow;
+import com.google.api.services.dataflow.model.Environment;
+import com.google.api.services.dataflow.model.Job;
+import com.google.api.services.dataflow.model.MetricUpdate;
+import dev.failsafe.Failsafe;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;

Review Comment:
   Fixed in https://github.com/GoogleCloudPlatform/DataflowTemplates/pull/710.



##########
it/mongodb/build.gradle:
##########
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+plugins { id 'org.apache.beam.module' }
+applyJavaNature(
+        automaticModuleName: 'org.apache.beam.it.mongodb',
+)
+
+description = "Apache Beam :: IT :: MongoDB"
+ext.summary = "Integration test utilities for MongoDB."
+
+dependencies {
+    implementation project(':it:common')
+    implementation library.java.testcontainers_base
+    implementation library.java.testcontainers_mongodb
+    implementation library.java.vendored_guava_26_0_jre
+    implementation library.java.slf4j_api
+    //implementation 'org.mongodb:mongodb-driver-sync:3.12.10'
+    implementation library.java.mongo_java_driver
+    implementation 'com.google.re2j:re2j:1.6'

Review Comment:
   Fixed in https://github.com/GoogleCloudPlatform/DataflowTemplates/pull/710.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] johnjcasey commented on a diff in pull request #26444: Integration/Load test framework

Posted by "johnjcasey (via GitHub)" <gi...@apache.org>.
johnjcasey commented on code in PR #26444:
URL: https://github.com/apache/beam/pull/26444#discussion_r1259800548


##########
it/common/build.gradle:
##########
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+plugins { id 'org.apache.beam.module' }
+applyJavaNature(
+    exportJavadoc: false,
+    automaticModuleName: 'org.apache.beam.it.common',
+    validateShadowJar: false,
+    shadowClosure: {},
+)
+
+description = "Apache Beam :: IT :: Common"
+ext.summary = "Code used by all integration test utilities."
+
+dependencies {
+    implementation enforcedPlatform(library.java.google_cloud_platform_libraries_bom)
+    implementation project(path: ":sdks:java:core", configuration: "shadow")
+    implementation library.java.google_api_services_dataflow
+    implementation library.java.google_auth_library_credentials
+    implementation library.java.google_auth_library_oauth2_http
+    implementation library.java.vendored_guava_26_0_jre
+    implementation library.java.slf4j_api
+    implementation library.java.commons_lang3
+    implementation 'dev.failsafe:failsafe:3.3.0'

Review Comment:
   please do this as a library in the plugin



##########
it/testcontainers/build.gradle:
##########
@@ -0,0 +1,37 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* License); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an AS IS BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+plugins { id 'org.apache.beam.module' }
+applyJavaNature(
+    automaticModuleName: 'org.apache.beam.it.testcontainers',
+    enableSpotbugs: false,
+    validateShadowJar: false,
+    shadowClosure: {},
+)
+
+description = "Apache Beam :: IT :: Testcontainers"
+ext.summary = "Integration test utilities for Testcontainers."
+
+dependencies {
+    implementation project(path: ":it:common", configuration: "shadow")
+    implementation library.java.testcontainers_base
+
+    testImplementation 'com.google.truth:truth:1.0.1'

Review Comment:
   This should be done as a library, similar to our other libraries



##########
it/common/src/main/java/org/apache/beam/it/common/TestProperties.java:
##########
@@ -0,0 +1,294 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.it.common;
+
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;
+
+import com.google.auth.Credentials;
+import com.google.auth.oauth2.AccessToken;
+import com.google.auth.oauth2.ComputeEngineCredentials;
+import com.google.auth.oauth2.GoogleCredentials;
+import com.google.auth.oauth2.ServiceAccountCredentials;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import javax.annotation.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Utility for accessing system properties set for the test.
+ *
+ * <p>There are two types of properties: those set on the command lines and those set as environment
+ * variables. Those set on the command line always follow a camelCase naming convention, and those
+ * set as environment variable always follow a CAPITALIZED_SNAKE_CASE naming convention.
+ */
+public final class TestProperties {
+  private TestProperties() {}
+
+  // For testability, it is normally best to expect each property from the command line. We should
+  // only expect an environment variable if we're trying to avoid an accidental log of the
+  // value.
+
+  // From command line
+  public static final String ARTIFACT_BUCKET_KEY = "artifactBucket";
+  public static final String PROJECT_KEY = "project";
+  public static final String REGION_KEY = "region";
+  public static final String STAGE_BUCKET = "stageBucket";
+  public static final String EXPORT_DATASET_KEY = "exportDataset";
+  public static final String EXPORT_PROJECT = "exportProject";
+  public static final String EXPORT_TABLE_KEY = "exportTable";
+  public static final String SPEC_PATH_KEY = "specPath";
+  public static final String HOST_IP = "hostIp";
+  // From environment variables
+  public static final String ACCESS_TOKEN_KEY = "DT_IT_ACCESS_TOKEN";
+
+  // Default values for optional properties
+  public static final String DEFAULT_REGION = "us-central1";
+
+  // Error messages
+  private static final String CLI_ERR_MSG = "-D%s is required on the command line";
+  private static final String ENV_VAR_MSG = "%s is required as an environment variable";
+
+  private static final Logger LOG = LoggerFactory.getLogger(TestProperties.class);
+
+  public static boolean hasAccessToken() {
+    return getProperty(ACCESS_TOKEN_KEY, null, Type.ENVIRONMENT_VARIABLE) != null;
+  }
+
+  public static String accessToken() {
+    return getProperty(ACCESS_TOKEN_KEY, Type.ENVIRONMENT_VARIABLE, true);
+  }
+
+  /**
+   * Create and return credentials based on whether access token was provided or not.
+   *
+   * <p>If access token was provided, use the token for Bearer authentication.
+   *
+   * <p>If not, use Application Default Credentials. Check
+   * https://cloud.google.com/docs/authentication/application-default-credentials for more
+   * information.
+   *
+   * @return Credentials.
+   */
+  public static Credentials credentials() {
+    if (hasAccessToken()) {
+      return googleCredentials();
+    } else {
+      return buildCredentialsFromEnv();
+    }
+  }
+
+  public static Credentials googleCredentials() {
+    Credentials credentials;
+    try {
+      if (hasAccessToken()) {
+        credentials =
+            new GoogleCredentials(new AccessToken(accessToken(), /* expirationTime= */ null));
+      } else {
+        credentials = GoogleCredentials.getApplicationDefault();
+      }
+    } catch (IOException e) {
+      throw new RuntimeException(
+          "Unable to get credentials! \n"
+              + "Please run the following command to set 60 minute access token, \n"
+              + "\t export DT_IT_ACCESS_TOKEN=$(gcloud auth application-default print-access-token) \n"
+              + "Please run the following command to set credentials using the gcloud command, "
+              + "\t gcloud auth application-default login");
+    }
+    return credentials;
+  }
+
+  public static boolean hasArtifactBucket() {
+    return getProperty(ARTIFACT_BUCKET_KEY, null, Type.PROPERTY) != null;
+  }
+
+  public static String artifactBucket() {
+    return bucketNameOnly(getProperty(ARTIFACT_BUCKET_KEY, Type.PROPERTY, true));
+  }
+
+  public static String exportDataset() {
+    return getProperty(EXPORT_DATASET_KEY, Type.PROPERTY, false);
+  }
+
+  public static String exportProject() {
+    return getProperty(EXPORT_PROJECT, Type.PROPERTY, false);
+  }
+
+  public static String exportTable() {
+    return getProperty(EXPORT_TABLE_KEY, Type.PROPERTY, false);
+  }
+
+  public static String project() {
+    return getProperty(PROJECT_KEY, Type.PROPERTY, true);
+  }
+
+  public static String region() {
+    return getProperty(REGION_KEY, DEFAULT_REGION, Type.PROPERTY);
+  }
+
+  public static String specPath() {
+    return getProperty(SPEC_PATH_KEY, Type.PROPERTY, false);
+  }
+
+  public static boolean hasStageBucket() {
+    return getProperty(STAGE_BUCKET, null, Type.PROPERTY) != null;
+  }
+
+  public static String stageBucket() {
+    return bucketNameOnly(getProperty(STAGE_BUCKET, Type.PROPERTY, false));
+  }
+
+  public static String hostIp() {
+    return getProperty(HOST_IP, "localhost", Type.PROPERTY);
+  }
+
+  /** Gets a property or throws an exception if it is not found. */
+  private static String getProperty(String name, Type type, boolean required) {
+    String value = getProperty(name, null, type);
+
+    if (required) {
+      String errMsg =
+          type == Type.PROPERTY
+              ? String.format(CLI_ERR_MSG, name)
+              : String.format(ENV_VAR_MSG, name);
+      checkState(value != null, errMsg);
+    }
+
+    return value;
+  }
+
+  /** Gets a property or returns {@code defaultValue} if it is not found. */
+  public static String getProperty(String name, @Nullable String defaultValue, Type type) {
+    String value = type == Type.PROPERTY ? System.getProperty(name) : System.getenv(name);
+    return value != null ? value : defaultValue;
+  }
+
+  /** Defines the types of properties there may be. */
+  public enum Type {
+    PROPERTY,
+    ENVIRONMENT_VARIABLE
+  }
+
+  /**
+   * Infers the {@link Credentials} to use with Google services from the current environment
+   * settings.
+   *
+   * <p>First, checks if {@link ServiceAccountCredentials#getApplicationDefault()} returns Compute
+   * Engine credentials, which means that it is running from a GCE instance and can use the Service
+   * Account configured for that VM. Will use that
+   *
+   * <p>Secondly, it will try to get the environment variable
+   * <strong>GOOGLE_APPLICATION_CREDENTIALS</strong>, and use that Service Account if configured to
+   * doing so. The method {@link #getCredentialsStream()} will make sure to search for the specific
+   * file using both the file system and classpath.
+   *
+   * <p>If <strong>GOOGLE_APPLICATION_CREDENTIALS</strong> is not configured, it will return the
+   * application default, which is often setup through <strong>gcloud auth application-default
+   * login</strong>.
+   */
+  public static Credentials buildCredentialsFromEnv() {
+    try {
+
+      // if on Compute Engine, return default credentials.
+      GoogleCredentials applicationDefault = ServiceAccountCredentials.getApplicationDefault();
+      try {
+        if (applicationDefault instanceof ComputeEngineCredentials) {

Review Comment:
   this looks quite strange, what throws an exception here?



##########
it/cassandra/src/main/java/org/apache/beam/it/cassandra/CassandraResourceManager.java:
##########
@@ -0,0 +1,310 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.it.cassandra;
+
+import static org.apache.beam.it.cassandra.CassandraResourceManagerUtils.generateKeyspaceName;
+
+import com.datastax.oss.driver.api.core.CqlSession;
+import com.datastax.oss.driver.api.core.DriverTimeoutException;
+import com.datastax.oss.driver.api.core.cql.ResultSet;
+import com.datastax.oss.driver.api.core.cql.Row;
+import com.datastax.oss.driver.api.core.cql.SimpleStatement;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
+import dev.failsafe.Failsafe;
+import dev.failsafe.RetryPolicy;
+import java.net.InetSocketAddress;
+import java.time.Duration;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.it.common.ResourceManager;
+import org.apache.beam.it.testcontainers.TestContainerResourceManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.CassandraContainer;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.utility.DockerImageName;
+
+/**
+ * Client for managing Cassandra resources.
+ *
+ * <p>The class supports one database and multiple collections per database object. A database is
+ * created when the first collection is created if one has not been created already.
+ *
+ * <p>The database name is formed using testId. The database name will be "{testId}-{ISO8601 time,
+ * microsecond precision}", with additional formatting.
+ *
+ * <p>The class is thread-safe.
+ */
+public class CassandraResourceManager extends TestContainerResourceManager<GenericContainer<?>>
+    implements ResourceManager {
+
+  private static final Logger LOG = LoggerFactory.getLogger(CassandraResourceManager.class);
+
+  private static final String DEFAULT_CASSANDRA_CONTAINER_NAME = "cassandra";
+
+  // A list of available Cassandra Docker image tags can be found at
+  // https://hub.docker.com/_/cassandra/tags
+  private static final String DEFAULT_CASSANDRA_CONTAINER_TAG = "4.1.0";
+
+  // 27017 is the default port that Cassandra is configured to listen on
+  private static final int CASSANDRA_INTERNAL_PORT = 9042;

Review Comment:
   the comment and the variable don't allign



##########
it/common/src/main/java/org/apache/beam/it/common/utils/ResourceManagerUtils.java:
##########
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.it.common.utils;
+
+import static java.lang.Math.min;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.hash.Hashing.goodFastHash;
+
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.time.format.DateTimeFormatter;
+import java.util.List;
+import java.util.Random;
+import java.util.regex.Pattern;
+import javax.annotation.Nullable;
+import org.apache.beam.it.common.ResourceManager;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.hash.HashFunction;
+import org.apache.commons.lang3.RandomStringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Common utilities for ResourceManager implementations. */
+public class ResourceManagerUtils {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ResourceManagerUtils.class);
+
+  private static final int MIN_PROJECT_ID_LENGTH = 4;
+  private static final int MAX_PROJECT_ID_LENGTH = 30;
+  private static final Pattern ILLEGAL_PROJECT_CHARS = Pattern.compile("[^a-zA-Z0-9-!:\\.']");
+  private static final String TIME_ZONE = "UTC";
+
+  /**
+   * Generates a new id string from an existing one.
+   *
+   * @param id The id string to generate a new id from.
+   * @param targetLength The length of the new id to generate. Must be greater than 8.
+   */
+  public static String generateNewId(String id, int targetLength) {
+    if (id.length() <= targetLength) {
+      return id;
+    }
+
+    if (targetLength <= 8) {
+      throw new IllegalArgumentException("targetLength must be greater than 8");
+    }
+
+    HashFunction hashFunction = goodFastHash(32);
+    String hash = hashFunction.hashUnencodedChars(id).toString();
+    return id.substring(0, targetLength - hash.length() - 1) + "-" + hash;
+  }
+
+  /**
+   * Generates a generic resource id from a given string, avoiding characters specified in the
+   * illegalChars Pattern. The length of the generated string ID will not exceed the length
+   * specified by targetLength.
+   *
+   * @param baseString the base ID to generate the resource ID from.
+   * @param illegalChars a pattern of characters to remove from the generated ID.
+   * @param replaceChar the character to replace all illegal characters with.
+   * @param targetLength the max length of the generated ID.
+   * @return the generated resource ID.
+   */
+  public static String generateResourceId(
+      String baseString,
+      Pattern illegalChars,
+      String replaceChar,
+      int targetLength,
+      DateTimeFormatter timeFormat) {
+    // first, make sure the baseString, typically the test ID, is not empty
+    checkArgument(baseString.length() != 0, "baseString cannot be empty.");
+
+    // next, replace all illegal characters from given string with given replacement character
+    String illegalCharsRemoved =
+        illegalChars.matcher(baseString.toLowerCase()).replaceAll(replaceChar);
+
+    // finally, append the date/time and return the substring that does not exceed the length limit
+    LocalDateTime localDateTime = LocalDateTime.now(ZoneId.of(TIME_ZONE));
+    String timeAddOn = localDateTime.format(timeFormat);
+    return illegalCharsRemoved.subSequence(
+            0, min(targetLength - timeAddOn.length() - 1, illegalCharsRemoved.length()))
+        + replaceChar
+        + localDateTime.format(timeFormat);
+  }
+
+  /** Generates random letter for padding. */
+  public static char generatePadding() {
+    Random random = new Random();
+    return (char) ('a' + random.nextInt(26));
+  }
+
+  /**
+   * Checks whether the given project ID is valid according to GCP constraints.
+   *
+   * @param idToCheck the project ID to check.
+   * @throws IllegalArgumentException if the project ID is invalid.
+   */
+  public static void checkValidProjectId(String idToCheck) {
+    if (idToCheck.length() < MIN_PROJECT_ID_LENGTH) {
+      throw new IllegalArgumentException("Project ID " + idToCheck + " cannot be empty.");
+    }
+    if (idToCheck.length() > MAX_PROJECT_ID_LENGTH) {
+      throw new IllegalArgumentException(
+          "Project ID "
+              + idToCheck
+              + " cannot be longer than "
+              + MAX_PROJECT_ID_LENGTH
+              + " characters.");
+    }
+    if (ILLEGAL_PROJECT_CHARS.matcher(idToCheck).find()) {
+      throw new IllegalArgumentException(
+          "Project ID "
+              + idToCheck
+              + " is not a valid ID. Only letters, numbers, hyphens, single quotes, colon, dot and"
+              + " exclamation points are allowed.");
+    }
+  }
+
+  /**
+   * Cleanup Resources from the given ResourceManagers. It will guarantee that all the cleanups are
+   * invoked, but still throws / bubbles the first exception at the end if something went wrong.
+   *
+   * @param managers Varargs of the managers to clean
+   */
+  public static void cleanResources(ResourceManager... managers) {
+
+    if (managers == null || managers.length == 0) {
+      return;
+    }
+
+    Exception bubbleException = null;
+
+    for (ResourceManager manager : managers) {
+      if (manager == null) {
+        continue;
+      }
+      try {
+        LOG.info("Cleaning up resource manager {}", manager.getClass().getSimpleName());
+        manager.cleanupAll();
+      } catch (Exception e) {
+        LOG.error("Error cleaning the resource manager {}", manager.getClass().getSimpleName());
+        if (bubbleException == null) {
+          bubbleException = e;
+        }
+      }
+    }
+
+    if (bubbleException != null) {
+      throw new RuntimeException("Error cleaning up resources", bubbleException);
+    }
+  }
+
+  public static String generatePassword(

Review Comment:
   This should have some commentary on how it should be used. This is not cryptographically secure, but is probably fine for temporary test envs.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] pranavbhandari24 commented on a diff in pull request #26444: Integration/Load test framework

Posted by "pranavbhandari24 (via GitHub)" <gi...@apache.org>.
pranavbhandari24 commented on code in PR #26444:
URL: https://github.com/apache/beam/pull/26444#discussion_r1258918858


##########
it/cassandra/src/main/java/org/apache/beam/it/cassandra/DefaultCassandraResourceManager.java:
##########
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.it.cassandra;
+
+import com.datastax.oss.driver.api.core.CqlSession;
+import com.datastax.oss.driver.api.core.DriverTimeoutException;
+import com.datastax.oss.driver.api.core.cql.ResultSet;
+import com.datastax.oss.driver.api.core.cql.Row;
+import com.datastax.oss.driver.api.core.cql.SimpleStatement;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.List;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.beam.it.common.testcontainers.TestContainerResourceManager;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.CassandraContainer;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.utility.DockerImageName;
+
+/**
+ * Default class for implementation of {@link CassandraResourceManager} interface.
+ *
+ * <p>The class supports one database and multiple collections per database object. A database is
+ * created when the first collection is created if one has not been created already.
+ *
+ * <p>The database name is formed using testId. The database name will be "{testId}-{ISO8601 time,
+ * microsecond precision}", with additional formatting.
+ *
+ * <p>The class is thread-safe.
+ */
+public class DefaultCassandraResourceManager
+    extends TestContainerResourceManager<GenericContainer<?>> implements CassandraResourceManager {
+
+  private static final Logger LOG = LoggerFactory.getLogger(DefaultCassandraResourceManager.class);
+
+  private static final String DEFAULT_CASSANDRA_CONTAINER_NAME = "cassandra";
+
+  // A list of available Cassandra Docker image tags can be found at
+  // https://hub.docker.com/_/cassandra/tags
+  private static final String DEFAULT_CASSANDRA_CONTAINER_TAG = "4.1.0";
+
+  // 27017 is the default port that Cassandra is configured to listen on
+  private static final int CASSANDRA_INTERNAL_PORT = 9042;
+
+  private final CqlSession cassandraClient;
+  private final String keyspaceName;
+  private final boolean usingStaticDatabase;
+
+  private DefaultCassandraResourceManager(Builder builder) {
+    this(
+        /* cassandraClient= */ null,
+        new CassandraContainer<>(
+            DockerImageName.parse(builder.containerImageName).withTag(builder.containerImageTag)),
+        builder);
+  }
+
+  @SuppressWarnings({"rawtypes", "method.invocation"})
+  @VisibleForTesting
+  DefaultCassandraResourceManager(
+      @Nullable CqlSession cassandraClient, CassandraContainer container, Builder builder) {
+    super(container, builder);
+
+    this.usingStaticDatabase = builder.keyspaceName != null;
+    this.keyspaceName =
+        usingStaticDatabase
+            ? builder.keyspaceName
+            : CassandraResourceManagerUtils.generateKeyspaceName(builder.testId);
+    this.cassandraClient =
+        cassandraClient == null
+            ? CqlSession.builder()
+                .addContactPoint(
+                    new InetSocketAddress(this.getHost(), this.getPort(CASSANDRA_INTERNAL_PORT)))
+                .withLocalDatacenter("datacenter1")
+                .build()
+            : cassandraClient;
+
+    if (!usingStaticDatabase) {
+      this.cassandraClient.execute(
+          String.format(
+              "CREATE KEYSPACE IF NOT EXISTS %s WITH replication = {'class':'SimpleStrategy', 'replication_factor':1}",
+              this.keyspaceName));
+    }
+  }
+
+  public static Builder builder(String testId) throws IOException {
+    return new Builder(testId);
+  }
+
+  @Override
+  public int getPort() {
+    return super.getPort(CASSANDRA_INTERNAL_PORT);
+  }
+
+  @Override
+  public synchronized String getKeyspaceName() {
+    return keyspaceName;
+  }
+
+  @Override
+  public synchronized ResultSet executeStatement(String statement) {
+    LOG.info("Executing statement: {}", statement);
+
+    try {
+      return cassandraClient.execute(
+          SimpleStatement.newInstance(statement).setKeyspace(this.keyspaceName));
+    } catch (Exception e) {
+      throw new CassandraResourceManagerException("Error reading collection.", e);
+    }
+  }
+
+  /**
+   * Inserts the given Document into a table.
+   *
+   * <p>A database will be created here, if one does not already exist.
+   *
+   * @param tableName The name of the table to insert the document into.
+   * @param document The document to insert into the table.
+   * @return A boolean indicating whether the Document was inserted successfully.
+   */
+  public synchronized boolean insertDocument(String tableName, Map<String, Object> document) {
+    return insertDocuments(tableName, ImmutableList.of(document));
+  }
+
+  @Override
+  public synchronized boolean insertDocuments(
+      String tableName, List<Map<String, Object>> documents) {
+    LOG.info(
+        "Attempting to write {} documents to {}.{}.", documents.size(), keyspaceName, tableName);
+
+    try {
+      for (Map<String, Object> document : documents) {
+        executeStatement(createInsertStatement(tableName, document));
+      }
+    } catch (Exception e) {
+      throw new CassandraResourceManagerException("Error inserting documents.", e);
+    }
+
+    LOG.info("Successfully wrote {} documents to {}.{}", documents.size(), keyspaceName, tableName);
+
+    return true;
+  }
+
+  @Override
+  public synchronized Iterable<Row> readTable(String tableName) {
+    LOG.info("Reading all documents from {}.{}", keyspaceName, tableName);
+
+    Iterable<Row> documents;
+    try {
+      ResultSet resultSet = executeStatement(String.format("SELECT * FROM %s", tableName));
+      documents = resultSet.all();
+    } catch (Exception e) {
+      throw new CassandraResourceManagerException("Error reading table.", e);
+    }
+
+    LOG.info("Successfully loaded documents from {}.{}", keyspaceName, tableName);
+
+    return documents;
+  }
+
+  @Override
+  public synchronized void cleanupAll() {
+    LOG.info("Attempting to cleanup Cassandra manager.");
+
+    boolean producedError = false;
+
+    // First, delete the database if it was not given as a static argument
+    if (!usingStaticDatabase) {
+      try {
+        executeStatement(String.format("DROP KEYSPACE IF EXISTS %s", this.keyspaceName));
+      } catch (Exception e) {
+        LOG.error("Failed to drop Cassandra keyspace {}.", keyspaceName, e);
+
+        // Only bubble exception if the cause is not timeout, as it will be dropped with container.
+        if (e.getCause() == null || !(e.getCause() instanceof DriverTimeoutException)) {
+          producedError = true;
+        }
+      }
+    }
+
+    // Next, try to close the Cassandra client connection
+    try {
+      cassandraClient.close();
+    } catch (Exception e) {
+      LOG.error("Failed to delete Cassandra client.", e);
+      producedError = true;
+    }
+
+    // Throw Exception at the end if there were any errors
+    if (producedError) {
+      throw new CassandraResourceManagerException(
+          "Failed to delete resources. Check above for errors.");
+    }
+
+    super.cleanupAll();
+
+    LOG.info("Cassandra manager successfully cleaned up.");
+  }
+
+  private String createInsertStatement(String tableName, Map<String, Object> map) {
+    StringBuilder columns = new StringBuilder();
+    StringBuilder values = new StringBuilder();
+
+    for (Map.Entry<String, Object> entry : map.entrySet()) {
+      columns.append(entry.getKey()).append(", ");
+
+      // add quotes around strings
+      if (entry.getValue() instanceof String) {
+        values.append("'").append(entry.getValue()).append("'");
+      } else {
+        values.append(entry.getValue());
+      }
+      values.append(", ");
+    }
+
+    // Remove trailing comma and space
+    columns.delete(columns.length() - 2, columns.length());
+    values.delete(values.length() - 2, values.length());
+
+    return String.format("INSERT INTO %s (%s) VALUES (%s)", tableName, columns, values);
+  }
+
+  /** Builder for {@link DefaultCassandraResourceManager}. */
+  public static final class Builder
+      extends TestContainerResourceManager.Builder<DefaultCassandraResourceManager> {
+
+    @SuppressWarnings("initialization.fields.uninitialized")

Review Comment:
   Will address after code drop. Opened https://github.com/apache/beam/issues/27438.



##########
it/cassandra/src/main/java/org/apache/beam/it/cassandra/DefaultCassandraResourceManager.java:
##########
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.it.cassandra;
+
+import com.datastax.oss.driver.api.core.CqlSession;
+import com.datastax.oss.driver.api.core.DriverTimeoutException;
+import com.datastax.oss.driver.api.core.cql.ResultSet;
+import com.datastax.oss.driver.api.core.cql.Row;
+import com.datastax.oss.driver.api.core.cql.SimpleStatement;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.List;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.beam.it.common.testcontainers.TestContainerResourceManager;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.CassandraContainer;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.utility.DockerImageName;
+
+/**
+ * Default class for implementation of {@link CassandraResourceManager} interface.
+ *
+ * <p>The class supports one database and multiple collections per database object. A database is
+ * created when the first collection is created if one has not been created already.
+ *
+ * <p>The database name is formed using testId. The database name will be "{testId}-{ISO8601 time,
+ * microsecond precision}", with additional formatting.
+ *
+ * <p>The class is thread-safe.
+ */
+public class DefaultCassandraResourceManager
+    extends TestContainerResourceManager<GenericContainer<?>> implements CassandraResourceManager {
+
+  private static final Logger LOG = LoggerFactory.getLogger(DefaultCassandraResourceManager.class);
+
+  private static final String DEFAULT_CASSANDRA_CONTAINER_NAME = "cassandra";
+
+  // A list of available Cassandra Docker image tags can be found at
+  // https://hub.docker.com/_/cassandra/tags
+  private static final String DEFAULT_CASSANDRA_CONTAINER_TAG = "4.1.0";
+
+  // 27017 is the default port that Cassandra is configured to listen on
+  private static final int CASSANDRA_INTERNAL_PORT = 9042;
+
+  private final CqlSession cassandraClient;
+  private final String keyspaceName;
+  private final boolean usingStaticDatabase;
+
+  private DefaultCassandraResourceManager(Builder builder) {
+    this(
+        /* cassandraClient= */ null,
+        new CassandraContainer<>(
+            DockerImageName.parse(builder.containerImageName).withTag(builder.containerImageTag)),
+        builder);
+  }
+
+  @SuppressWarnings({"rawtypes", "method.invocation"})

Review Comment:
   Will address after code drop. Opened https://github.com/apache/beam/issues/27438.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] codecov[bot] commented on pull request #26444: Integration/Load test framework

Posted by "codecov[bot] (via GitHub)" <gi...@apache.org>.
codecov[bot] commented on PR #26444:
URL: https://github.com/apache/beam/pull/26444#issuecomment-1524039736

   ## [Codecov](https://codecov.io/gh/apache/beam/pull/26444?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#26444](https://codecov.io/gh/apache/beam/pull/26444?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (f2e6f0e) into [master](https://codecov.io/gh/apache/beam/commit/f770c9b839f47422f0582083c48a53948bf06a5e?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (f770c9b) will **decrease** coverage by `0.02%`.
   > The diff coverage is `n/a`.
   
   ```diff
   @@            Coverage Diff             @@
   ##           master   #26444      +/-   ##
   ==========================================
   - Coverage   81.12%   81.11%   -0.02%     
   ==========================================
     Files         469      469              
     Lines       67294    67294              
   ==========================================
   - Hits        54590    54583       -7     
   - Misses      12704    12711       +7     
   ```
   
   | Flag | Coverage Δ | |
   |---|---|---|
   | python | `81.11% <ø> (-0.02%)` | :arrow_down: |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   [see 11 files with indirect coverage changes](https://codecov.io/gh/apache/beam/pull/26444/indirect-changes?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   
   :mega: We’re building smart automated test selection to slash your CI/CD build times. [Learn more](https://about.codecov.io/iterative-testing/?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] kennknowles commented on a diff in pull request #26444: Integration/Load test framework

Posted by "kennknowles (via GitHub)" <gi...@apache.org>.
kennknowles commented on code in PR #26444:
URL: https://github.com/apache/beam/pull/26444#discussion_r1178405890


##########
it/cassandra/src/main/java/org/apache/beam/it/cassandra/CassandraResourceManagerUtils.java:
##########
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.it.cassandra;
+
+import static org.apache.beam.it.common.utils.ResourceManagerUtils.generateResourceId;
+
+import com.google.re2j.Pattern;

Review Comment:
   Why this over the stuff that ships with Java? It seems basically the same



##########
it/common/src/main/java/org/apache/beam/it/common/ResourceManager.java:
##########
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.it.common;
+
+/** Common interface across resource managers. */
+public interface ResourceManager {

Review Comment:
   This seems close to `Closeable` or `Autocloseable`. Would it make sense to use it in that style?



##########
it/common/src/main/java/org/apache/beam/it/common/AbstractPipelineLauncher.java:
##########
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.it.common;

Review Comment:
   "common" is not a very great module - it ends up just having everything in it. And the code in this file is for Dataflow, not common.



##########
it/common/src/main/java/org/apache/beam/it/common/AbstractPipelineLauncher.java:
##########
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.it.common;
+
+import static org.apache.beam.it.common.logging.LogStrings.formatForLogging;
+import static org.apache.beam.it.common.utils.RetryUtil.clientRetryPolicy;
+
+import com.google.api.client.util.ArrayMap;
+import com.google.api.services.dataflow.Dataflow;
+import com.google.api.services.dataflow.model.Environment;
+import com.google.api.services.dataflow.model.Job;
+import com.google.api.services.dataflow.model.MetricUpdate;
+import dev.failsafe.Failsafe;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;

Review Comment:
   Other files use a Google `Pattern` class, but here the one that comes with Java is used.



##########
it/common/src/main/java/org/apache/beam/it/common/PipelineLauncher.java:
##########
@@ -0,0 +1,422 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.it.common;
+
+import static org.apache.beam.it.common.utils.PipelineUtils.createJobName;
+
+import com.google.api.services.dataflow.model.Job;
+import com.google.auto.value.AutoValue;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet;
+
+/** Client for working with Cloud Dataflow. */
+public interface PipelineLauncher {
+  /** Enum representing Apache Beam SDKs. */
+  enum Sdk {
+    JAVA("JAVA"),
+    PYTHON("PYTHON"),
+    GO("GO");
+
+    private final String text;
+
+    Sdk(String text) {
+      this.text = text;
+    }
+
+    @Override
+    public String toString() {
+      return text;
+    }
+  }
+
+  /** Enum representing known Dataflow job states. */
+  enum JobState {

Review Comment:
   Aren't these already in the SDK?



##########
it/common/src/main/java/org/apache/beam/it/common/conditions/ConditionCheck.java:
##########
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.it.common.conditions;
+
+import java.util.function.Supplier;
+import javax.annotation.Nullable;

Review Comment:
   use checker annotations



##########
it/common/src/main/java/org/apache/beam/it/common/package-info.java:
##########
@@ -0,0 +1,18 @@
+/*
+ * Copyright (C) 2022 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+/** Package for managing common ResourceManager resources within integration tests. */
+package org.apache.beam.it.common;

Review Comment:
   just drop the `common`



##########
it/common/src/main/java/org/apache/beam/it/common/matchers/package-info.java:
##########
@@ -0,0 +1,18 @@
+/*
+ * Copyright (C) 2022 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+/** Package for Truth matchers / subjects to have reusable assertions. */
+package org.apache.beam.it.common.matchers;

Review Comment:
   Since it is specifically for truth matchers, perhaps `org.apache.beam.it.truthmatchers` or something. It is fine as-is but imagine we had built the framework for Hamcrest and then Truth came out and we wanted to also support it. Separating modules according to the dependencies they pull in is often a good idea.



##########
it/mongodb/src/test/java/org/apache/beam/it/mongodb/DefaultMongoDBResourceManagerTest.java:
##########
@@ -0,0 +1,310 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.it.mongodb;
+
+import static com.google.common.truth.Truth.assertThat;
+import static org.junit.Assert.assertThrows;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import com.mongodb.MongoBulkWriteException;
+import com.mongodb.client.MongoClient;
+import com.mongodb.client.MongoCollection;
+import com.mongodb.client.MongoCursor;
+import com.mongodb.client.MongoDatabase;
+import com.mongodb.client.MongoIterable;
+import java.io.IOException;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.bson.Document;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.Answers;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnit;
+import org.mockito.junit.MockitoRule;
+import org.testcontainers.containers.MongoDBContainer;
+
+/** Unit tests for {@link DefaultMongoDBResourceManager}. */
+@RunWith(JUnit4.class)
+public class DefaultMongoDBResourceManagerTest {
+
+  @Rule public final MockitoRule mockito = MockitoJUnit.rule();
+
+  @Mock(answer = Answers.RETURNS_DEEP_STUBS)
+  private MongoIterable<String> collectionIterable;
+
+  @Mock private MongoClient mongoClient;
+  @Mock private MongoDatabase database;
+  @Mock private MongoCollection<Document> collection;
+  @Mock private MongoCursor<String> collectionNames;
+  @Mock private MongoDBContainer container;

Review Comment:
   Overall, I'm concerned that the amount of mocking means that this file may create vacuous tests of our assumptions of how things behave, but do not test how they actually might behave. Is there any other way to have a lightweight fake?



##########
it/common/src/main/java/org/apache/beam/it/common/PipelineLauncher.java:
##########
@@ -0,0 +1,422 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.it.common;
+
+import static org.apache.beam.it.common.utils.PipelineUtils.createJobName;
+
+import com.google.api.services.dataflow.model.Job;
+import com.google.auto.value.AutoValue;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet;
+
+/** Client for working with Cloud Dataflow. */

Review Comment:
   Can this framework support running pipelines more generally, or just Dataflow?



##########
it/common/src/main/java/org/apache/beam/it/common/PipelineLauncher.java:
##########
@@ -0,0 +1,422 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.it.common;
+
+import static org.apache.beam.it.common.utils.PipelineUtils.createJobName;
+
+import com.google.api.services.dataflow.model.Job;
+import com.google.auto.value.AutoValue;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet;
+
+/** Client for working with Cloud Dataflow. */
+public interface PipelineLauncher {
+  /** Enum representing Apache Beam SDKs. */
+  enum Sdk {
+    JAVA("JAVA"),
+    PYTHON("PYTHON"),
+    GO("GO");
+
+    private final String text;
+
+    Sdk(String text) {
+      this.text = text;
+    }
+
+    @Override
+    public String toString() {
+      return text;
+    }
+  }
+
+  /** Enum representing known Dataflow job states. */
+  enum JobState {
+    UNKNOWN("JOB_STATE_UNKNOWN"),
+    STOPPED("JOB_STATE_STOPPED"),
+    RUNNING("JOB_STATE_RUNNING"),
+    DONE("JOB_STATE_DONE"),
+    FAILED("JOB_STATE_FAILED"),
+    CANCELLED("JOB_STATE_CANCELLED"),
+    UPDATED("JOB_STATE_UPDATED"),
+    DRAINING("JOB_STATE_DRAINING"),
+    DRAINED("JOB_STATE_DRAINED"),
+    PENDING("JOB_STATE_PENDING"),
+    CANCELLING("JOB_STATE_CANCELLING"),
+    QUEUED("JOB_STATE_QUEUED"),
+    RESOURCE_CLEANING_UP("JOB_STATE_RESOURCE_CLEANING_UP");
+
+    private static final String DATAFLOW_PREFIX = "JOB_STATE_";
+
+    /** States that indicate the job is getting ready to run. */
+    public static final ImmutableSet<JobState> PENDING_STATES = ImmutableSet.of(PENDING, QUEUED);

Review Comment:
   This stuff all seems like it would already be in the DataflowRunner code base, and that is the best place to add it.



##########
it/cassandra/src/main/java/org/apache/beam/it/cassandra/DefaultCassandraResourceManager.java:
##########
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.it.cassandra;
+
+import com.datastax.oss.driver.api.core.CqlSession;
+import com.datastax.oss.driver.api.core.DriverTimeoutException;
+import com.datastax.oss.driver.api.core.cql.ResultSet;
+import com.datastax.oss.driver.api.core.cql.Row;
+import com.datastax.oss.driver.api.core.cql.SimpleStatement;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.List;
+import java.util.Map;
+import javax.annotation.Nullable;

Review Comment:
   Prefer checkerframework annotations since they can go in more places (like on type uses)



##########
it/common/src/main/java/org/apache/beam/it/common/PipelineOperator.java:
##########
@@ -0,0 +1,322 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.it.common;
+
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;
+
+import com.google.auto.value.AutoValue;
+import java.io.IOException;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.function.Supplier;
+import org.apache.beam.it.common.PipelineLauncher.JobState;
+import org.apache.beam.sdk.function.ThrowingConsumer;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Strings;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Utilities for managing Dataflow jobs. */
+public final class PipelineOperator {
+
+  private static final Logger LOG = LoggerFactory.getLogger(PipelineOperator.class);
+
+  /** The result of running an operation. */
+  public enum Result {
+    CONDITION_MET,
+    LAUNCH_FINISHED,
+    LAUNCH_FAILED,
+    TIMEOUT
+  }
+
+  private final PipelineLauncher client;
+
+  public PipelineOperator(PipelineLauncher client) {
+    this.client = client;
+  }
+
+  /**
+   * Waits until the given job is done, timing out it if runs for too long.
+   *
+   * <p>If the job is a batch job, it should complete eventually. If it is a streaming job, this
+   * will time out unless the job is explicitly cancelled or drained.
+   *
+   * @param config the configuration for performing the operation
+   * @return the result, which will be {@link Result#LAUNCH_FINISHED}, {@link Result#LAUNCH_FAILED}
+   *     or {@link Result#TIMEOUT}
+   */
+  @SuppressWarnings("rawtypes")

Review Comment:
   Prefer to suppress this on a single line that creates the `Supplier[]` and assigns it to a variable with a proper type like `Supplier<Boolean>[]`



##########
it/common/src/main/java/org/apache/beam/it/common/PipelineLauncher.java:
##########
@@ -0,0 +1,422 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.it.common;
+
+import static org.apache.beam.it.common.utils.PipelineUtils.createJobName;
+
+import com.google.api.services.dataflow.model.Job;
+import com.google.auto.value.AutoValue;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet;
+
+/** Client for working with Cloud Dataflow. */
+public interface PipelineLauncher {
+  /** Enum representing Apache Beam SDKs. */
+  enum Sdk {
+    JAVA("JAVA"),
+    PYTHON("PYTHON"),
+    GO("GO");
+
+    private final String text;
+
+    Sdk(String text) {
+      this.text = text;
+    }
+
+    @Override
+    public String toString() {
+      return text;
+    }
+  }
+
+  /** Enum representing known Dataflow job states. */
+  enum JobState {
+    UNKNOWN("JOB_STATE_UNKNOWN"),
+    STOPPED("JOB_STATE_STOPPED"),
+    RUNNING("JOB_STATE_RUNNING"),
+    DONE("JOB_STATE_DONE"),
+    FAILED("JOB_STATE_FAILED"),
+    CANCELLED("JOB_STATE_CANCELLED"),
+    UPDATED("JOB_STATE_UPDATED"),
+    DRAINING("JOB_STATE_DRAINING"),
+    DRAINED("JOB_STATE_DRAINED"),
+    PENDING("JOB_STATE_PENDING"),
+    CANCELLING("JOB_STATE_CANCELLING"),
+    QUEUED("JOB_STATE_QUEUED"),
+    RESOURCE_CLEANING_UP("JOB_STATE_RESOURCE_CLEANING_UP");
+
+    private static final String DATAFLOW_PREFIX = "JOB_STATE_";
+
+    /** States that indicate the job is getting ready to run. */
+    public static final ImmutableSet<JobState> PENDING_STATES = ImmutableSet.of(PENDING, QUEUED);
+
+    /** States that indicate the job is running. */
+    public static final ImmutableSet<JobState> ACTIVE_STATES = ImmutableSet.of(RUNNING, UPDATED);
+
+    /** States that indicate that the job is done. */
+    public static final ImmutableSet<JobState> DONE_STATES =
+        ImmutableSet.of(CANCELLED, DONE, DRAINED, STOPPED);
+
+    /** States that indicate that the job has failed. */
+    public static final ImmutableSet<JobState> FAILED_STATES = ImmutableSet.of(FAILED);
+
+    /** States that indicate that the job is in the process of finishing. */
+    public static final ImmutableSet<JobState> FINISHING_STATES =
+        ImmutableSet.of(DRAINING, CANCELLING);
+
+    private final String text;
+
+    JobState(String text) {
+      this.text = text;
+    }
+
+    /**
+     * Parses the state from Dataflow.
+     *
+     * <p>Always use this in place of valueOf.
+     */
+    public static JobState parse(String fromDataflow) {
+      return valueOf(fromDataflow.replace(DATAFLOW_PREFIX, ""));
+    }
+
+    @Override
+    public String toString() {
+      return text;
+    }
+  }
+
+  /** Config for starting a Dataflow job. */
+  class LaunchConfig {
+    private final String jobName;
+    private final ImmutableMap<String, String> parameters;
+    private final ImmutableMap<String, Object> environment;
+    @Nullable private final String specPath;
+    @Nullable private final Sdk sdk;
+    @Nullable private final String executable;
+    @Nullable private final Pipeline pipeline;
+
+    private LaunchConfig(Builder builder) {
+      this.jobName = builder.jobName;
+      this.parameters = ImmutableMap.copyOf(builder.parameters);
+      this.environment = ImmutableMap.copyOf(builder.environment);
+      this.specPath = builder.specPath;
+      this.sdk = builder.sdk;
+      this.executable = builder.executable;
+      this.pipeline = builder.pipeline;
+    }
+
+    public String jobName() {
+      return jobName;
+    }
+
+    public ImmutableMap<String, String> parameters() {
+      return parameters;
+    }
+
+    public ImmutableMap<String, Object> environment() {
+      return environment;
+    }
+
+    @Nullable
+    public String getParameter(String key) {
+      return parameters.get(key);
+    }
+
+    public @Nullable String specPath() {
+      return specPath;
+    }
+
+    public @Nullable Sdk sdk() {
+      return sdk;
+    }
+
+    public @Nullable String executable() {
+      return executable;
+    }
+
+    public @Nullable Pipeline pipeline() {
+      return pipeline;
+    }
+
+    public static Builder builderWithName(String jobName, String specPath) {
+      return new Builder(jobName, specPath);
+    }
+
+    public static Builder builder(String testName, @Nullable String specPath) {
+      return new Builder(createJobName(testName), specPath);
+    }
+
+    public static Builder builder(String jobName) {
+      return builder(jobName, null);
+    }
+
+    /** Builder for the {@link LaunchConfig}. */
+    public static final class Builder {
+      private final String jobName;
+      private final @Nullable String specPath;
+      private final Map<String, Object> environment;
+      private Map<String, String> parameters;
+      private @Nullable Sdk sdk;
+      private @Nullable String executable;
+      private @Nullable Pipeline pipeline;
+
+      private Builder(String jobName, @Nullable String specPath) {
+        this.jobName = jobName;
+        this.parameters = new HashMap<>();
+        this.environment = new HashMap<>();
+        this.specPath = specPath;
+      }
+
+      public String getJobName() {
+        return jobName;
+      }
+
+      @Nullable

Review Comment:
   Use checkerframework `@Nullable` annotation and annotate the return type, not the overall method.
   
   Do this throughout the file, and all the new code.



##########
it/common/src/main/java/org/apache/beam/it/common/matchers/LaunchInfoSubject.java:
##########
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.it.common.matchers;
+
+import com.google.common.truth.FailureMetadata;
+import com.google.common.truth.Subject;
+import org.apache.beam.it.common.PipelineLauncher.JobState;
+import org.apache.beam.it.common.PipelineLauncher.LaunchInfo;
+
+/**
+ * Subject that has assertion operations for {@link LaunchInfo}, which has the information for a
+ * recently launched pipeline.
+ */
+public final class LaunchInfoSubject extends Subject {
+
+  private final LaunchInfo actual;
+
+  private LaunchInfoSubject(FailureMetadata metadata, LaunchInfo actual) {
+    super(metadata, actual);

Review Comment:
   Using `super` constructor just to initialize fields is an anti-pattern. It seems to be built into Truth, which is a bummer. Is this the intended use of `Subject`? Is there a way to use composition instead of inheritance to more cleanly separate this class?



##########
it/common/src/main/java/org/apache/beam/it/common/conditions/package-info.java:
##########
@@ -0,0 +1,18 @@
+/*
+ * Copyright (C) 2022 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+/** Package that contains reusable conditions. */
+package org.apache.beam.it.common.conditions;

Review Comment:
   Don't make subpackages of `common`. Common is already "misc" so you can just put `conditions` at the top level.



##########
it/common/src/main/java/org/apache/beam/it/common/logging/LogStrings.java:
##########
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.it.common.logging;

Review Comment:
   Same. Just `org.apache.beam.it.logging`



##########
it/mongodb/src/test/java/org/apache/beam/it/mongodb/DefaultMongoDBResourceManagerTest.java:
##########
@@ -0,0 +1,310 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.it.mongodb;
+
+import static com.google.common.truth.Truth.assertThat;
+import static org.junit.Assert.assertThrows;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import com.mongodb.MongoBulkWriteException;
+import com.mongodb.client.MongoClient;
+import com.mongodb.client.MongoCollection;
+import com.mongodb.client.MongoCursor;
+import com.mongodb.client.MongoDatabase;
+import com.mongodb.client.MongoIterable;
+import java.io.IOException;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.bson.Document;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.Answers;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnit;
+import org.mockito.junit.MockitoRule;
+import org.testcontainers.containers.MongoDBContainer;
+
+/** Unit tests for {@link DefaultMongoDBResourceManager}. */
+@RunWith(JUnit4.class)
+public class DefaultMongoDBResourceManagerTest {
+
+  @Rule public final MockitoRule mockito = MockitoJUnit.rule();
+
+  @Mock(answer = Answers.RETURNS_DEEP_STUBS)
+  private MongoIterable<String> collectionIterable;
+
+  @Mock private MongoClient mongoClient;
+  @Mock private MongoDatabase database;
+  @Mock private MongoCollection<Document> collection;
+  @Mock private MongoCursor<String> collectionNames;
+  @Mock private MongoDBContainer container;
+
+  private static final String TEST_ID = "test-id";
+  private static final String COLLECTION_NAME = "collection-name";
+  private static final String STATIC_DATABASE_NAME = "database";
+  private static final String HOST = "localhost";
+  private static final int MONGO_DB_PORT = 27017;
+  private static final int MAPPED_PORT = 10000;
+
+  private DefaultMongoDBResourceManager testManager;
+
+  @Before
+  public void setUp() throws IOException, InterruptedException {
+    when(container.getHost()).thenReturn(HOST);
+    when(container.getMappedPort(MONGO_DB_PORT)).thenReturn(MAPPED_PORT);
+
+    testManager =
+        new DefaultMongoDBResourceManager(
+            mongoClient, container, DefaultMongoDBResourceManager.builder(TEST_ID));
+  }
+
+  @Test
+  public void testCreateResourceManagerBuilderReturnsDefaultMongoDBResourceManager()
+      throws IOException {
+    assertThat(
+            DefaultMongoDBResourceManager.builder(TEST_ID)
+                .useStaticContainer()
+                .setHost(HOST)
+                .setPort(MONGO_DB_PORT)
+                .build())
+        .isInstanceOf(DefaultMongoDBResourceManager.class);
+  }
+
+  @Test
+  public void testGetUriShouldReturnCorrectValue() {
+    assertThat(testManager.getUri()).matches("mongodb://" + HOST + ":" + MAPPED_PORT);
+  }
+
+  @Test
+  public void testGetDatabaseNameShouldReturnCorrectValue() {
+    assertThat(testManager.getDatabaseName()).matches(TEST_ID + "-\\d{8}-\\d{6}-\\d{6}");

Review Comment:
   All the setup for the test is outside of the test case. You should inline it so that what is being tested is clear. Currently it basically just tests the setup.



##########
it/common/src/main/java/org/apache/beam/it/common/matchers/ListAccumulator.java:
##########
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.it.common.matchers;

Review Comment:
   just `org.apache.beam.it.matchers`



##########
it/mongodb/src/main/java/org/apache/beam/it/mongodb/DefaultMongoDBResourceManager.java:
##########
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.it.mongodb;
+
+import com.mongodb.client.FindIterable;
+import com.mongodb.client.MongoClient;
+import com.mongodb.client.MongoClients;
+import com.mongodb.client.MongoCollection;
+import com.mongodb.client.MongoDatabase;
+import java.io.IOException;
+import java.util.List;
+import javax.annotation.Nullable;
+import org.apache.beam.it.common.testcontainers.TestContainerResourceManager;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.bson.Document;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.MongoDBContainer;
+import org.testcontainers.utility.DockerImageName;
+
+/**
+ * Default class for implementation of {@link MongoDBResourceManager} interface.
+ *
+ * <p>The class supports one database and multiple collections per database object. A database is
+ * created when the first collection is created if one has not been created already.
+ *
+ * <p>The database name is formed using testId. The database name will be "{testId}-{ISO8601 time,
+ * microsecond precision}", with additional formatting.
+ *
+ * <p>The class is thread-safe.
+ */
+public class DefaultMongoDBResourceManager extends TestContainerResourceManager<MongoDBContainer>
+    implements MongoDBResourceManager {
+
+  private static final Logger LOG = LoggerFactory.getLogger(DefaultMongoDBResourceManager.class);
+
+  private static final String DEFAULT_MONGODB_CONTAINER_NAME = "mongo";
+
+  // A list of available MongoDB Docker image tags can be found at
+  // https://hub.docker.com/_/mongo/tags
+  private static final String DEFAULT_MONGODB_CONTAINER_TAG = "4.0.18";
+
+  // 27017 is the default port that MongoDB is configured to listen on
+  private static final int MONGODB_INTERNAL_PORT = 27017;
+
+  private final MongoClient mongoClient;
+  private final String databaseName;
+  private final String connectionString;
+  private final boolean usingStaticDatabase;
+
+  private DefaultMongoDBResourceManager(DefaultMongoDBResourceManager.Builder builder) {
+    this(
+        /* mongoClient= */ null,
+        new MongoDBContainer(
+            DockerImageName.parse(builder.containerImageName).withTag(builder.containerImageTag)),
+        builder);
+  }
+
+  @SuppressWarnings("method.invocation")
+  @VisibleForTesting
+  DefaultMongoDBResourceManager(
+      @Nullable MongoClient mongoClient,
+      MongoDBContainer container,
+      DefaultMongoDBResourceManager.Builder builder) {
+    super(container, builder);
+
+    this.usingStaticDatabase = builder.databaseName != null;
+    this.databaseName =
+        usingStaticDatabase
+            ? builder.databaseName
+            : MongoDBResourceManagerUtils.generateDatabaseName(builder.testId);
+    this.connectionString =
+        String.format("mongodb://%s:%d", this.getHost(), this.getPort(MONGODB_INTERNAL_PORT));
+    this.mongoClient = mongoClient == null ? MongoClients.create(connectionString) : mongoClient;
+  }
+
+  public static DefaultMongoDBResourceManager.Builder builder(String testId) throws IOException {
+    return new DefaultMongoDBResourceManager.Builder(testId);
+  }
+
+  @Override
+  public synchronized String getUri() {
+    return connectionString;
+  }
+
+  @Override
+  public synchronized String getDatabaseName() {
+    return databaseName;
+  }
+
+  private synchronized MongoDatabase getDatabase() {
+    try {
+      return mongoClient.getDatabase(databaseName);
+    } catch (Exception e) {
+      throw new MongoDBResourceManagerException(
+          "Error retrieving database " + databaseName + " from MongoDB.", e);
+    }
+  }
+
+  private synchronized boolean collectionExists(String collectionName) {
+    // Check collection name
+    MongoDBResourceManagerUtils.checkValidCollectionName(databaseName, collectionName);
+
+    Iterable<String> collectionNames = getDatabase().listCollectionNames();
+    for (String name : collectionNames) {
+      // The Collection already exists in the database, return false.
+      if (collectionName.equals(name)) {
+        return true;
+      }
+    }
+
+    return false;
+  }
+
+  @Override
+  public synchronized boolean createCollection(String collectionName) {
+    LOG.info("Creating collection using collectionName '{}'.", collectionName);
+
+    try {
+      // Check to see if the Collection exists
+      if (collectionExists(collectionName)) {
+        return false;
+      }
+      // The Collection does not exist in the database, create it and return true.
+      getDatabase().getCollection(collectionName);
+    } catch (Exception e) {
+      throw new MongoDBResourceManagerException("Error creating collection.", e);
+    }
+
+    LOG.info("Successfully created collection {}.{}", databaseName, collectionName);
+
+    return true;
+  }
+
+  /**
+   * Helper method to retrieve a MongoCollection with the given name from the database and return
+   * it.
+   *
+   * @param collectionName The name of the MongoCollection.
+   * @param createCollection A boolean that specifies to create the Collection if it does not exist.
+   * @return A MongoCollection with the given name.
+   */
+  private MongoCollection<Document> getMongoDBCollection(
+      String collectionName, boolean createCollection) {
+    if (!collectionExists(collectionName) && !createCollection) {
+      throw new MongoDBResourceManagerException(
+          "Collection " + collectionName + " does not exists in database " + databaseName);
+    }
+
+    return getDatabase().getCollection(collectionName);
+  }
+
+  @Override
+  public synchronized boolean insertDocument(String collectionName, Document document) {
+    return insertDocuments(collectionName, ImmutableList.of(document));
+  }
+
+  @Override
+  public synchronized boolean insertDocuments(String collectionName, List<Document> documents) {
+    LOG.info(
+        "Attempting to write {} documents to {}.{}.",
+        documents.size(),
+        databaseName,
+        collectionName);
+
+    try {
+      getMongoDBCollection(collectionName, /* createCollection= */ true).insertMany(documents);
+    } catch (Exception e) {
+      throw new MongoDBResourceManagerException("Error inserting documents.", e);
+    }
+
+    LOG.info(
+        "Successfully wrote {} documents to {}.{}", documents.size(), databaseName, collectionName);
+
+    return true;
+  }
+
+  @Override
+  public synchronized FindIterable<Document> readCollection(String collectionName) {
+    LOG.info("Reading all documents from {}.{}", databaseName, collectionName);
+
+    FindIterable<Document> documents;
+    try {
+      documents = getMongoDBCollection(collectionName, /* createCollection= */ false).find();
+    } catch (Exception e) {
+      throw new MongoDBResourceManagerException("Error reading collection.", e);
+    }
+
+    LOG.info("Successfully loaded documents from {}.{}", databaseName, collectionName);
+
+    return documents;
+  }
+
+  @Override
+  public synchronized void cleanupAll() {
+    LOG.info("Attempting to cleanup MongoDB manager.");
+
+    boolean producedError = false;
+
+    // First, delete the database if it was not given as a static argument
+    try {
+      if (!usingStaticDatabase) {
+        mongoClient.getDatabase(databaseName).drop();
+      }
+    } catch (Exception e) {
+      LOG.error("Failed to delete MongoDB database {}.", databaseName, e);
+      producedError = true;
+    }
+
+    // Next, try to close the MongoDB client connection
+    try {
+      mongoClient.close();
+    } catch (Exception e) {
+      LOG.error("Failed to delete MongoDB client.", e);
+      producedError = true;
+    }
+
+    // Throw Exception at the end if there were any errors
+    if (producedError) {
+      throw new MongoDBResourceManagerException(
+          "Failed to delete resources. Check above for errors.");
+    }
+
+    super.cleanupAll();
+
+    LOG.info("MongoDB manager successfully cleaned up.");
+  }
+
+  /** Builder for {@link DefaultMongoDBResourceManager}. */
+  public static final class Builder
+      extends TestContainerResourceManager.Builder<DefaultMongoDBResourceManager> {
+
+    @SuppressWarnings("initialization.fields.uninitialized")

Review Comment:
   Why? Based on the API here, you should make this nullable, and check for null when you use it.



##########
it/mongodb/build.gradle:
##########
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+plugins { id 'org.apache.beam.module' }
+applyJavaNature(
+        automaticModuleName: 'org.apache.beam.it.mongodb',
+)
+
+description = "Apache Beam :: IT :: MongoDB"
+ext.summary = "Integration test utilities for MongoDB."
+
+dependencies {
+    implementation project(':it:common')
+    implementation library.java.testcontainers_base
+    implementation library.java.testcontainers_mongodb
+    implementation library.java.vendored_guava_26_0_jre
+    implementation library.java.slf4j_api
+    //implementation 'org.mongodb:mongodb-driver-sync:3.12.10'
+    implementation library.java.mongo_java_driver
+    implementation 'com.google.re2j:re2j:1.6'

Review Comment:
   same comment here - why use the Google library? should at least have a good reason and comment



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] github-actions[bot] commented on pull request #26444: Integration/Load test framework

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on PR #26444:
URL: https://github.com/apache/beam/pull/26444#issuecomment-1524045779

   Assigning reviewers. If you would like to opt out of this review, comment `assign to next reviewer`:
   
   R: @kennknowles for label java.
   R: @damccorm for label build.
   
   Available commands:
   - `stop reviewer notifications` - opt out of the automated review tooling
   - `remind me after tests pass` - tag the comment author after tests pass
   - `waiting on author` - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)
   
   The PR bot will only process comments in the main thread (not review comments).


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] kennknowles commented on pull request #26444: Integration/Load test framework

Posted by "kennknowles (via GitHub)" <gi...@apache.org>.
kennknowles commented on PR #26444:
URL: https://github.com/apache/beam/pull/26444#issuecomment-1555134786

   OK let me know when I should make another pass.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] pranavbhandari24 commented on pull request #26444: Integration/Load test framework

Posted by "pranavbhandari24 (via GitHub)" <gi...@apache.org>.
pranavbhandari24 commented on PR #26444:
URL: https://github.com/apache/beam/pull/26444#issuecomment-1551596146

   I'm working on addressing your comments and making the required changes to https://github.com/GoogleCloudPlatform/DataflowTemplates/tree/main/it to maintain one source of truth.
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] pranavbhandari24 commented on a diff in pull request #26444: Integration/Load test framework

Posted by "pranavbhandari24 (via GitHub)" <gi...@apache.org>.
pranavbhandari24 commented on code in PR #26444:
URL: https://github.com/apache/beam/pull/26444#discussion_r1258921387


##########
it/mongodb/src/main/java/org/apache/beam/it/mongodb/DefaultMongoDBResourceManager.java:
##########
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.it.mongodb;
+
+import com.mongodb.client.FindIterable;
+import com.mongodb.client.MongoClient;
+import com.mongodb.client.MongoClients;
+import com.mongodb.client.MongoCollection;
+import com.mongodb.client.MongoDatabase;
+import java.io.IOException;
+import java.util.List;
+import javax.annotation.Nullable;
+import org.apache.beam.it.common.testcontainers.TestContainerResourceManager;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.bson.Document;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.MongoDBContainer;
+import org.testcontainers.utility.DockerImageName;
+
+/**
+ * Default class for implementation of {@link MongoDBResourceManager} interface.
+ *
+ * <p>The class supports one database and multiple collections per database object. A database is
+ * created when the first collection is created if one has not been created already.
+ *
+ * <p>The database name is formed using testId. The database name will be "{testId}-{ISO8601 time,
+ * microsecond precision}", with additional formatting.
+ *
+ * <p>The class is thread-safe.
+ */
+public class DefaultMongoDBResourceManager extends TestContainerResourceManager<MongoDBContainer>
+    implements MongoDBResourceManager {
+
+  private static final Logger LOG = LoggerFactory.getLogger(DefaultMongoDBResourceManager.class);
+
+  private static final String DEFAULT_MONGODB_CONTAINER_NAME = "mongo";
+
+  // A list of available MongoDB Docker image tags can be found at
+  // https://hub.docker.com/_/mongo/tags
+  private static final String DEFAULT_MONGODB_CONTAINER_TAG = "4.0.18";
+
+  // 27017 is the default port that MongoDB is configured to listen on
+  private static final int MONGODB_INTERNAL_PORT = 27017;
+
+  private final MongoClient mongoClient;
+  private final String databaseName;
+  private final String connectionString;
+  private final boolean usingStaticDatabase;
+
+  private DefaultMongoDBResourceManager(DefaultMongoDBResourceManager.Builder builder) {
+    this(
+        /* mongoClient= */ null,
+        new MongoDBContainer(
+            DockerImageName.parse(builder.containerImageName).withTag(builder.containerImageTag)),
+        builder);
+  }
+
+  @SuppressWarnings("method.invocation")
+  @VisibleForTesting
+  DefaultMongoDBResourceManager(
+      @Nullable MongoClient mongoClient,
+      MongoDBContainer container,
+      DefaultMongoDBResourceManager.Builder builder) {
+    super(container, builder);
+
+    this.usingStaticDatabase = builder.databaseName != null;
+    this.databaseName =
+        usingStaticDatabase
+            ? builder.databaseName
+            : MongoDBResourceManagerUtils.generateDatabaseName(builder.testId);
+    this.connectionString =
+        String.format("mongodb://%s:%d", this.getHost(), this.getPort(MONGODB_INTERNAL_PORT));
+    this.mongoClient = mongoClient == null ? MongoClients.create(connectionString) : mongoClient;
+  }
+
+  public static DefaultMongoDBResourceManager.Builder builder(String testId) throws IOException {
+    return new DefaultMongoDBResourceManager.Builder(testId);
+  }
+
+  @Override
+  public synchronized String getUri() {
+    return connectionString;
+  }
+
+  @Override
+  public synchronized String getDatabaseName() {
+    return databaseName;
+  }
+
+  private synchronized MongoDatabase getDatabase() {
+    try {
+      return mongoClient.getDatabase(databaseName);
+    } catch (Exception e) {
+      throw new MongoDBResourceManagerException(
+          "Error retrieving database " + databaseName + " from MongoDB.", e);
+    }
+  }
+
+  private synchronized boolean collectionExists(String collectionName) {
+    // Check collection name
+    MongoDBResourceManagerUtils.checkValidCollectionName(databaseName, collectionName);
+
+    Iterable<String> collectionNames = getDatabase().listCollectionNames();
+    for (String name : collectionNames) {
+      // The Collection already exists in the database, return false.
+      if (collectionName.equals(name)) {
+        return true;
+      }
+    }
+
+    return false;
+  }
+
+  @Override
+  public synchronized boolean createCollection(String collectionName) {
+    LOG.info("Creating collection using collectionName '{}'.", collectionName);
+
+    try {
+      // Check to see if the Collection exists
+      if (collectionExists(collectionName)) {
+        return false;
+      }
+      // The Collection does not exist in the database, create it and return true.
+      getDatabase().getCollection(collectionName);
+    } catch (Exception e) {
+      throw new MongoDBResourceManagerException("Error creating collection.", e);
+    }
+
+    LOG.info("Successfully created collection {}.{}", databaseName, collectionName);
+
+    return true;
+  }
+
+  /**
+   * Helper method to retrieve a MongoCollection with the given name from the database and return
+   * it.
+   *
+   * @param collectionName The name of the MongoCollection.
+   * @param createCollection A boolean that specifies to create the Collection if it does not exist.
+   * @return A MongoCollection with the given name.
+   */
+  private MongoCollection<Document> getMongoDBCollection(
+      String collectionName, boolean createCollection) {
+    if (!collectionExists(collectionName) && !createCollection) {
+      throw new MongoDBResourceManagerException(
+          "Collection " + collectionName + " does not exists in database " + databaseName);
+    }
+
+    return getDatabase().getCollection(collectionName);
+  }
+
+  @Override
+  public synchronized boolean insertDocument(String collectionName, Document document) {
+    return insertDocuments(collectionName, ImmutableList.of(document));
+  }
+
+  @Override
+  public synchronized boolean insertDocuments(String collectionName, List<Document> documents) {
+    LOG.info(
+        "Attempting to write {} documents to {}.{}.",
+        documents.size(),
+        databaseName,
+        collectionName);
+
+    try {
+      getMongoDBCollection(collectionName, /* createCollection= */ true).insertMany(documents);
+    } catch (Exception e) {
+      throw new MongoDBResourceManagerException("Error inserting documents.", e);
+    }
+
+    LOG.info(
+        "Successfully wrote {} documents to {}.{}", documents.size(), databaseName, collectionName);
+
+    return true;
+  }
+
+  @Override
+  public synchronized FindIterable<Document> readCollection(String collectionName) {
+    LOG.info("Reading all documents from {}.{}", databaseName, collectionName);
+
+    FindIterable<Document> documents;
+    try {
+      documents = getMongoDBCollection(collectionName, /* createCollection= */ false).find();
+    } catch (Exception e) {
+      throw new MongoDBResourceManagerException("Error reading collection.", e);
+    }
+
+    LOG.info("Successfully loaded documents from {}.{}", databaseName, collectionName);
+
+    return documents;
+  }
+
+  @Override
+  public synchronized void cleanupAll() {
+    LOG.info("Attempting to cleanup MongoDB manager.");
+
+    boolean producedError = false;
+
+    // First, delete the database if it was not given as a static argument
+    try {
+      if (!usingStaticDatabase) {
+        mongoClient.getDatabase(databaseName).drop();
+      }
+    } catch (Exception e) {
+      LOG.error("Failed to delete MongoDB database {}.", databaseName, e);
+      producedError = true;
+    }
+
+    // Next, try to close the MongoDB client connection
+    try {
+      mongoClient.close();
+    } catch (Exception e) {
+      LOG.error("Failed to delete MongoDB client.", e);
+      producedError = true;
+    }
+
+    // Throw Exception at the end if there were any errors
+    if (producedError) {
+      throw new MongoDBResourceManagerException(
+          "Failed to delete resources. Check above for errors.");
+    }
+
+    super.cleanupAll();
+
+    LOG.info("MongoDB manager successfully cleaned up.");
+  }
+
+  /** Builder for {@link DefaultMongoDBResourceManager}. */
+  public static final class Builder
+      extends TestContainerResourceManager.Builder<DefaultMongoDBResourceManager> {
+
+    @SuppressWarnings("initialization.fields.uninitialized")

Review Comment:
   Will address after code drop. Opened https://github.com/apache/beam/issues/27438



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] pranavbhandari24 commented on a diff in pull request #26444: Integration/Load test framework

Posted by "pranavbhandari24 (via GitHub)" <gi...@apache.org>.
pranavbhandari24 commented on code in PR #26444:
URL: https://github.com/apache/beam/pull/26444#discussion_r1258919946


##########
it/cassandra/src/main/java/org/apache/beam/it/cassandra/DefaultCassandraResourceManager.java:
##########
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.it.cassandra;
+
+import com.datastax.oss.driver.api.core.CqlSession;
+import com.datastax.oss.driver.api.core.DriverTimeoutException;
+import com.datastax.oss.driver.api.core.cql.ResultSet;
+import com.datastax.oss.driver.api.core.cql.Row;
+import com.datastax.oss.driver.api.core.cql.SimpleStatement;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.List;
+import java.util.Map;
+import javax.annotation.Nullable;

Review Comment:
   I have kept the java annotations for now since we don't need the checker annotations yet. We can decide on this after the code drop.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] pranavbhandari24 commented on a diff in pull request #26444: Integration/Load test framework

Posted by "pranavbhandari24 (via GitHub)" <gi...@apache.org>.
pranavbhandari24 commented on code in PR #26444:
URL: https://github.com/apache/beam/pull/26444#discussion_r1258921048


##########
it/common/src/main/java/org/apache/beam/it/common/package-info.java:
##########
@@ -0,0 +1,18 @@
+/*
+ * Copyright (C) 2022 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+/** Package for managing common ResourceManager resources within integration tests. */
+package org.apache.beam.it.common;

Review Comment:
   Addressed in above comment.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] pranavbhandari24 commented on a diff in pull request #26444: Integration/Load test framework

Posted by "pranavbhandari24 (via GitHub)" <gi...@apache.org>.
pranavbhandari24 commented on code in PR #26444:
URL: https://github.com/apache/beam/pull/26444#discussion_r1196707269


##########
it/common/src/main/java/org/apache/beam/it/common/PipelineLauncher.java:
##########
@@ -0,0 +1,422 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.it.common;
+
+import static org.apache.beam.it.common.utils.PipelineUtils.createJobName;
+
+import com.google.api.services.dataflow.model.Job;
+import com.google.auto.value.AutoValue;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet;
+
+/** Client for working with Cloud Dataflow. */
+public interface PipelineLauncher {
+  /** Enum representing Apache Beam SDKs. */
+  enum Sdk {
+    JAVA("JAVA"),
+    PYTHON("PYTHON"),
+    GO("GO");
+
+    private final String text;
+
+    Sdk(String text) {
+      this.text = text;
+    }
+
+    @Override
+    public String toString() {
+      return text;
+    }
+  }
+
+  /** Enum representing known Dataflow job states. */
+  enum JobState {

Review Comment:
   Not all of them. These represent the `JobState` values for dataflow. States like `DRAINING` are not in the SDK?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] pranavbhandari24 commented on a diff in pull request #26444: Integration/Load test framework

Posted by "pranavbhandari24 (via GitHub)" <gi...@apache.org>.
pranavbhandari24 commented on code in PR #26444:
URL: https://github.com/apache/beam/pull/26444#discussion_r1196734178


##########
it/common/src/main/java/org/apache/beam/it/common/PipelineLauncher.java:
##########
@@ -0,0 +1,422 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.it.common;
+
+import static org.apache.beam.it.common.utils.PipelineUtils.createJobName;
+
+import com.google.api.services.dataflow.model.Job;
+import com.google.auto.value.AutoValue;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet;
+
+/** Client for working with Cloud Dataflow. */

Review Comment:
   Just Dataflow. It also supports DirectRunner 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] github-actions[bot] commented on pull request #26444: Integration/Load test framework

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on PR #26444:
URL: https://github.com/apache/beam/pull/26444#issuecomment-1534667249

   Reminder, please take a look at this pr: @kennknowles @damccorm 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] pranavbhandari24 commented on a diff in pull request #26444: Integration/Load test framework

Posted by "pranavbhandari24 (via GitHub)" <gi...@apache.org>.
pranavbhandari24 commented on code in PR #26444:
URL: https://github.com/apache/beam/pull/26444#discussion_r1258921954


##########
it/mongodb/src/test/java/org/apache/beam/it/mongodb/DefaultMongoDBResourceManagerTest.java:
##########
@@ -0,0 +1,310 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.it.mongodb;
+
+import static com.google.common.truth.Truth.assertThat;
+import static org.junit.Assert.assertThrows;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import com.mongodb.MongoBulkWriteException;
+import com.mongodb.client.MongoClient;
+import com.mongodb.client.MongoCollection;
+import com.mongodb.client.MongoCursor;
+import com.mongodb.client.MongoDatabase;
+import com.mongodb.client.MongoIterable;
+import java.io.IOException;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.bson.Document;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.Answers;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnit;
+import org.mockito.junit.MockitoRule;
+import org.testcontainers.containers.MongoDBContainer;
+
+/** Unit tests for {@link DefaultMongoDBResourceManager}. */
+@RunWith(JUnit4.class)
+public class DefaultMongoDBResourceManagerTest {
+
+  @Rule public final MockitoRule mockito = MockitoJUnit.rule();
+
+  @Mock(answer = Answers.RETURNS_DEEP_STUBS)
+  private MongoIterable<String> collectionIterable;
+
+  @Mock private MongoClient mongoClient;
+  @Mock private MongoDatabase database;
+  @Mock private MongoCollection<Document> collection;
+  @Mock private MongoCursor<String> collectionNames;
+  @Mock private MongoDBContainer container;
+
+  private static final String TEST_ID = "test-id";
+  private static final String COLLECTION_NAME = "collection-name";
+  private static final String STATIC_DATABASE_NAME = "database";
+  private static final String HOST = "localhost";
+  private static final int MONGO_DB_PORT = 27017;
+  private static final int MAPPED_PORT = 10000;
+
+  private DefaultMongoDBResourceManager testManager;
+
+  @Before
+  public void setUp() throws IOException, InterruptedException {
+    when(container.getHost()).thenReturn(HOST);
+    when(container.getMappedPort(MONGO_DB_PORT)).thenReturn(MAPPED_PORT);
+
+    testManager =
+        new DefaultMongoDBResourceManager(
+            mongoClient, container, DefaultMongoDBResourceManager.builder(TEST_ID));
+  }
+
+  @Test
+  public void testCreateResourceManagerBuilderReturnsDefaultMongoDBResourceManager()
+      throws IOException {
+    assertThat(
+            DefaultMongoDBResourceManager.builder(TEST_ID)
+                .useStaticContainer()
+                .setHost(HOST)
+                .setPort(MONGO_DB_PORT)
+                .build())
+        .isInstanceOf(DefaultMongoDBResourceManager.class);
+  }
+
+  @Test
+  public void testGetUriShouldReturnCorrectValue() {
+    assertThat(testManager.getUri()).matches("mongodb://" + HOST + ":" + MAPPED_PORT);
+  }
+
+  @Test
+  public void testGetDatabaseNameShouldReturnCorrectValue() {
+    assertThat(testManager.getDatabaseName()).matches(TEST_ID + "-\\d{8}-\\d{6}-\\d{6}");

Review Comment:
   Refactored the test class to make it more clear. Thanks!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] kennknowles commented on pull request #26444: Integration/Load test framework

Posted by "kennknowles (via GitHub)" <gi...@apache.org>.
kennknowles commented on PR #26444:
URL: https://github.com/apache/beam/pull/26444#issuecomment-1632594629

   BTW the reason I am suggesting the split: this code clearly needs review & revision to match the expectations of the Beam repo. That will be much easier to do in pieces.
   
    - We can merge the overall framework with TODOs in place / P1s filed to fix the disabled checks
    - Then we could do thorough reviews one at a time of the test modules and get them fixed up before merge


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org