Posted to notifications@pekko.apache.org by "mdedetrich (via GitHub)" <gi...@apache.org> on 2023/05/02 22:31:31 UTC

[GitHub] [incubator-pekko-connectors] mdedetrich opened a new pull request, #96: Integrate AWSS3IntegrationSpec with S3 Account

mdedetrich opened a new pull request, #96:
URL: https://github.com/apache/incubator-pekko-connectors/pull/96

   TBD




[GitHub] [incubator-pekko-connectors] mdedetrich commented on a diff in pull request #96: Integrate AWSS3IntegrationSpec with S3 Account

Posted by "mdedetrich (via GitHub)" <gi...@apache.org>.
mdedetrich commented on code in PR #96:
URL: https://github.com/apache/incubator-pekko-connectors/pull/96#discussion_r1183329478


##########
s3/src/test/scala/org/apache/pekko/stream/connectors/s3/scaladsl/Generators.scala:
##########


Review Comment:
   These generators are copied directly from an open-source company project that I work on (see https://github.com/aiven/guardian-for-apache-kafka/blob/main/core-s3/src/test/scala/io/aiven/guardian/kafka/s3/Generators.scala), so I can confirm that they work correctly since they have been in use for over a year.
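   As an illustration only (the linked Generators.scala is not reproduced here), a ScalaCheck generator for S3-style bucket names might look roughly like the sketch below, assuming the usual constraints of 3-63 lowercase alphanumeric characters with interior hyphens and an optional tracking prefix; the name `bucketNameGen` and the prefix handling are assumptions, not the project's actual code.

       import org.scalacheck.Gen

       // Sketch of an S3 bucket-name generator: lowercase letters and digits,
       // hyphens allowed in the middle, first and last characters alphanumeric.
       def bucketNameGen(prefix: Option[String] = None): Gen[String] = {
         val startEnd = Gen.frequency(9 -> Gen.alphaLowerChar, 1 -> Gen.numChar)
         val middle   = Gen.frequency(8 -> Gen.alphaLowerChar, 1 -> Gen.numChar, 1 -> Gen.const('-'))
         for {
           // Keep the total length (including any prefix) within the 3-63 character limit.
           length <- Gen.choose(3, 63 - prefix.map(_.length).getOrElse(0))
           first  <- startEnd
           last   <- startEnd
           body   <- Gen.listOfN(math.max(0, length - 2), middle)
         } yield prefix.getOrElse("") + (first +: body :+ last).mkString
       }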





[GitHub] [incubator-pekko-connectors] mdedetrich merged pull request #96: Integrate AWSS3IntegrationSpec with S3 Account

Posted by "mdedetrich (via GitHub)" <gi...@apache.org>.
mdedetrich merged PR #96:
URL: https://github.com/apache/incubator-pekko-connectors/pull/96




[GitHub] [incubator-pekko-connectors] mdedetrich commented on pull request #96: Integrate AWSS3IntegrationSpec with S3 Account

Posted by "mdedetrich (via GitHub)" <gi...@apache.org>.
mdedetrich commented on PR #96:
URL: https://github.com/apache/incubator-pekko-connectors/pull/96#issuecomment-1537109810

   @pjfanning @nvollmar Would it be possible to look into this? I want to merge the outstanding PRs before I start looking into https://github.com/apache/incubator-pekko/pull/281, since merging that PR will essentially block all downstream modules.
   
   Also, Apache INFRA is waiting for a response on this.





[GitHub] [incubator-pekko-connectors] mdedetrich commented on a diff in pull request #96: Integrate AWSS3IntegrationSpec with S3 Account

Posted by "mdedetrich (via GitHub)" <gi...@apache.org>.
mdedetrich commented on code in PR #96:
URL: https://github.com/apache/incubator-pekko-connectors/pull/96#discussion_r1183347220


##########
s3/src/test/scala/org/apache/pekko/stream/connectors/s3/scaladsl/S3IntegrationSpec.scala:
##########
@@ -309,144 +460,161 @@ trait S3IntegrationSpec
       .futureValue shouldEqual pekko.Done
   }
 
-  it should "upload, download and delete with spaces in the key in non us-east-1 zone" in uploadDownloadAndDeleteInOtherRegionCase(
-    "test folder/test file.txt")
+  it should "upload, download and delete with spaces in the key in non us-east-1 zone" in withBucketWithDots {
+    bucketWithDots =>
+      uploadDownloadAndDeleteInOtherRegionCase(bucketWithDots,
+        "test folder/test file.txt")
+  }
 
   // we want ASCII and other UTF-8 characters!
-  it should "upload, download and delete with special characters in the key in non us-east-1 zone" in uploadDownloadAndDeleteInOtherRegionCase(
-    "føldęrü/1234()[]><!? .TXT")
+  it should "upload, download and delete with special characters in the key in non us-east-1 zone" in withBucketWithDots {
+    bucketWithDots =>
+      uploadDownloadAndDeleteInOtherRegionCase(bucketWithDots,
+        "føldęrü/1234()[]><!? .TXT")
+  }
 
-  it should "upload, download and delete with `+` character in the key in non us-east-1 zone" in uploadDownloadAndDeleteInOtherRegionCase(
-    "1 + 2 = 3")
+  it should "upload, download and delete with `+` character in the key in non us-east-1 zone" in withBucketWithDots {
+    bucketWithDots =>
+      uploadDownloadAndDeleteInOtherRegionCase(bucketWithDots,
+        "1 + 2 = 3")
+  }
 
-  it should "upload, copy, download the copy, and delete" in uploadCopyDownload(
-    "original/file.txt",
-    "copy/file.txt")
+  it should "upload, copy, download the copy, and delete" in withDefaultBucket { defaultBucket =>
+    uploadCopyDownload(defaultBucket,
+      "original/file.txt",
+      "copy/file.txt")
+  }
 
   // NOTE: MinIO currently has problems copying files with spaces.
-  it should "upload, copy, download the copy, and delete with special characters in key" in uploadCopyDownload(
-    "original/føldęrü/1234()[]><!?.TXT",
-    "copy/1 + 2 = 3")
-
-  it should "upload 2 files with common prefix, 1 with different prefix and delete by prefix" in {
-    val sourceKey1 = "original/file1.txt"
-    val sourceKey2 = "original/file2.txt"
-    val sourceKey3 = "uploaded/file3.txt"
-    val source: Source[ByteString, Any] = Source(ByteString(objectValue) :: Nil)
-
-    val results = for {
-      upload1 <- source.runWith(S3.multipartUpload(defaultBucket, sourceKey1).withAttributes(attributes))
-      upload2 <- source.runWith(S3.multipartUpload(defaultBucket, sourceKey2).withAttributes(attributes))
-      upload3 <- source.runWith(S3.multipartUpload(defaultBucket, sourceKey3).withAttributes(attributes))
-    } yield (upload1, upload2, upload3)
-
-    whenReady(results) {
-      case (upload1, upload2, upload3) =>
-        upload1.bucket shouldEqual defaultBucket
-        upload1.key shouldEqual sourceKey1
-        upload2.bucket shouldEqual defaultBucket
-        upload2.key shouldEqual sourceKey2
-        upload3.bucket shouldEqual defaultBucket
-        upload3.key shouldEqual sourceKey3
+  it should "upload, copy, download the copy, and delete with special characters in key" in withDefaultBucket {
+    defaultBucket =>
+      uploadCopyDownload(defaultBucket,
+        "original/føldęrü/1234()[]><!?.TXT",
+        "copy/1 + 2 = 3")
+  }
 
-        S3.deleteObjectsByPrefix(defaultBucket, Some("original"))
-          .withAttributes(attributes)
-          .runWith(Sink.ignore)
-          .futureValue shouldEqual pekko.Done
-        val numOfKeysForPrefix =
-          S3.listBucket(defaultBucket, Some("original"))
+  it should "upload 2 files with common prefix, 1 with different prefix and delete by prefix" in withDefaultBucket {
+    defaultBucket =>
+      val sourceKey1 = "original/file1.txt"
+      val sourceKey2 = "original/file2.txt"
+      val sourceKey3 = "uploaded/file3.txt"
+      val source: Source[ByteString, Any] = Source(ByteString(objectValue) :: Nil)
+
+      val results = for {
+        upload1 <- source.runWith(S3.multipartUpload(defaultBucket, sourceKey1).withAttributes(attributes))
+        upload2 <- source.runWith(S3.multipartUpload(defaultBucket, sourceKey2).withAttributes(attributes))
+        upload3 <- source.runWith(S3.multipartUpload(defaultBucket, sourceKey3).withAttributes(attributes))
+      } yield (upload1, upload2, upload3)
+
+      whenReady(results) {
+        case (upload1, upload2, upload3) =>
+          upload1.bucket shouldEqual defaultBucket
+          upload1.key shouldEqual sourceKey1
+          upload2.bucket shouldEqual defaultBucket
+          upload2.key shouldEqual sourceKey2
+          upload3.bucket shouldEqual defaultBucket
+          upload3.key shouldEqual sourceKey3
+
+          S3.deleteObjectsByPrefix(defaultBucket, Some("original"))
             .withAttributes(attributes)
-            .runFold(0)((result, _) => result + 1)
-            .futureValue
-        numOfKeysForPrefix shouldEqual 0
-        S3.deleteObject(defaultBucket, sourceKey3)
-          .withAttributes(attributes)
-          .runWith(Sink.head)
-          .futureValue shouldEqual pekko.Done
-    }
+            .runWith(Sink.ignore)
+            .futureValue shouldEqual pekko.Done
+          val numOfKeysForPrefix =
+            S3.listBucket(defaultBucket, Some("original"))
+              .withAttributes(attributes)
+              .runFold(0)((result, _) => result + 1)
+              .futureValue
+          numOfKeysForPrefix shouldEqual 0
+          S3.deleteObject(defaultBucket, sourceKey3)
+            .withAttributes(attributes)
+            .runWith(Sink.head)
+            .futureValue shouldEqual pekko.Done
+      }
   }
 
-  it should "create multiple versions of an object and successfully clean it with deleteBucketContents" in {
-    // TODO: Figure out a way to properly test this with Minio, see https://github.com/akka/alpakka/issues/2750
-    assume(this.isInstanceOf[AWSS3IntegrationSpec])
-    val versionKey = "test-version"
-    val one = ByteString("one")
-    val two = ByteString("two")
-    val three = ByteString("three")
-
-    val results =
-      for {
-        // Clean the bucket just incase there is residual data in there
-        _ <- S3
-          .deleteBucketContents(bucketWithVersioning, deleteAllVersions = true)
-          .withAttributes(attributes)
-          .runWith(Sink.ignore)
-        _ <- S3
-          .putObject(bucketWithVersioning, versionKey, Source.single(one), one.length, s3Headers = S3Headers())
-          .withAttributes(attributes)
-          .runWith(Sink.ignore)
-        _ <- S3
-          .putObject(bucketWithVersioning, versionKey, Source.single(two), two.length, s3Headers = S3Headers())
-          .withAttributes(attributes)
-          .runWith(Sink.ignore)
-        _ <- S3
-          .putObject(bucketWithVersioning, versionKey, Source.single(three), three.length, s3Headers = S3Headers())
-          .withAttributes(attributes)
-          .runWith(Sink.ignore)
-        versionsBeforeDelete <- S3
-          .listObjectVersions(bucketWithVersioning, None)
-          .withAttributes(attributes)
-          .runWith(Sink.seq)
-        _ <- S3
-          .deleteBucketContents(bucketWithVersioning, deleteAllVersions = true)
-          .withAttributes(attributes)
-          .runWith(Sink.ignore)
-        versionsAfterDelete <- S3
-          .listObjectVersions(bucketWithVersioning, None)
-          .withAttributes(attributes)
-          .runWith(Sink.seq)
-        listBucketContentsAfterDelete <- S3
-          .listBucket(bucketWithVersioning, None)
-          .withAttributes(attributes)
-          .runWith(Sink.seq)
+  it should "create multiple versions of an object and successfully clean it with deleteBucketContents" in withBucketWithVersioning {
+    bucketWithVersioning =>
+      // TODO: Figure out a way to properly test this with Minio, see https://github.com/akka/alpakka/issues/2750
+      assume(this.isInstanceOf[AWSS3IntegrationSpec])
+      val versionKey = "test-version"
+      val one = ByteString("one")
+      val two = ByteString("two")
+      val three = ByteString("three")
+
+      val results =
+        for {
+          // Clean the bucket just incase there is residual data in there
+          _ <- S3
+            .deleteBucketContents(bucketWithVersioning, deleteAllVersions = true)
+            .withAttributes(attributes)
+            .runWith(Sink.ignore)
+          _ <- S3
+            .putObject(bucketWithVersioning, versionKey, Source.single(one), one.length, s3Headers = S3Headers())
+            .withAttributes(attributes)
+            .runWith(Sink.ignore)
+          _ <- S3
+            .putObject(bucketWithVersioning, versionKey, Source.single(two), two.length, s3Headers = S3Headers())
+            .withAttributes(attributes)
+            .runWith(Sink.ignore)
+          _ <- S3
+            .putObject(bucketWithVersioning, versionKey, Source.single(three), three.length, s3Headers = S3Headers())
+            .withAttributes(attributes)
+            .runWith(Sink.ignore)
+          versionsBeforeDelete <- S3
+            .listObjectVersions(bucketWithVersioning, None)
+            .withAttributes(attributes)
+            .runWith(Sink.seq)
+          _ <- S3
+            .deleteBucketContents(bucketWithVersioning, deleteAllVersions = true)
+            .withAttributes(attributes)
+            .runWith(Sink.ignore)
+          versionsAfterDelete <- S3
+            .listObjectVersions(bucketWithVersioning, None)
+            .withAttributes(attributes)
+            .runWith(Sink.seq)
+          listBucketContentsAfterDelete <- S3
+            .listBucket(bucketWithVersioning, None)
+            .withAttributes(attributes)
+            .runWith(Sink.seq)
 
-      } yield (versionsBeforeDelete.flatMap { case (versions, _) => versions },
-        versionsAfterDelete.flatMap {
-          case (versions, _) => versions
-        }, listBucketContentsAfterDelete)
+        } yield (versionsBeforeDelete.flatMap { case (versions, _) => versions },
+          versionsAfterDelete.flatMap {
+            case (versions, _) => versions
+          }, listBucketContentsAfterDelete)
 
-    val (versionsBeforeDelete, versionsAfterDelete, bucketContentsAfterDelete) = results.futureValue
+      val (versionsBeforeDelete, versionsAfterDelete, bucketContentsAfterDelete) = results.futureValue
 
-    versionsBeforeDelete.size shouldEqual 3
-    versionsAfterDelete.size shouldEqual 0
-    bucketContentsAfterDelete.size shouldEqual 0
+      versionsBeforeDelete.size shouldEqual 3
+      versionsAfterDelete.size shouldEqual 0
+      bucketContentsAfterDelete.size shouldEqual 0
   }
 
-  it should "listing object versions for a non versioned bucket should return None for versionId" in {
-    // TODO: Figure out a way to properly test this with Minio, see https://github.com/akka/alpakka/issues/2750
-    assume(this.isInstanceOf[AWSS3IntegrationSpec])
-    val objectKey = "listObjectVersionIdTest"
-    val bytes = ByteString(objectValue)
-    val data = Source.single(ByteString(objectValue))
-    val results = for {
-      _ <- S3
-        .putObject(defaultBucket,
-          objectKey,
-          data,
-          bytes.length,
-          s3Headers = S3Headers().withMetaHeaders(MetaHeaders(metaHeaders)))
-        .withAttributes(attributes)
-        .runWith(Sink.ignore)
-      result <- S3.listObjectVersions(defaultBucket, None).withAttributes(attributes).runWith(Sink.seq)
-      _ <- S3.deleteObject(defaultBucket, objectKey).withAttributes(attributes).runWith(Sink.ignore)
-    } yield result.flatMap { case (versions, _) => versions }
+  it should "listing object versions for a non versioned bucket should return None for versionId" in withDefaultBucket {
+    defaultBucket =>
+      // TODO: Figure out a way to properly test this with Minio, see https://github.com/akka/alpakka/issues/2750
+      assume(this.isInstanceOf[AWSS3IntegrationSpec])
+      val objectKey = "listObjectVersionIdTest"
+      val bytes = ByteString(objectValue)
+      val data = Source.single(ByteString(objectValue))
+      val results = for {
+        _ <- S3
+          .putObject(defaultBucket,
+            objectKey,
+            data,
+            bytes.length,
+            s3Headers = S3Headers().withMetaHeaders(MetaHeaders(metaHeaders)))
+          .withAttributes(attributes)
+          .runWith(Sink.ignore)
+        result <- S3.listObjectVersions(defaultBucket, None).withAttributes(attributes).runWith(Sink.seq)
+        _ <- S3.deleteObject(defaultBucket, objectKey).withAttributes(attributes).runWith(Sink.ignore)
+      } yield result.flatMap { case (versions, _) => versions }
 
-    forEvery(results.futureValue) { version =>
-      version.versionId shouldEqual None
-    }
+      Inspectors.forEvery(results.futureValue) { version =>

Review Comment:
   This is because the `forEvery` from ScalaTest conflicts with the `forEvery` from the ScalaCheck property-check integration being introduced in this PR, so the call has to be qualified as `Inspectors.forEvery`.
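   A minimal sketch of the disambiguation, assuming the clash is between `org.scalatest.Inspectors.forEvery` and the `forEvery` pulled in via `ScalaCheckPropertyChecks`; the spec name and the data below are made up for illustration.

       import org.scalatest.Inspectors
       import org.scalatest.flatspec.AnyFlatSpec
       import org.scalatest.matchers.should.Matchers
       import org.scalatestplus.scalacheck.ScalaCheckPropertyChecks

       class ForEveryDisambiguationSpec extends AnyFlatSpec with Matchers with ScalaCheckPropertyChecks {
         "an unversioned bucket" should "return no versionId for any listed version" in {
           val versionIds: Seq[Option[String]] = Seq(None, None, None) // stand-in for listed version ids
           // Qualifying the call avoids picking up the property/table-driven forEvery by accident.
           Inspectors.forEvery(versionIds) { versionId =>
             versionId shouldEqual None
           }
         }
       }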





[GitHub] [incubator-pekko-connectors] mdedetrich commented on a diff in pull request #96: Integrate AWSS3IntegrationSpec with S3 Account

Posted by "mdedetrich (via GitHub)" <gi...@apache.org>.
mdedetrich commented on code in PR #96:
URL: https://github.com/apache/incubator-pekko-connectors/pull/96#discussion_r1183340278


##########
s3/src/test/scala/org/apache/pekko/stream/connectors/s3/scaladsl/S3IntegrationSpec.scala:
##########
@@ -48,30 +50,180 @@ trait S3IntegrationSpec
     with BeforeAndAfterAll
     with Matchers
     with ScalaFutures
+    with ScalaCheckPropertyChecks
     with OptionValues
     with LogCapturing {
 
   implicit val ec: ExecutionContext = system.dispatcher
 
   implicit val defaultPatience: PatienceConfig = PatienceConfig(3.minutes, 100.millis)
 
-  val defaultBucket = "my-test-us-east-1"
-  val nonExistingBucket = "nowhere"
+  /**
+   * A prefix that will get added to each generated bucket in the test, this is to track the buckets that are
+   * specifically created by the test
+   */
+  lazy val bucketPrefix: Option[String] = None
+
+  /**
+   * Whether to randomly generate bucket names
+   */
+  val randomlyGenerateBucketNames: Boolean
+
+  implicit override val generatorDrivenConfig: PropertyCheckConfiguration =
+    PropertyCheckConfiguration(minSuccessful = 1)
+
+  /**
+   * Whether to enable cleanup of buckets after tests are run and if so the initial delay to wait after the test
+   */
+  lazy val enableCleanup: Option[FiniteDuration] = None
+
+  /**
+   * The MaxTimeout when cleaning up all of the buckets during `afterAll`
+   */
+  lazy val maxCleanupTimeout: FiniteDuration = 10.minutes
+
+  /**
+   * @param constantBucketName The bucket name constant to use if we are not randomly generating bucket names
+   */
+  def genBucketName(constantBucketName: String): Gen[String] =
+    if (randomlyGenerateBucketNames)
+      Generators.bucketNameGen(useVirtualDotHost = false, bucketPrefix)
+    else
+      Gen.const(bucketPrefix.getOrElse("") ++ constantBucketName)
+
+  def createBucket(bucket: String, versioning: Boolean, bucketReference: AtomicReference[String], s3Attrs: Attributes)
+      : Future[Unit] =
+    for {
+      bucketResponse <- S3.checkIfBucketExists(bucket)(implicitly, s3Attrs)
+      _ <- bucketResponse match {
+        case BucketAccess.AccessDenied =>
+          throw new RuntimeException(
+            s"Unable to create bucket: $bucket since it already exists however permissions are inadequate")
+        case BucketAccess.AccessGranted | BucketAccess.NotExists =>
+          for {
+            _ <- bucketResponse match {
+              case BucketAccess.AccessGranted =>
+                system.log.info(
+                  s"Deleting and recreating bucket: $bucket since it already exists with correct permissions")
+                TestUtils.cleanAndDeleteBucket(bucket, s3Attrs)
+              case _ =>
+                Future.successful(())
+            }
+            _ <- S3.makeBucket(bucket)(implicitly, s3Attrs)
+            // TODO: Figure out a way to properly test this with Minio, see https://github.com/akka/alpakka/issues/2750
+            _ <- if (versioning && this.isInstanceOf[AWSS3IntegrationSpec])
+              S3.putBucketVersioning(bucket, BucketVersioning().withStatus(BucketVersioningStatus.Enabled))(implicitly,
+                s3Attrs)
+            else
+              Future.successful(())
+            _ = bucketReference.set(bucket)
+          } yield ()
+      }
+    } yield ()
+
+  private val defaultBucketReference = new AtomicReference[String]()
+  def withDefaultBucket(testCode: String => Assertion): Assertion =
+    testCode(defaultBucketReference.get())
 
   // with dots forcing path style access
-  val bucketWithDots = "my.test.frankfurt"
+  private val bucketWithDotsReference = new AtomicReference[String]()
+  def withBucketWithDots(testCode: String => Assertion): Assertion =
+    testCode(bucketWithDotsReference.get())
+
+  private val nonExistingBucketReference = new AtomicReference[String]()
+  def withNonExistingBucket(testCode: String => Assertion): Assertion =
+    testCode(nonExistingBucketReference.get())
+
+  private val bucketWithVersioningReference = new AtomicReference[String]()
+  def withBucketWithVersioning(testCode: String => Assertion): Assertion =
+    testCode(bucketWithVersioningReference.get())
 
-  val bucketWithVersioning = "my-bucket-with-versioning"
+  override def beforeAll(): Unit = {
+    super.beforeAll()
+    val (bucketWithDots, defaultBucket, bucketWithVersioning, nonExistingBucket) = {
+      val baseGen = for {
+        bucketWithDots <- genBucketName(S3IntegrationSpec.BucketWithDots)
+        defaultBucket <- genBucketName(S3IntegrationSpec.DefaultBucket)
+        bucketWithVersioning <- genBucketName(S3IntegrationSpec.BucketWithVersioning)
+        nonExistingBucket <- genBucketName(S3IntegrationSpec.NonExistingBucket)
+      } yield (bucketWithDots, defaultBucket, bucketWithVersioning, nonExistingBucket)
+
+      if (randomlyGenerateBucketNames)
+        TestUtils.loopUntilGenRetrievesValue(baseGen.filter {

Review Comment:
   The `TestUtils.loopUntilGenRetrievesValue` is necessary here because we are manually invoking the ScalaCheck generator rather than using the ScalaTest property-checking integration (since this happens in a `beforeAll` block rather than an actual test).
   
   Because of this, and the fact that we apply a `.filter` to the generator, we cannot use the standard `.sample.get`: `sample` returns `None` whenever the filter rejects a value, which is why `TestUtils.loopUntilGenRetrievesValue` exists.
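   The helper itself is not shown in this hunk; a minimal sketch of the idea, assuming it simply re-samples until the (possibly filtered) generator produces a value:

       import org.scalacheck.Gen
       import scala.annotation.tailrec

       object GenSampling {
         // Keep sampling until the generator yields a value; a filtered Gen can
         // return None from `.sample` whenever the filter rejects the draw.
         @tailrec
         def loopUntilGenRetrievesValue[A](gen: Gen[A]): A =
           gen.sample match {
             case Some(value) => value
             case None        => loopUntilGenRetrievesValue(gen)
           }
       }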



##########
s3/src/test/scala/org/apache/pekko/stream/connectors/s3/scaladsl/S3IntegrationSpec.scala:
##########
@@ -48,30 +50,180 @@ trait S3IntegrationSpec
     with BeforeAndAfterAll
     with Matchers
     with ScalaFutures
+    with ScalaCheckPropertyChecks
     with OptionValues
     with LogCapturing {
 
   implicit val ec: ExecutionContext = system.dispatcher
 
   implicit val defaultPatience: PatienceConfig = PatienceConfig(3.minutes, 100.millis)
 
-  val defaultBucket = "my-test-us-east-1"
-  val nonExistingBucket = "nowhere"
+  /**
+   * A prefix that will get added to each generated bucket in the test, this is to track the buckets that are
+   * specifically created by the test
+   */
+  lazy val bucketPrefix: Option[String] = None
+
+  /**
+   * Whether to randomly generate bucket names
+   */
+  val randomlyGenerateBucketNames: Boolean
+
+  implicit override val generatorDrivenConfig: PropertyCheckConfiguration =
+    PropertyCheckConfiguration(minSuccessful = 1)
+
+  /**
+   * Whether to enable cleanup of buckets after tests are run and if so the initial delay to wait after the test
+   */
+  lazy val enableCleanup: Option[FiniteDuration] = None
+
+  /**
+   * The MaxTimeout when cleaning up all of the buckets during `afterAll`
+   */
+  lazy val maxCleanupTimeout: FiniteDuration = 10.minutes
+
+  /**
+   * @param constantBucketName The bucket name constant to use if we are not randomly generating bucket names
+   */
+  def genBucketName(constantBucketName: String): Gen[String] =
+    if (randomlyGenerateBucketNames)
+      Generators.bucketNameGen(useVirtualDotHost = false, bucketPrefix)
+    else
+      Gen.const(bucketPrefix.getOrElse("") ++ constantBucketName)
+
+  def createBucket(bucket: String, versioning: Boolean, bucketReference: AtomicReference[String], s3Attrs: Attributes)
+      : Future[Unit] =
+    for {
+      bucketResponse <- S3.checkIfBucketExists(bucket)(implicitly, s3Attrs)
+      _ <- bucketResponse match {
+        case BucketAccess.AccessDenied =>
+          throw new RuntimeException(
+            s"Unable to create bucket: $bucket since it already exists however permissions are inadequate")
+        case BucketAccess.AccessGranted | BucketAccess.NotExists =>
+          for {
+            _ <- bucketResponse match {
+              case BucketAccess.AccessGranted =>
+                system.log.info(
+                  s"Deleting and recreating bucket: $bucket since it already exists with correct permissions")
+                TestUtils.cleanAndDeleteBucket(bucket, s3Attrs)
+              case _ =>
+                Future.successful(())
+            }
+            _ <- S3.makeBucket(bucket)(implicitly, s3Attrs)
+            // TODO: Figure out a way to properly test this with Minio, see https://github.com/akka/alpakka/issues/2750
+            _ <- if (versioning && this.isInstanceOf[AWSS3IntegrationSpec])
+              S3.putBucketVersioning(bucket, BucketVersioning().withStatus(BucketVersioningStatus.Enabled))(implicitly,
+                s3Attrs)
+            else
+              Future.successful(())
+            _ = bucketReference.set(bucket)
+          } yield ()
+      }
+    } yield ()
+
+  private val defaultBucketReference = new AtomicReference[String]()
+  def withDefaultBucket(testCode: String => Assertion): Assertion =
+    testCode(defaultBucketReference.get())
 
   // with dots forcing path style access
-  val bucketWithDots = "my.test.frankfurt"
+  private val bucketWithDotsReference = new AtomicReference[String]()
+  def withBucketWithDots(testCode: String => Assertion): Assertion =
+    testCode(bucketWithDotsReference.get())
+
+  private val nonExistingBucketReference = new AtomicReference[String]()
+  def withNonExistingBucket(testCode: String => Assertion): Assertion =
+    testCode(nonExistingBucketReference.get())
+
+  private val bucketWithVersioningReference = new AtomicReference[String]()
+  def withBucketWithVersioning(testCode: String => Assertion): Assertion =
+    testCode(bucketWithVersioningReference.get())
 
-  val bucketWithVersioning = "my-bucket-with-versioning"
+  override def beforeAll(): Unit = {
+    super.beforeAll()
+    val (bucketWithDots, defaultBucket, bucketWithVersioning, nonExistingBucket) = {
+      val baseGen = for {
+        bucketWithDots <- genBucketName(S3IntegrationSpec.BucketWithDots)
+        defaultBucket <- genBucketName(S3IntegrationSpec.DefaultBucket)
+        bucketWithVersioning <- genBucketName(S3IntegrationSpec.BucketWithVersioning)
+        nonExistingBucket <- genBucketName(S3IntegrationSpec.NonExistingBucket)
+      } yield (bucketWithDots, defaultBucket, bucketWithVersioning, nonExistingBucket)
+
+      if (randomlyGenerateBucketNames)
+        TestUtils.loopUntilGenRetrievesValue(baseGen.filter {

Review Comment:
   The `TestUtils.loopUntilGenRetrievesValue` is necessary here because we are manually invoking the ScalaCheck generator rather than using the ScalaTest property-checking integration (since this happens in a `beforeAll` block rather than an actual test).
   
   Because of this, and the fact that we apply a `.filter` to the generator, we cannot use the standard `.sample.get`: `sample` returns `None` whenever the filter rejects a value, which is why `TestUtils.loopUntilGenRetrievesValue` exists.





[GitHub] [incubator-pekko-connectors] mdedetrich commented on a diff in pull request #96: Integrate AWSS3IntegrationSpec with S3 Account

Posted by "mdedetrich (via GitHub)" <gi...@apache.org>.
mdedetrich commented on code in PR #96:
URL: https://github.com/apache/incubator-pekko-connectors/pull/96#discussion_r1183329043


##########
s3/src/test/scala/org/apache/pekko/stream/connectors/s3/TestUtils.scala:
##########


Review Comment:
   These generators are copied directly from an open-source company project (see https://github.com/aiven/guardian-for-apache-kafka/blob/main/core-s3/src/test/scala/io/aiven/guardian/kafka/s3/Generators.scala), so I can confirm that they work correctly since they have been in use for over a year.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@pekko.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@pekko.apache.org
For additional commands, e-mail: notifications-help@pekko.apache.org