Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/07/30 10:45:22 UTC

[GitHub] [beam] piotr-szuberski opened a new pull request #12422: [BEAM-601] Run KinesisIOIT with localstack

piotr-szuberski opened a new pull request #12422:
URL: https://github.com/apache/beam/pull/12422


   I did this along the way while writing cross-language integration tests.
   
   ------------------------
   
   Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
   
    - [ ] [**Choose reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and mention them in a comment (`R: @username`).
    - [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.
    - [ ] Update `CHANGES.md` with noteworthy changes.
    - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   See the [Contributor Guide](https://beam.apache.org/contribute) for more tips on [how to make review process smoother](https://beam.apache.org/contribute/#make-reviewers-job-easier).
   
   See [.test-infra/jenkins/README](https://github.com/apache/beam/blob/master/.test-infra/jenkins/README.md) for trigger phrase, status and link of all Jenkins jobs.
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] aromanenko-dev commented on a change in pull request #12422: [BEAM-601] Run KinesisIOIT with localstack

Posted by GitBox <gi...@apache.org>.
aromanenko-dev commented on a change in pull request #12422:
URL: https://github.com/apache/beam/pull/12422#discussion_r466363355



##########
File path: sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisIO.java
##########
@@ -413,6 +413,28 @@ public Read withAWSClientsProvider(
           new BasicKinesisProvider(awsAccessKey, awsSecretKey, region, serviceEndpoint));
     }
 
+    /**
+     * Specify credential details and region to be used to read from Kinesis. If you need more
+     * sophisticated credential protocol, then you should look at {@link
+     * Read#withAWSClientsProvider(AWSClientsProvider)}.
+     *
+     * <p>The {@code serviceEndpoint} sets an alternative service host. This is useful to execute
+     * the tests with Kinesis service emulator.
+     *
+     * <p>The {@code veriftCertificate} disables or enables certificate verification. Never set it

Review comment:
      typo: verif*y*Certificate







[GitHub] [beam] piotr-szuberski commented on pull request #12422: [BEAM-601] Run KinesisIOIT with localstack

Posted by GitBox <gi...@apache.org>.
piotr-szuberski commented on pull request #12422:
URL: https://github.com/apache/beam/pull/12422#issuecomment-666312635


   Run Java PostCommit





[GitHub] [beam] piotr-szuberski commented on a change in pull request #12422: [BEAM-601] Run KinesisIOIT with localstack

Posted by GitBox <gi...@apache.org>.
piotr-szuberski commented on a change in pull request #12422:
URL: https://github.com/apache/beam/pull/12422#discussion_r464841242



##########
File path: sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisIOIT.java
##########
@@ -99,28 +116,101 @@ private void runRead() {
                 .withAWSClientsProvider(
                     options.getAwsAccessKey(),
                     options.getAwsSecretKey(),
-                    Regions.fromName(options.getAwsKinesisRegion()))
-                .withMaxNumRecords(numberOfRows)
+                    Regions.fromName(options.getAwsKinesisRegion()),
+                    options.getAwsServiceEndpoint(),
+                    options.getAwsVerifyCertificate())
+                .withMaxNumRecords(options.getNumberOfRecords())
                 // to prevent endless running in case of error
-                .withMaxReadTime(Duration.standardMinutes(10))
+                .withMaxReadTime(Duration.standardMinutes(10L))
                 .withInitialPositionInStream(InitialPositionInStream.AT_TIMESTAMP)
                 .withInitialTimestampInStream(now)
                 .withRequestRecordsLimit(1000));
 
     PAssert.thatSingleton(output.apply("Count All", Count.globally()))
-        .isEqualTo((long) numberOfRows);
+        .isEqualTo((long) options.getNumberOfRecords());
 
     PCollection<String> consolidatedHashcode =
         output
             .apply(ParDo.of(new ExtractDataValues()))
             .apply("Hash row contents", Combine.globally(new HashingFn()).withoutDefaults());
 
     PAssert.that(consolidatedHashcode)
-        .containsInAnyOrder(TestRow.getExpectedHashForRowCount(numberOfRows));
+        .containsInAnyOrder(TestRow.getExpectedHashForRowCount(options.getNumberOfRecords()));
 
     pipelineRead.run().waitUntilFinish();
   }
 
+  /** Necessary setup for localstack environment. */
+  private static void setupLocalstack() throws Exception {
+    System.setProperty(SDKGlobalConfiguration.DISABLE_CERT_CHECKING_SYSTEM_PROPERTY, "true");
+    System.setProperty(SDKGlobalConfiguration.AWS_CBOR_DISABLE_SYSTEM_PROPERTY, "true");
+
+    // For some unclear reason localstack requires the timestamp in seconds
+    now = Instant.ofEpochMilli(Long.divideUnsigned(Instant.now().getMillis(), 1000));
+
+    localstackContainer =
+        new LocalStackContainer("0.11.3")
+            .withServices(LocalStackContainer.Service.KINESIS)
+            .withEnv("USE_SSL", "true")
+            .withStartupAttempts(3);
+    localstackContainer.start();
+
+    options.setAwsServiceEndpoint(
+        localstackContainer
+            .getEndpointConfiguration(LocalStackContainer.Service.KINESIS)
+            .getServiceEndpoint()
+            .replace("http", "https"));
+    options.setAwsKinesisRegion(
+        localstackContainer
+            .getEndpointConfiguration(LocalStackContainer.Service.KINESIS)
+            .getSigningRegion());
+    options.setAwsAccessKey(
+        localstackContainer.getDefaultCredentialsProvider().getCredentials().getAWSAccessKeyId());
+    options.setAwsSecretKey(
+        localstackContainer.getDefaultCredentialsProvider().getCredentials().getAWSSecretKey());
+    options.setNumberOfRecords(1000);
+    options.setNumberOfShards(1);
+    options.setAwsKinesisStream("beam_kinesis_test");
+    options.setAwsVerifyCertificate(false);
+    createStream(options.getAwsKinesisStream());
+  }
+
+  private static void createStream(String streamName) throws Exception {
+    AmazonKinesisClientBuilder clientBuilder = AmazonKinesisClientBuilder.standard();
+
+    clientBuilder.setCredentials(localstackContainer.getDefaultCredentialsProvider());
+    clientBuilder.setEndpointConfiguration(
+        localstackContainer.getEndpointConfiguration(LocalStackContainer.Service.KINESIS));
+
+    AmazonKinesis client = clientBuilder.build();
+
+    client.createStream(streamName, 1);
+    int repeats = 10;
+    for (int i = 0; i <= repeats; ++i) {
+      String streamStatus =
+          client.describeStream(streamName).getStreamDescription().getStreamStatus();
+      if ("ACTIVE".equals(streamStatus)) {
+        break;
+      }
+      if (i == repeats) {
+        throw new RuntimeException("Unable to initialize stream");
+      }
+      Thread.sleep(1000L);
+    }
+  }
+
+  /** Check whether pipeline options were provided. If not, use localstack container. */
+  private static boolean doUseLocalstack() {
+    return "aws-access-key".equals(options.getAwsAccessKey())
+        && "aws-secret-key".equals(options.getAwsSecretKey())
+        && "aws-kinesis-stream".equals(options.getAwsKinesisStream())
+        && "aws-kinesis-region".equals(options.getAwsKinesisRegion())
+        && options.getNumberOfShards() == 2
+        && options.getNumberOfRecords() == 1000
+        && options.getAwsServiceEndpoint() == null
+        && options.getAwsVerifyCertificate();
+  }

Review comment:
       Very good point, thanks!
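The `createStream` loop in the diff above polls `describeStream` until the stream reports `ACTIVE` and gives up after a fixed number of attempts. A minimal sketch of the same retry logic as a reusable helper follows. `StatusPoller` and `waitForStatus` are hypothetical names, not part of Beam, and the status source is a plain `Supplier<String>` so the logic can be exercised without AWS or localstack.

```java
import java.util.function.Supplier;

/** Hypothetical helper mirroring the createStream() polling loop in the diff. */
final class StatusPoller {
  private StatusPoller() {}

  /**
   * Polls {@code status} up to {@code maxAttempts} times, sleeping {@code sleepMillis}
   * between attempts, and returns the 1-based attempt on which {@code expected} was seen.
   *
   * @throws IllegalStateException if the expected status never appears or the thread
   *     is interrupted while sleeping
   */
  static int waitForStatus(
      Supplier<String> status, String expected, int maxAttempts, long sleepMillis) {
    if (maxAttempts < 1) {
      throw new IllegalArgumentException("maxAttempts must be at least 1");
    }
    for (int attempt = 1; attempt <= maxAttempts; attempt++) {
      if (expected.equals(status.get())) {
        return attempt;
      }
      if (attempt < maxAttempts) {
        try {
          Thread.sleep(sleepMillis);
        } catch (InterruptedException e) {
          // Restore the interrupt flag before giving up.
          Thread.currentThread().interrupt();
          throw new IllegalStateException("Interrupted while waiting for " + expected, e);
        }
      }
    }
    throw new IllegalStateException(
        "Status did not become " + expected + " after " + maxAttempts + " attempts");
  }
}
```

In the test it could be called as `StatusPoller.waitForStatus(() -> client.describeStream(streamName).getStreamDescription().getStreamStatus(), "ACTIVE", 11, 1000L)`, which matches the original loop's 11 status checks spaced one second apart.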







[GitHub] [beam] aromanenko-dev commented on a change in pull request #12422: [BEAM-601] Run KinesisIOIT with localstack

Posted by GitBox <gi...@apache.org>.
aromanenko-dev commented on a change in pull request #12422:
URL: https://github.com/apache/beam/pull/12422#discussion_r467936493



##########
File path: sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisIOIT.java
##########
@@ -35,33 +42,51 @@
 import org.apache.beam.sdk.values.PCollection;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
+import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
+import org.testcontainers.containers.localstack.LocalStackContainer;
 
 /**
  * Integration test, that writes and reads data to and from real Kinesis. You need to provide {@link
- * KinesisTestOptions} in order to run this.
+ * KinesisTestOptions} in order to run this if you want to test it with production setup. By default
+ * when no options are provided an instance of localstack is used.
  */
 @RunWith(JUnit4.class)
 public class KinesisIOIT implements Serializable {
-  private static int numberOfShards;
-  private static int numberOfRows;
+  private static final String LOCALSTACK_VERSION = "0.11.3";

Review comment:
      Should this version match the version of the `localstack` dependency (`1.14.3`)?








[GitHub] [beam] piotr-szuberski commented on pull request #12422: [BEAM-601] Run KinesisIOIT with localstack

Posted by GitBox <gi...@apache.org>.
piotr-szuberski commented on pull request #12422:
URL: https://github.com/apache/beam/pull/12422#issuecomment-667101999


   Run Java PostCommit







[GitHub] [beam] piotr-szuberski commented on a change in pull request #12422: [BEAM-601] Run KinesisIOIT with localstack

Posted by GitBox <gi...@apache.org>.
piotr-szuberski commented on a change in pull request #12422:
URL: https://github.com/apache/beam/pull/12422#discussion_r466629615



##########
File path: sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/BasicKinesisProvider.java
##########
@@ -39,16 +40,27 @@
   private final String secretKey;
   private final Regions region;
   private final @Nullable String serviceEndpoint;
+  private final boolean verifyCertificate;
 
   BasicKinesisProvider(
-      String accessKey, String secretKey, Regions region, @Nullable String serviceEndpoint) {
+      String accessKey,
+      String secretKey,
+      Regions region,
+      @Nullable String serviceEndpoint,
+      boolean verifyCertificate) {
     checkArgument(accessKey != null, "accessKey can not be null");
     checkArgument(secretKey != null, "secretKey can not be null");
     checkArgument(region != null, "region can not be null");
     this.accessKey = accessKey;
     this.secretKey = secretKey;
     this.region = region;
     this.serviceEndpoint = serviceEndpoint;
+    this.verifyCertificate = verifyCertificate;
+  }
+
+  BasicKinesisProvider(

Review comment:
      When you use `KinesisIO.write().withAWSClientsProvider("accesskey", "secretkey", "region", "serviceEndpointUrl")`, then without my modification Beam tries to use the real AWS URL instead of the given `serviceEndpointUrl`. I don't think this is the desired behaviour, but correct me if I'm wrong or if I'm misunderstanding something (which is very probable; I haven't used AWS much in my life).
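The concern above is that an explicitly supplied `serviceEndpoint` must take precedence over the default AWS endpoint derived from the region; otherwise a test meant for an emulator silently talks to real AWS. A minimal sketch of that precedence rule follows, assuming the default endpoint follows the public `https://kinesis.<region>.amazonaws.com` pattern (true for the standard AWS partition, not, for example, for China regions). `EndpointResolver` is a hypothetical name. In the AWS SDK for Java v1, the two branches would roughly correspond to configuring the client builder with `withEndpointConfiguration(...)` or with `withRegion(...)`.

```java
import java.util.Locale;

/** Hypothetical sketch: an explicit service endpoint takes precedence over the region default. */
final class EndpointResolver {
  private EndpointResolver() {}

  static String resolve(String serviceEndpoint, String regionName) {
    if (serviceEndpoint != null && !serviceEndpoint.isEmpty()) {
      // Emulator or custom host (e.g. a localstack URL): use it unchanged.
      return serviceEndpoint;
    }
    if (regionName == null || regionName.isEmpty()) {
      throw new IllegalArgumentException("Either serviceEndpoint or regionName is required");
    }
    // Assumption: public endpoint pattern of the standard AWS partition.
    return "https://kinesis." + regionName.toLowerCase(Locale.ROOT) + ".amazonaws.com";
  }
}
```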







[GitHub] [beam] TheNeuralBit commented on pull request #12422: [BEAM-601] Run KinesisIOIT with localstack

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on pull request #12422:
URL: https://github.com/apache/beam/pull/12422#issuecomment-666768084









[GitHub] [beam] piotr-szuberski commented on a change in pull request #12422: [BEAM-601] Run KinesisIOIT with localstack

Posted by GitBox <gi...@apache.org>.
piotr-szuberski commented on a change in pull request #12422:
URL: https://github.com/apache/beam/pull/12422#discussion_r464853325



##########
File path: sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisIOIT.java
##########
@@ -99,28 +116,101 @@ private void runRead() {
                 .withAWSClientsProvider(
                     options.getAwsAccessKey(),
                     options.getAwsSecretKey(),
-                    Regions.fromName(options.getAwsKinesisRegion()))
-                .withMaxNumRecords(numberOfRows)
+                    Regions.fromName(options.getAwsKinesisRegion()),
+                    options.getAwsServiceEndpoint(),
+                    options.getAwsVerifyCertificate())
+                .withMaxNumRecords(options.getNumberOfRecords())
                 // to prevent endless running in case of error
-                .withMaxReadTime(Duration.standardMinutes(10))
+                .withMaxReadTime(Duration.standardMinutes(10L))
                 .withInitialPositionInStream(InitialPositionInStream.AT_TIMESTAMP)
                 .withInitialTimestampInStream(now)
                 .withRequestRecordsLimit(1000));
 
     PAssert.thatSingleton(output.apply("Count All", Count.globally()))
-        .isEqualTo((long) numberOfRows);
+        .isEqualTo((long) options.getNumberOfRecords());
 
     PCollection<String> consolidatedHashcode =
         output
             .apply(ParDo.of(new ExtractDataValues()))
             .apply("Hash row contents", Combine.globally(new HashingFn()).withoutDefaults());
 
     PAssert.that(consolidatedHashcode)
-        .containsInAnyOrder(TestRow.getExpectedHashForRowCount(numberOfRows));
+        .containsInAnyOrder(TestRow.getExpectedHashForRowCount(options.getNumberOfRecords()));
 
     pipelineRead.run().waitUntilFinish();
   }
 
+  /** Necessary setup for localstack environment. */
+  private static void setupLocalstack() throws Exception {
+    System.setProperty(SDKGlobalConfiguration.DISABLE_CERT_CHECKING_SYSTEM_PROPERTY, "true");
+    System.setProperty(SDKGlobalConfiguration.AWS_CBOR_DISABLE_SYSTEM_PROPERTY, "true");
+
+    // For some unclear reason localstack requires the timestamp in seconds
+    now = Instant.ofEpochMilli(Long.divideUnsigned(Instant.now().getMillis(), 1000));
+
+    localstackContainer =
+        new LocalStackContainer("0.11.3")

Review comment:
      I changed it to `latest`, with a comment noting that the test may break after an image update.
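Switching to `latest` trades reproducibility for freshness. One possible compromise, sketched here under assumptions: keep a pinned default tag and allow it to be overridden from the command line. The system property name `localstack.image.tag` and the `LocalstackImage` class are hypothetical; the resulting string would be passed to `new LocalStackContainer(...)` the same way the diff passes `"0.11.3"`.

```java
/** Hypothetical sketch: pinned localstack image tag with an optional command-line override. */
final class LocalstackImage {
  private LocalstackImage() {}

  /** Tag the PR used before switching to "latest". */
  static final String DEFAULT_TAG = "0.11.3";

  /** Hypothetical property name, e.g. -Dlocalstack.image.tag=latest; not defined by Beam. */
  static final String TAG_PROPERTY = "localstack.image.tag";

  static String tag() {
    String override = System.getProperty(TAG_PROPERTY);
    if (override == null || override.trim().isEmpty()) {
      return DEFAULT_TAG; // reproducible default
    }
    return override.trim(); // opt in to a newer (possibly breaking) image
  }
}
```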







[GitHub] [beam] piotr-szuberski commented on pull request #12422: [BEAM-601] Run KinesisIOIT with localstack

Posted by GitBox <gi...@apache.org>.
piotr-szuberski commented on pull request #12422:
URL: https://github.com/apache/beam/pull/12422#issuecomment-668470214


   Run Java PostCommit





[GitHub] [beam] TheNeuralBit commented on a change in pull request #12422: [BEAM-601] Run KinesisIOIT with localstack

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on a change in pull request #12422:
URL: https://github.com/apache/beam/pull/12422#discussion_r465387416



##########
File path: sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisIOIT.java
##########
@@ -35,33 +38,45 @@
 import org.apache.beam.sdk.values.PCollection;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
+import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
+import org.testcontainers.containers.localstack.LocalStackContainer;
 
 /**
  * Integration test, that writes and reads data to and from real Kinesis. You need to provide {@link
- * KinesisTestOptions} in order to run this.
+ * KinesisTestOptions} in order to run this if you want to test it with production setup. By default
+ * when no options are provided an instance of localstack is used.
  */
 @RunWith(JUnit4.class)
 public class KinesisIOIT implements Serializable {
-  private static int numberOfShards;
-  private static int numberOfRows;
-
   @Rule public TestPipeline pipelineWrite = TestPipeline.create();
   @Rule public TestPipeline pipelineRead = TestPipeline.create();
 
+  private static LocalStackContainer localstackContainer;
+
   private static KinesisTestOptions options;
-  private static final Instant now = Instant.now();

Review comment:
      Ah, I see.
   
   Is it necessary to support testing with an existing stream? It seems preferable to always create a stream for each test and clean it up after the test.
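The per-test stream suggested above boils down to: create, run, always delete. A minimal sketch under assumptions follows. `StreamAdmin` is a hypothetical interface standing in for the Kinesis client's create/delete calls (with the AWS SDK v1 it would wrap `AmazonKinesis#createStream` and `#deleteStream`), `FakeStreamAdmin` is an in-memory stand-in so the lifecycle can be checked without AWS, and the `beam_test_kinesis` + UUID naming mirrors the PR's `setup()` method.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.UUID;
import java.util.function.Consumer;

/** Hypothetical stand-in for the Kinesis client's create/delete stream calls. */
interface StreamAdmin {
  void createStream(String name, int shardCount);

  void deleteStream(String name);
}

/** In-memory fake so the lifecycle can be exercised without AWS or localstack. */
final class FakeStreamAdmin implements StreamAdmin {
  final Set<String> streams = new HashSet<>();

  @Override
  public void createStream(String name, int shardCount) {
    streams.add(name);
  }

  @Override
  public void deleteStream(String name) {
    streams.remove(name);
  }
}

final class StreamLifecycle {
  private StreamLifecycle() {}

  /** Creates a uniquely named stream, runs {@code body}, and deletes the stream even on failure. */
  static String runWithTemporaryStream(StreamAdmin admin, int shardCount, Consumer<String> body) {
    String name = "beam_test_kinesis" + UUID.randomUUID();
    admin.createStream(name, shardCount);
    try {
      body.accept(name);
    } finally {
      admin.deleteStream(name);
    }
    return name;
  }
}
```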







[GitHub] [beam] aromanenko-dev commented on a change in pull request #12422: [BEAM-601] Run KinesisIOIT with localstack

Posted by GitBox <gi...@apache.org>.
aromanenko-dev commented on a change in pull request #12422:
URL: https://github.com/apache/beam/pull/12422#discussion_r466299826



##########
File path: sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisIOIT.java
##########
@@ -34,34 +42,52 @@
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.values.PCollection;
 import org.joda.time.Duration;
-import org.joda.time.Instant;
+import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
+import org.testcontainers.containers.localstack.LocalStackContainer;
 
 /**
  * Integration test, that writes and reads data to and from real Kinesis. You need to provide {@link
- * KinesisTestOptions} in order to run this.
+ * KinesisTestOptions} in order to run this if you want to test it with production setup. By default
+ * when no options are provided an instance of localstack is used.
  */
 @RunWith(JUnit4.class)
 public class KinesisIOIT implements Serializable {
-  private static int numberOfShards;
-  private static int numberOfRows;
+  private static final String LOCALSTACK_VERSION = "0.11.3";
 
   @Rule public TestPipeline pipelineWrite = TestPipeline.create();
   @Rule public TestPipeline pipelineRead = TestPipeline.create();
 
+  private static LocalStackContainer localstackContainer;
+  private static String streamName;
+  private static AmazonKinesis kinesisClient;
+
   private static KinesisTestOptions options;
-  private static final Instant now = Instant.now();
 
   @BeforeClass
-  public static void setup() {
+  public static void setup() throws Exception {
     PipelineOptionsFactory.register(KinesisTestOptions.class);
     options = TestPipeline.testingPipelineOptions().as(KinesisTestOptions.class);
-    numberOfShards = options.getNumberOfShards();
-    numberOfRows = options.getNumberOfRecords();
+    if (doUseLocalstack()) {
+      setupLocalstack();
+    }
+    kinesisClient = createKinesisClient();
+    streamName = "beam_test_kinesis" + UUID.randomUUID();

Review comment:
      I believe it should be both. In my company, we run it in our own environment with our own privileges, since Beam so far hasn't had dedicated access to AWS Kinesis (though that could change in the future). In any case, I'd recommend splitting the stream creation/deletion part from the stream writing/reading part since, as I mentioned before, they could require different privileges. So I'd move the stream management into the Jenkins script and/or make it optional in KinesisIOIT (for example, use it only when `localstack` is used).
   Also, we should allow configuring the stream name.
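One possible way to encode this split is sketched below with a hypothetical `StreamPlan` helper that is not part of the PR. Against localstack, the test owns the stream and may generate a unique name. Against real Kinesis, it requires a configured, externally provisioned stream and never creates or deletes it, so the read/write credentials need no management privileges.

```java
import java.util.UUID;

/** Hypothetical: which stream the IT should use and whether the IT itself owns its lifecycle. */
final class StreamPlan {
  final String streamName;
  final boolean manageLifecycle; // true => the test creates and deletes the stream

  private StreamPlan(String streamName, boolean manageLifecycle) {
    this.streamName = streamName;
    this.manageLifecycle = manageLifecycle;
  }

  static StreamPlan resolve(String configuredStream, boolean useLocalstack) {
    boolean hasName = configuredStream != null && !configuredStream.isEmpty();
    if (useLocalstack) {
      // Throwaway emulator: the test may create/delete freely; a unique default name
      // avoids collisions between runs.
      return new StreamPlan(
          hasName ? configuredStream : "beam_test_kinesis_" + UUID.randomUUID(), true);
    }
    if (!hasName) {
      throw new IllegalArgumentException(
          "Configure a stream name when running against real Kinesis");
    }
    // Real AWS: the stream is provisioned outside the test (e.g. by a CI script).
    return new StreamPlan(configuredStream, false);
  }
}
```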







[GitHub] [beam] aromanenko-dev commented on pull request #12422: [BEAM-601] Run KinesisIOIT with localstack

Posted by GitBox <gi...@apache.org>.
aromanenko-dev commented on pull request #12422:
URL: https://github.com/apache/beam/pull/12422#issuecomment-671462800


   Run Python2_PVR_Flink PreCommit





[GitHub] [beam] piotr-szuberski commented on a change in pull request #12422: [BEAM-601] Run KinesisIOIT with localstack

Posted by GitBox <gi...@apache.org>.
piotr-szuberski commented on a change in pull request #12422:
URL: https://github.com/apache/beam/pull/12422#discussion_r464842213



##########
File path: sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisIOIT.java
##########
@@ -99,28 +116,101 @@ private void runRead() {
                 .withAWSClientsProvider(
                     options.getAwsAccessKey(),
                     options.getAwsSecretKey(),
-                    Regions.fromName(options.getAwsKinesisRegion()))
-                .withMaxNumRecords(numberOfRows)
+                    Regions.fromName(options.getAwsKinesisRegion()),
+                    options.getAwsServiceEndpoint(),
+                    options.getAwsVerifyCertificate())
+                .withMaxNumRecords(options.getNumberOfRecords())
                 // to prevent endless running in case of error
-                .withMaxReadTime(Duration.standardMinutes(10))
+                .withMaxReadTime(Duration.standardMinutes(10L))
                 .withInitialPositionInStream(InitialPositionInStream.AT_TIMESTAMP)
                 .withInitialTimestampInStream(now)
                 .withRequestRecordsLimit(1000));
 
     PAssert.thatSingleton(output.apply("Count All", Count.globally()))
-        .isEqualTo((long) numberOfRows);
+        .isEqualTo((long) options.getNumberOfRecords());
 
     PCollection<String> consolidatedHashcode =
         output
             .apply(ParDo.of(new ExtractDataValues()))
             .apply("Hash row contents", Combine.globally(new HashingFn()).withoutDefaults());
 
     PAssert.that(consolidatedHashcode)
-        .containsInAnyOrder(TestRow.getExpectedHashForRowCount(numberOfRows));
+        .containsInAnyOrder(TestRow.getExpectedHashForRowCount(options.getNumberOfRecords()));
 
     pipelineRead.run().waitUntilFinish();
   }
 
+  /** Necessary setup for localstack environment. */
+  private static void setupLocalstack() throws Exception {
+    System.setProperty(SDKGlobalConfiguration.DISABLE_CERT_CHECKING_SYSTEM_PROPERTY, "true");
+    System.setProperty(SDKGlobalConfiguration.AWS_CBOR_DISABLE_SYSTEM_PROPERTY, "true");
+
+    // For some unclear reason localstack requires the timestamp in seconds
+    now = Instant.ofEpochMilli(Long.divideUnsigned(Instant.now().getMillis(), 1000));
+
+    localstackContainer =
+        new LocalStackContainer("0.11.3")

Review comment:
      I hardcoded it because some versions didn't work (it was impossible to create a stream using the Kinesis SDK). It could be changed to "latest", but then the test might stop working after an update.







[GitHub] [beam] aromanenko-dev commented on pull request #12422: [BEAM-601] Run KinesisIOIT with localstack

Posted by GitBox <gi...@apache.org>.
aromanenko-dev commented on pull request #12422:
URL: https://github.com/apache/beam/pull/12422#issuecomment-671841184


   @TheNeuralBit Ok, thanks. I think it LGTM





[GitHub] [beam] piotr-szuberski removed a comment on pull request #12422: [BEAM-601] Run KinesisIOIT with localstack

Posted by GitBox <gi...@apache.org>.
piotr-szuberski removed a comment on pull request #12422:
URL: https://github.com/apache/beam/pull/12422#issuecomment-667084649









[GitHub] [beam] aromanenko-dev commented on pull request #12422: [BEAM-601] Run KinesisIOIT with localstack

Posted by GitBox <gi...@apache.org>.
aromanenko-dev commented on pull request #12422:
URL: https://github.com/apache/beam/pull/12422#issuecomment-671385520


   retest this please





[GitHub] [beam] piotr-szuberski commented on a change in pull request #12422: [BEAM-601] Run KinesisIOIT with localstack

Posted by GitBox <gi...@apache.org>.
piotr-szuberski commented on a change in pull request #12422:
URL: https://github.com/apache/beam/pull/12422#discussion_r466252802



##########
File path: sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/BasicKinesisProvider.java
##########
@@ -39,16 +40,27 @@
   private final String secretKey;
   private final Regions region;
   private final @Nullable String serviceEndpoint;
+  private final boolean verifyCertificate;
 
   BasicKinesisProvider(
-      String accessKey, String secretKey, Regions region, @Nullable String serviceEndpoint) {
+      String accessKey,
+      String secretKey,
+      Regions region,
+      @Nullable String serviceEndpoint,
+      boolean verifyCertificate) {
     checkArgument(accessKey != null, "accessKey can not be null");
     checkArgument(secretKey != null, "secretKey can not be null");
     checkArgument(region != null, "region can not be null");
     this.accessKey = accessKey;
     this.secretKey = secretKey;
     this.region = region;
     this.serviceEndpoint = serviceEndpoint;
+    this.verifyCertificate = verifyCertificate;
+  }
+
+  BasicKinesisProvider(

Review comment:
      The service endpoint can be configured in KinesisIO, and I thought that omitting it from the producer configuration was a bug.
   Certificate verification is needed for the cross-language tests (PR [12297](https://github.com/apache/beam/pull/12297)); if `verify_certificate` were provided there, we would need to use this TestKinesisProvider.
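The pattern behind this change is adding a parameter while keeping the old constructor. One plausible shape of it is sketched below. `ClientSettings` is a hypothetical simplified class, not Beam's `BasicKinesisProvider`. The old four-argument constructor delegates to the new one with `verifyCertificate = true`, so existing callers keep the safe default and only emulator-based tests opt out.

```java
/** Hypothetical, simplified stand-in for a provider's settings; not Beam's actual class. */
final class ClientSettings {
  final String accessKey;
  final String secretKey;
  final String region;
  final String serviceEndpoint; // null => the SDK's default endpoint for the region
  final boolean verifyCertificate;

  /** Original signature: existing callers keep certificate verification switched on. */
  ClientSettings(String accessKey, String secretKey, String region, String serviceEndpoint) {
    this(accessKey, secretKey, region, serviceEndpoint, true);
  }

  /** Extended signature: lets emulator-based tests turn verification off explicitly. */
  ClientSettings(
      String accessKey,
      String secretKey,
      String region,
      String serviceEndpoint,
      boolean verifyCertificate) {
    if (accessKey == null) throw new IllegalArgumentException("accessKey can not be null");
    if (secretKey == null) throw new IllegalArgumentException("secretKey can not be null");
    if (region == null) throw new IllegalArgumentException("region can not be null");
    this.accessKey = accessKey;
    this.secretKey = secretKey;
    this.region = region;
    this.serviceEndpoint = serviceEndpoint;
    this.verifyCertificate = verifyCertificate;
  }
}
```

The endpoint used in the checks (`https://localhost:4566`) is only a placeholder for an emulator address.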







[GitHub] [beam] TheNeuralBit commented on a change in pull request #12422: [BEAM-601] Run KinesisIOIT with localstack

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on a change in pull request #12422:
URL: https://github.com/apache/beam/pull/12422#discussion_r465389201



##########
File path: sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisIOIT.java
##########
@@ -99,28 +116,101 @@ private void runRead() {
                 .withAWSClientsProvider(
                     options.getAwsAccessKey(),
                     options.getAwsSecretKey(),
-                    Regions.fromName(options.getAwsKinesisRegion()))
-                .withMaxNumRecords(numberOfRows)
+                    Regions.fromName(options.getAwsKinesisRegion()),
+                    options.getAwsServiceEndpoint(),
+                    options.getAwsVerifyCertificate())
+                .withMaxNumRecords(options.getNumberOfRecords())
                 // to prevent endless running in case of error
-                .withMaxReadTime(Duration.standardMinutes(10))
+                .withMaxReadTime(Duration.standardMinutes(10L))
                 .withInitialPositionInStream(InitialPositionInStream.AT_TIMESTAMP)
                 .withInitialTimestampInStream(now)
                 .withRequestRecordsLimit(1000));
 
     PAssert.thatSingleton(output.apply("Count All", Count.globally()))
-        .isEqualTo((long) numberOfRows);
+        .isEqualTo((long) options.getNumberOfRecords());
 
     PCollection<String> consolidatedHashcode =
         output
             .apply(ParDo.of(new ExtractDataValues()))
             .apply("Hash row contents", Combine.globally(new HashingFn()).withoutDefaults());
 
     PAssert.that(consolidatedHashcode)
-        .containsInAnyOrder(TestRow.getExpectedHashForRowCount(numberOfRows));
+        .containsInAnyOrder(TestRow.getExpectedHashForRowCount(options.getNumberOfRecords()));
 
     pipelineRead.run().waitUntilFinish();
   }
 
+  /** Necessary setup for localstack environment. */
+  private static void setupLocalstack() throws Exception {
+    System.setProperty(SDKGlobalConfiguration.DISABLE_CERT_CHECKING_SYSTEM_PROPERTY, "true");
+    System.setProperty(SDKGlobalConfiguration.AWS_CBOR_DISABLE_SYSTEM_PROPERTY, "true");
+
+    // For some unclear reason localstack requires the timestamp in seconds
+    now = Instant.ofEpochMilli(Long.divideUnsigned(Instant.now().getMillis(), 1000));
+
+    localstackContainer =
+        new LocalStackContainer("0.11.3")

Review comment:
      Hm, based on that, I think what you had before (hard-coding "0.11.3") would be preferable. That way the test is deterministic. Let's break it out into a constant `LOCALSTACK_VERSION` or something, though.
   
   Sorry for the churn :grimacing: 







[GitHub] [beam] piotr-szuberski commented on pull request #12422: [BEAM-601] Run KinesisIOIT with localstack

Posted by GitBox <gi...@apache.org>.
piotr-szuberski commented on pull request #12422:
URL: https://github.com/apache/beam/pull/12422#issuecomment-670143609


   Run Java PostCommit





[GitHub] [beam] piotr-szuberski commented on pull request #12422: [BEAM-601] Run KinesisIOIT with localstack

Posted by GitBox <gi...@apache.org>.
piotr-szuberski commented on pull request #12422:
URL: https://github.com/apache/beam/pull/12422#issuecomment-667143056


   Run CommunityMetrics PreCommit





[GitHub] [beam] piotr-szuberski commented on a change in pull request #12422: [BEAM-601] Run KinesisIOIT with localstack

Posted by GitBox <gi...@apache.org>.
piotr-szuberski commented on a change in pull request #12422:
URL: https://github.com/apache/beam/pull/12422#discussion_r464846401



##########
File path: sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisIOIT.java
##########
@@ -99,28 +116,101 @@ private void runRead() {
                 .withAWSClientsProvider(
                     options.getAwsAccessKey(),
                     options.getAwsSecretKey(),
-                    Regions.fromName(options.getAwsKinesisRegion()))
-                .withMaxNumRecords(numberOfRows)
+                    Regions.fromName(options.getAwsKinesisRegion()),
+                    options.getAwsServiceEndpoint(),
+                    options.getAwsVerifyCertificate())
+                .withMaxNumRecords(options.getNumberOfRecords())
                 // to prevent endless running in case of error
-                .withMaxReadTime(Duration.standardMinutes(10))
+                .withMaxReadTime(Duration.standardMinutes(10L))
                 .withInitialPositionInStream(InitialPositionInStream.AT_TIMESTAMP)
                 .withInitialTimestampInStream(now)
                 .withRequestRecordsLimit(1000));
 
     PAssert.thatSingleton(output.apply("Count All", Count.globally()))
-        .isEqualTo((long) numberOfRows);
+        .isEqualTo((long) options.getNumberOfRecords());
 
     PCollection<String> consolidatedHashcode =
         output
             .apply(ParDo.of(new ExtractDataValues()))
             .apply("Hash row contents", Combine.globally(new HashingFn()).withoutDefaults());
 
     PAssert.that(consolidatedHashcode)
-        .containsInAnyOrder(TestRow.getExpectedHashForRowCount(numberOfRows));
+        .containsInAnyOrder(TestRow.getExpectedHashForRowCount(options.getNumberOfRecords()));
 
     pipelineRead.run().waitUntilFinish();
   }
 
+  /** Necessary setup for localstack environment. */
+  private static void setupLocalstack() throws Exception {
+    System.setProperty(SDKGlobalConfiguration.DISABLE_CERT_CHECKING_SYSTEM_PROPERTY, "true");
+    System.setProperty(SDKGlobalConfiguration.AWS_CBOR_DISABLE_SYSTEM_PROPERTY, "true");
+
+    // For some unclear reason localstack requires the timestamp in seconds
+    now = Instant.ofEpochMilli(Long.divideUnsigned(Instant.now().getMillis(), 1000));
+
+    localstackContainer =
+        new LocalStackContainer("0.11.3")

Review comment:
       The default version is 0.8.6, but it is one of those that don't work.







[GitHub] [beam] piotr-szuberski commented on a change in pull request #12422: [BEAM-601] Run KinesisIOIT with localstack

Posted by GitBox <gi...@apache.org>.
piotr-szuberski commented on a change in pull request #12422:
URL: https://github.com/apache/beam/pull/12422#discussion_r466621117



##########
File path: sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisIO.java
##########
@@ -413,6 +413,28 @@ public Read withAWSClientsProvider(
           new BasicKinesisProvider(awsAccessKey, awsSecretKey, region, serviceEndpoint));
     }
 
+    /**
+     * Specify credential details and region to be used to read from Kinesis. If you need more
+     * sophisticated credential protocol, then you should look at {@link
+     * Read#withAWSClientsProvider(AWSClientsProvider)}.
+     *
+     * <p>The {@code serviceEndpoint} sets an alternative service host. This is useful to execute
+     * the tests with Kinesis service emulator.
+     *
+     * <p>The {@code veriftCertificate} disables or enables certificate verification. Never set it

Review comment:
       Good catch, thanks!







[GitHub] [beam] piotr-szuberski commented on a change in pull request #12422: [BEAM-601] Run KinesisIOIT with localstack

Posted by GitBox <gi...@apache.org>.
piotr-szuberski commented on a change in pull request #12422:
URL: https://github.com/apache/beam/pull/12422#discussion_r466621223



##########
File path: sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisIOIT.java
##########
@@ -95,32 +123,114 @@ private void runRead() {
     PCollection<KinesisRecord> output =
         pipelineRead.apply(
             KinesisIO.read()
-                .withStreamName(options.getAwsKinesisStream())
+                .withStreamName(streamName)
                 .withAWSClientsProvider(
                     options.getAwsAccessKey(),
                     options.getAwsSecretKey(),
-                    Regions.fromName(options.getAwsKinesisRegion()))
-                .withMaxNumRecords(numberOfRows)
+                    Regions.fromName(options.getAwsKinesisRegion()),
+                    options.getAwsServiceEndpoint(),
+                    options.getAwsVerifyCertificate())
+                .withMaxNumRecords(options.getNumberOfRecords())
                 // to prevent endless running in case of error
-                .withMaxReadTime(Duration.standardMinutes(10))
-                .withInitialPositionInStream(InitialPositionInStream.AT_TIMESTAMP)
-                .withInitialTimestampInStream(now)
+                .withMaxReadTime(Duration.standardMinutes(10L))
+                .withInitialPositionInStream(InitialPositionInStream.TRIM_HORIZON)
                 .withRequestRecordsLimit(1000));
 
     PAssert.thatSingleton(output.apply("Count All", Count.globally()))
-        .isEqualTo((long) numberOfRows);
+        .isEqualTo((long) options.getNumberOfRecords());
 
     PCollection<String> consolidatedHashcode =
         output
             .apply(ParDo.of(new ExtractDataValues()))
             .apply("Hash row contents", Combine.globally(new HashingFn()).withoutDefaults());
 
     PAssert.that(consolidatedHashcode)
-        .containsInAnyOrder(TestRow.getExpectedHashForRowCount(numberOfRows));
+        .containsInAnyOrder(TestRow.getExpectedHashForRowCount(options.getNumberOfRecords()));
 
     pipelineRead.run().waitUntilFinish();
   }
 
+  /** Necessary setup for localstack environment. */
+  private static void setupLocalstack() {
+    System.setProperty(SDKGlobalConfiguration.DISABLE_CERT_CHECKING_SYSTEM_PROPERTY, "true");
+    System.setProperty(SDKGlobalConfiguration.AWS_CBOR_DISABLE_SYSTEM_PROPERTY, "true");
+
+    localstackContainer =
+        new LocalStackContainer(LOCALSTACK_VERSION)
+            .withServices(LocalStackContainer.Service.KINESIS)
+            .withEnv("USE_SSL", "true")
+            .withStartupAttempts(3);
+    localstackContainer.start();
+
+    options.setAwsServiceEndpoint(
+        localstackContainer
+            .getEndpointConfiguration(LocalStackContainer.Service.KINESIS)
+            .getServiceEndpoint()
+            .replace("http", "https"));
+    options.setAwsKinesisRegion(
+        localstackContainer
+            .getEndpointConfiguration(LocalStackContainer.Service.KINESIS)
+            .getSigningRegion());
+    options.setAwsAccessKey(
+        localstackContainer.getDefaultCredentialsProvider().getCredentials().getAWSAccessKeyId());
+    options.setAwsSecretKey(
+        localstackContainer.getDefaultCredentialsProvider().getCredentials().getAWSSecretKey());
+    options.setNumberOfRecords(1000);
+    options.setNumberOfShards(1);
+    options.setAwsKinesisStream("beam_kinesis_test");
+    options.setAwsVerifyCertificate(false);
+  }
+
+  private static AmazonKinesis createKinesisClient() {
+    AmazonKinesisClientBuilder clientBuilder = AmazonKinesisClientBuilder.standard();
+
+    AWSCredentialsProvider credentialsProvider =
+        new AWSStaticCredentialsProvider(
+            new BasicAWSCredentials(options.getAwsAccessKey(), options.getAwsSecretKey()));
+    clientBuilder.setCredentials(credentialsProvider);
+
+    if (options.getAwsServiceEndpoint() != null) {
+      AwsClientBuilder.EndpointConfiguration endpointConfiguration =
+          new AwsClientBuilder.EndpointConfiguration(
+              options.getAwsServiceEndpoint(), options.getAwsKinesisRegion());
+      clientBuilder.setEndpointConfiguration(endpointConfiguration);
+    } else {
+      clientBuilder.setRegion(options.getAwsKinesisRegion());
+    }
+
+    return clientBuilder.build();
+  }
+
+  private static void createStream() throws Exception {
+    kinesisClient.createStream(streamName, 1);
+    int repeats = 10;
+    for (int i = 0; i <= repeats; ++i) {
+      String streamStatus =
+          kinesisClient.describeStream(streamName).getStreamDescription().getStreamStatus();
+      if ("ACTIVE".equals(streamStatus)) {
+        break;
+      }
+      if (i == repeats) {
+        throw new RuntimeException("Unable to initialize stream");
+      }
+      Thread.sleep(1000L);
+    }
+  }
+
+  /** Check whether pipeline options were provided. If not, use localstack container. */
+  private static boolean doUseLocalstack() {

Review comment:
       Yeah, definitely.







[GitHub] [beam] aromanenko-dev commented on a change in pull request #12422: [BEAM-601] Run KinesisIOIT with localstack

Posted by GitBox <gi...@apache.org>.
aromanenko-dev commented on a change in pull request #12422:
URL: https://github.com/apache/beam/pull/12422#discussion_r466299826



##########
File path: sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisIOIT.java
##########
@@ -34,34 +42,52 @@
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.values.PCollection;
 import org.joda.time.Duration;
-import org.joda.time.Instant;
+import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
+import org.testcontainers.containers.localstack.LocalStackContainer;
 
 /**
  * Integration test, that writes and reads data to and from real Kinesis. You need to provide {@link
- * KinesisTestOptions} in order to run this.
+ * KinesisTestOptions} in order to run this if you want to test it with production setup. By default
+ * when no options are provided an instance of localstack is used.
  */
 @RunWith(JUnit4.class)
 public class KinesisIOIT implements Serializable {
-  private static int numberOfShards;
-  private static int numberOfRows;
+  private static final String LOCALSTACK_VERSION = "0.11.3";
 
   @Rule public TestPipeline pipelineWrite = TestPipeline.create();
   @Rule public TestPipeline pipelineRead = TestPipeline.create();
 
+  private static LocalStackContainer localstackContainer;
+  private static String streamName;
+  private static AmazonKinesis kinesisClient;
+
   private static KinesisTestOptions options;
-  private static final Instant now = Instant.now();
 
   @BeforeClass
-  public static void setup() {
+  public static void setup() throws Exception {
     PipelineOptionsFactory.register(KinesisTestOptions.class);
     options = TestPipeline.testingPipelineOptions().as(KinesisTestOptions.class);
-    numberOfShards = options.getNumberOfShards();
-    numberOfRows = options.getNumberOfRecords();
+    if (doUseLocalstack()) {
+      setupLocalstack();
+    }
+    kinesisClient = createKinesisClient();
+    streamName = "beam_test_kinesis" + UUID.randomUUID();

Review comment:
      I believe it should be both. We run it in our own environment with our own privileges, since Beam so far hasn't had dedicated access to AWS Kinesis (though that could change in the future). In any case, I'd recommend splitting the stream creation/deletion part from the stream writing/reading part since, as I mentioned before, they could require different privileges. So I'd move the stream management into the Jenkins script or make it optional in KinesisIOIT (for example, use it only when localstack is used).
   Also, we should allow configuring the stream name.







[GitHub] [beam] aromanenko-dev commented on a change in pull request #12422: [BEAM-601] Run KinesisIOIT with localstack

Posted by GitBox <gi...@apache.org>.
aromanenko-dev commented on a change in pull request #12422:
URL: https://github.com/apache/beam/pull/12422#discussion_r465702081



##########
File path: sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/BasicKinesisProvider.java
##########
@@ -39,16 +40,27 @@
   private final String secretKey;
   private final Regions region;
   private final @Nullable String serviceEndpoint;
+  private final boolean verifyCertificate;
 
   BasicKinesisProvider(
-      String accessKey, String secretKey, Regions region, @Nullable String serviceEndpoint) {
+      String accessKey,
+      String secretKey,
+      Regions region,
+      @Nullable String serviceEndpoint,
+      boolean verifyCertificate) {
     checkArgument(accessKey != null, "accessKey can not be null");
     checkArgument(secretKey != null, "secretKey can not be null");
     checkArgument(region != null, "region can not be null");
     this.accessKey = accessKey;
     this.secretKey = secretKey;
     this.region = region;
     this.serviceEndpoint = serviceEndpoint;
+    this.verifyCertificate = verifyCertificate;
+  }
+
+  BasicKinesisProvider(

Review comment:
      Why did you decide to modify `BasicKinesisProvider` rather than just create a new provider class for testing that extends this basic one?
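For comparison, the alternative the reviewer suggests could look roughly like the sketch below. A production provider exposes a protected hook that always verifies certificates, and a test-only subclass overrides it. `BaseProvider`, `InsecureTestProvider`, and `clientSettings()` are hypothetical names used only for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical, simplified production provider; Beam's real BasicKinesisProvider differs. */
class BaseProvider {
  private final String region;
  private final String serviceEndpoint; // may be null

  BaseProvider(String region, String serviceEndpoint) {
    this.region = region;
    this.serviceEndpoint = serviceEndpoint;
  }

  /** Hook for subclasses; production code always verifies TLS certificates. */
  protected boolean verifyCertificate() {
    return true;
  }

  /** Settings a real provider would feed into its client builders. */
  final Map<String, String> clientSettings() {
    Map<String, String> settings = new LinkedHashMap<>();
    settings.put("region", region);
    if (serviceEndpoint != null) {
      settings.put("endpoint", serviceEndpoint);
    }
    settings.put("verifyCertificate", Boolean.toString(verifyCertificate()));
    return settings;
  }
}

/** Test-only subclass: kept under src/test so the insecure switch never ships in main. */
class InsecureTestProvider extends BaseProvider {
  InsecureTestProvider(String region, String serviceEndpoint) {
    super(region, serviceEndpoint);
  }

  @Override
  protected boolean verifyCertificate() {
    return false;
  }
}
```

The trade-off is visibility. A subclass under `src/test` keeps the switch out of the released artifact. However, the author's reply in this thread points out that the cross-language tests would then need to use that test provider.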

##########
File path: sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisIOIT.java
##########
@@ -95,32 +123,114 @@ private void runRead() {
     PCollection<KinesisRecord> output =
         pipelineRead.apply(
             KinesisIO.read()
-                .withStreamName(options.getAwsKinesisStream())
+                .withStreamName(streamName)
                 .withAWSClientsProvider(
                     options.getAwsAccessKey(),
                     options.getAwsSecretKey(),
-                    Regions.fromName(options.getAwsKinesisRegion()))
-                .withMaxNumRecords(numberOfRows)
+                    Regions.fromName(options.getAwsKinesisRegion()),
+                    options.getAwsServiceEndpoint(),
+                    options.getAwsVerifyCertificate())
+                .withMaxNumRecords(options.getNumberOfRecords())
                 // to prevent endless running in case of error
-                .withMaxReadTime(Duration.standardMinutes(10))
-                .withInitialPositionInStream(InitialPositionInStream.AT_TIMESTAMP)
-                .withInitialTimestampInStream(now)
+                .withMaxReadTime(Duration.standardMinutes(10L))
+                .withInitialPositionInStream(InitialPositionInStream.TRIM_HORIZON)
                 .withRequestRecordsLimit(1000));
 
     PAssert.thatSingleton(output.apply("Count All", Count.globally()))
-        .isEqualTo((long) numberOfRows);
+        .isEqualTo((long) options.getNumberOfRecords());
 
     PCollection<String> consolidatedHashcode =
         output
             .apply(ParDo.of(new ExtractDataValues()))
             .apply("Hash row contents", Combine.globally(new HashingFn()).withoutDefaults());
 
     PAssert.that(consolidatedHashcode)
-        .containsInAnyOrder(TestRow.getExpectedHashForRowCount(numberOfRows));
+        .containsInAnyOrder(TestRow.getExpectedHashForRowCount(options.getNumberOfRecords()));
 
     pipelineRead.run().waitUntilFinish();
   }
 
+  /** Necessary setup for localstack environment. */
+  private static void setupLocalstack() {
+    System.setProperty(SDKGlobalConfiguration.DISABLE_CERT_CHECKING_SYSTEM_PROPERTY, "true");
+    System.setProperty(SDKGlobalConfiguration.AWS_CBOR_DISABLE_SYSTEM_PROPERTY, "true");
+
+    localstackContainer =
+        new LocalStackContainer(LOCALSTACK_VERSION)
+            .withServices(LocalStackContainer.Service.KINESIS)
+            .withEnv("USE_SSL", "true")
+            .withStartupAttempts(3);
+    localstackContainer.start();
+
+    options.setAwsServiceEndpoint(
+        localstackContainer
+            .getEndpointConfiguration(LocalStackContainer.Service.KINESIS)
+            .getServiceEndpoint()
+            .replace("http", "https"));
+    options.setAwsKinesisRegion(
+        localstackContainer
+            .getEndpointConfiguration(LocalStackContainer.Service.KINESIS)
+            .getSigningRegion());
+    options.setAwsAccessKey(
+        localstackContainer.getDefaultCredentialsProvider().getCredentials().getAWSAccessKeyId());
+    options.setAwsSecretKey(
+        localstackContainer.getDefaultCredentialsProvider().getCredentials().getAWSSecretKey());
+    options.setNumberOfRecords(1000);
+    options.setNumberOfShards(1);
+    options.setAwsKinesisStream("beam_kinesis_test");
+    options.setAwsVerifyCertificate(false);
+  }
+
+  private static AmazonKinesis createKinesisClient() {
+    AmazonKinesisClientBuilder clientBuilder = AmazonKinesisClientBuilder.standard();
+
+    AWSCredentialsProvider credentialsProvider =
+        new AWSStaticCredentialsProvider(
+            new BasicAWSCredentials(options.getAwsAccessKey(), options.getAwsSecretKey()));
+    clientBuilder.setCredentials(credentialsProvider);
+
+    if (options.getAwsServiceEndpoint() != null) {
+      AwsClientBuilder.EndpointConfiguration endpointConfiguration =
+          new AwsClientBuilder.EndpointConfiguration(
+              options.getAwsServiceEndpoint(), options.getAwsKinesisRegion());
+      clientBuilder.setEndpointConfiguration(endpointConfiguration);
+    } else {
+      clientBuilder.setRegion(options.getAwsKinesisRegion());
+    }
+
+    return clientBuilder.build();
+  }
+
+  private static void createStream() throws Exception {
+    kinesisClient.createStream(streamName, 1);
+    int repeats = 10;
+    for (int i = 0; i <= repeats; ++i) {
+      String streamStatus =
+          kinesisClient.describeStream(streamName).getStreamDescription().getStreamStatus();
+      if ("ACTIVE".equals(streamStatus)) {
+        break;
+      }
+      if (i == repeats) {
+        throw new RuntimeException("Unable to initialize stream");
+      }
+      Thread.sleep(1000L);
+    }
+  }
+
+  /** Check whether pipeline options were provided. If not, use localstack container. */
+  private static boolean doUseLocalstack() {

Review comment:
       Wouldn't it be enough to just add a `useLocalstack` option to `KinesisTestOptions`?
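Here is a minimal sketch of what the suggested flag could do in `setup()`. In Beam, the flag would be a getter/setter pair on the `KinesisTestOptions` interface (for example, annotated with `@Default.Boolean(false)`). The sketch instead uses a hypothetical plain-Java class, `KinesisSettings`, and a hypothetical helper, `LocalstackSwitch`, so it runs without Beam or Testcontainers. The endpoint and region arguments stand in for the values read from the running container.

```java
import java.util.Objects;

/**
 * Hypothetical plain-Java stand-in for a few KinesisTestOptions getters/setters,
 * so the sketch runs without Beam on the classpath.
 */
final class KinesisSettings {
  boolean useLocalstack = false; // the proposed flag; off by default
  String awsServiceEndpoint; // null means "use the regular AWS endpoint"
  String awsKinesisRegion = "us-east-1"; // placeholder default
  boolean awsVerifyCertificate = true;
}

final class LocalstackSwitch {
  private LocalstackSwitch() {}

  /**
   * Overrides connection settings with localstack values only when the flag is set.
   * Without the flag, whatever the user configured for a real account is left untouched.
   */
  static KinesisSettings resolve(
      KinesisSettings settings, String localstackEndpoint, String localstackRegion) {
    if (!settings.useLocalstack) {
      return settings;
    }
    settings.awsServiceEndpoint = Objects.requireNonNull(localstackEndpoint, "endpoint");
    settings.awsKinesisRegion = Objects.requireNonNull(localstackRegion, "region");
    // The PR also turns certificate verification off when targeting localstack.
    settings.awsVerifyCertificate = false;
    return settings;
  }
}
```

An explicit flag makes the choice visible on the command line, at the cost of one more option to document.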

##########
File path: sdks/java/io/kinesis/build.gradle
##########
@@ -50,6 +50,7 @@ dependencies {
   testCompile library.java.powermock
   testCompile library.java.powermock_mockito
   testCompile "org.assertj:assertj-core:3.11.1"
+  testCompile 'org.testcontainers:localstack:1.11.2'

Review comment:
       Why not use a more recent version, like `1.14.x`?

##########
File path: sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisIOIT.java
##########
@@ -34,34 +42,52 @@
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.values.PCollection;
 import org.joda.time.Duration;
-import org.joda.time.Instant;
+import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
+import org.testcontainers.containers.localstack.LocalStackContainer;
 
 /**
  * Integration test, that writes and reads data to and from real Kinesis. You need to provide {@link
- * KinesisTestOptions} in order to run this.
+ * KinesisTestOptions} in order to run this if you want to test it with production setup. By default
+ * when no options are provided an instance of localstack is used.
  */
 @RunWith(JUnit4.class)
 public class KinesisIOIT implements Serializable {
-  private static int numberOfShards;
-  private static int numberOfRows;
+  private static final String LOCALSTACK_VERSION = "0.11.3";
 
   @Rule public TestPipeline pipelineWrite = TestPipeline.create();
   @Rule public TestPipeline pipelineRead = TestPipeline.create();
 
+  private static LocalStackContainer localstackContainer;
+  private static String streamName;
+  private static AmazonKinesis kinesisClient;
+
   private static KinesisTestOptions options;
-  private static final Instant now = Instant.now();
 
   @BeforeClass
-  public static void setup() {
+  public static void setup() throws Exception {
     PipelineOptionsFactory.register(KinesisTestOptions.class);
     options = TestPipeline.testingPipelineOptions().as(KinesisTestOptions.class);
-    numberOfShards = options.getNumberOfShards();
-    numberOfRows = options.getNumberOfRecords();
+    if (doUseLocalstack()) {
+      setupLocalstack();
+    }
+    kinesisClient = createKinesisClient();
+    streamName = "beam_test_kinesis" + UUID.randomUUID();

Review comment:
       Could you elaborate on why the stream name is partly hardcoded and no longer configurable as it was before? We run KinesisIOIT in our "in-house" environment against a real Kinesis instance, using a pre-created stream with different privileges.
   Probably we should do this only when testing against LocalStack.
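One way to reconcile both needs is to keep the stream name configurable and fall back to a unique generated name only when the test runs against localstack. The sketch below assumes this approach. `StreamNames` and its prefix are hypothetical; the PR itself used `"beam_test_kinesis" + UUID.randomUUID()`.

```java
import java.util.UUID;

final class StreamNames {
  // Hypothetical prefix. Kinesis stream names allow letters, digits, '_', '-' and '.',
  // up to 128 characters; prefix + UUID stays well within that.
  static final String LOCALSTACK_PREFIX = "beam_test_kinesis_";

  private StreamNames() {}

  /**
   * Returns the configured name when one is given (e.g. a pre-created stream on a real
   * account with its own privileges). Otherwise, and only for localstack, returns a unique
   * throwaway name so concurrent runs never share a stream.
   */
  static String resolve(String configuredName, boolean usingLocalstack) {
    if (configuredName != null && !configuredName.isEmpty()) {
      return configuredName;
    }
    if (usingLocalstack) {
      return LOCALSTACK_PREFIX + UUID.randomUUID();
    }
    throw new IllegalArgumentException(
        "A stream name must be configured when running against real Kinesis");
  }
}
```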




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] piotr-szuberski commented on pull request #12422: [BEAM-601] Run KinesisIOIT with localstack

Posted by GitBox <gi...@apache.org>.
piotr-szuberski commented on pull request #12422:
URL: https://github.com/apache/beam/pull/12422#issuecomment-666291814


   R: @TheNeuralBit 





[GitHub] [beam] TheNeuralBit commented on a change in pull request #12422: [BEAM-601] Run KinesisIOIT with localstack

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on a change in pull request #12422:
URL: https://github.com/apache/beam/pull/12422#discussion_r463325571



##########
File path: sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisIOIT.java
##########
@@ -34,34 +35,65 @@
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.values.PCollection;
 import org.joda.time.Duration;
-import org.joda.time.Instant;
-import org.junit.BeforeClass;
+import org.junit.After;
+import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
+import org.testcontainers.containers.localstack.LocalStackContainer;
 
 /**
  * Integration test, that writes and reads data to and from real Kinesis. You need to provide {@link
  * KinesisTestOptions} in order to run this.
  */
 @RunWith(JUnit4.class)
 public class KinesisIOIT implements Serializable {
-  private static int numberOfShards;
-  private static int numberOfRows;
+  private static final String STREAM_NAME = "beam_kinesis";
+  private static final int NUM_RECORDS = 1000;
 
   @Rule public TestPipeline pipelineWrite = TestPipeline.create();
   @Rule public TestPipeline pipelineRead = TestPipeline.create();
 
-  private static KinesisTestOptions options;
-  private static final Instant now = Instant.now();
+  static {
+    System.setProperty(SDKGlobalConfiguration.DISABLE_CERT_CHECKING_SYSTEM_PROPERTY, "true");
+    System.setProperty(SDKGlobalConfiguration.AWS_CBOR_DISABLE_SYSTEM_PROPERTY, "true");
+  }
+
+  private final LocalStackContainer localstackContainer =
+      new LocalStackContainer("0.11.3")
+          .withServices(LocalStackContainer.Service.KINESIS)
+          .withEnv("USE_SSL", "true")
+          .withStartupAttempts(3);
+
+  private String endpoint;
+  private String region;
+  private String accessKey;
+  private String secretKey;
+
+  @Before
+  public void setup() throws Exception {
+    localstackContainer.start();
+    endpoint =
+        localstackContainer
+            .getEndpointConfiguration(LocalStackContainer.Service.KINESIS)
+            .getServiceEndpoint()
+            .replace("http", "https");
+    region =
+        localstackContainer
+            .getEndpointConfiguration(LocalStackContainer.Service.KINESIS)
+            .getSigningRegion();
+    accessKey =
+        localstackContainer.getDefaultCredentialsProvider().getCredentials().getAWSAccessKeyId();
+    secretKey =
+        localstackContainer.getDefaultCredentialsProvider().getCredentials().getAWSSecretKey();
+
+    createStream();
+  }
 
-  @BeforeClass
-  public static void setup() {
-    PipelineOptionsFactory.register(KinesisTestOptions.class);
-    options = TestPipeline.testingPipelineOptions().as(KinesisTestOptions.class);
-    numberOfShards = options.getNumberOfShards();
-    numberOfRows = options.getNumberOfRecords();

Review comment:
       I think we should maintain the ability to test against production AWS. Someday maybe we'll get some AWS credits to run this continuously against prod, and it could still be useful for local testing. Could you make it so we only start up a localstack container if nothing in `KinesisTestOptions` is modified?
   
   When starting the localstack you could just set the relevant fields in the PipelineOptions and let the rest of the test read them as it does now.
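Here is a minimal sketch of the "nothing modified" check described above, assuming the relevant `KinesisTestOptions` fields have fixed defaults. `KinesisOptionValues` and its placeholder default strings are hypothetical stand-ins, not Beam's actual option defaults.

```java
import java.util.Objects;

/** Hypothetical plain-Java mirror of the connection fields in KinesisTestOptions. */
final class KinesisOptionValues {
  // Placeholder defaults; the real ones are whatever KinesisTestOptions declares.
  static final String DEFAULT_STREAM = "default-stream";
  static final String DEFAULT_ACCESS_KEY = "default-access-key";
  static final String DEFAULT_SECRET_KEY = "default-secret-key";
  static final String DEFAULT_REGION = "default-region";

  String stream = DEFAULT_STREAM;
  String accessKey = DEFAULT_ACCESS_KEY;
  String secretKey = DEFAULT_SECRET_KEY;
  String region = DEFAULT_REGION;
  String serviceEndpoint; // null unless overridden

  /** True when none of the connection options were changed by the user. */
  boolean allDefaults() {
    return Objects.equals(stream, DEFAULT_STREAM)
        && Objects.equals(accessKey, DEFAULT_ACCESS_KEY)
        && Objects.equals(secretKey, DEFAULT_SECRET_KEY)
        && Objects.equals(region, DEFAULT_REGION)
        && serviceEndpoint == null;
  }

  /**
   * Writes localstack values into the same fields the test already reads, so the
   * write/read pipelines need no localstack-specific branches.
   */
  void pointAt(String endpoint, String localRegion, String key, String secret) {
    serviceEndpoint = endpoint;
    region = localRegion;
    accessKey = key;
    secretKey = secret;
  }
}
```

In `setup()`, the test would start the container only when `allDefaults()` returns true, then call `pointAt(...)` with the container's endpoint, signing region and credentials. The rest of the test keeps reading the same fields.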







[GitHub] [beam] piotr-szuberski commented on a change in pull request #12422: [BEAM-601] Run KinesisIOIT with localstack

Posted by GitBox <gi...@apache.org>.
piotr-szuberski commented on a change in pull request #12422:
URL: https://github.com/apache/beam/pull/12422#discussion_r466249889



##########
File path: sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisIOIT.java
##########
@@ -34,34 +42,52 @@
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.values.PCollection;
 import org.joda.time.Duration;
-import org.joda.time.Instant;
+import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
+import org.testcontainers.containers.localstack.LocalStackContainer;
 
 /**
  * Integration test, that writes and reads data to and from real Kinesis. You need to provide {@link
- * KinesisTestOptions} in order to run this.
+ * KinesisTestOptions} in order to run this if you want to test it with production setup. By default
+ * when no options are provided an instance of localstack is used.
  */
 @RunWith(JUnit4.class)
 public class KinesisIOIT implements Serializable {
-  private static int numberOfShards;
-  private static int numberOfRows;
+  private static final String LOCALSTACK_VERSION = "0.11.3";
 
   @Rule public TestPipeline pipelineWrite = TestPipeline.create();
   @Rule public TestPipeline pipelineRead = TestPipeline.create();
 
+  private static LocalStackContainer localstackContainer;
+  private static String streamName;
+  private static AmazonKinesis kinesisClient;
+
   private static KinesisTestOptions options;
-  private static final Instant now = Instant.now();
 
   @BeforeClass
-  public static void setup() {
+  public static void setup() throws Exception {
     PipelineOptionsFactory.register(KinesisTestOptions.class);
     options = TestPipeline.testingPipelineOptions().as(KinesisTestOptions.class);
-    numberOfShards = options.getNumberOfShards();
-    numberOfRows = options.getNumberOfRecords();
+    if (doUseLocalstack()) {
+      setupLocalstack();
+    }
+    kinesisClient = createKinesisClient();
+    streamName = "beam_test_kinesis" + UUID.randomUUID();

Review comment:
       What I meant was creating a new stream just for testing, also on a real Kinesis account, instead of using an existing one that could already contain data (which would break the test's correctness if data can be written continuously to the existing stream, or if the test runs twice at the same moment).
   
   I didn't think about things like different privileges on a pre-created stream. That's a good point and I agree. I'm not sure who the target of this test is: Beam's CI, or a user who wants to test Beam against their own Kinesis account?
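Creating a fresh stream, as proposed here, means waiting until Kinesis reports it `ACTIVE` before writing. The PR's `createStream()` helper does this with a bounded loop over `describeStream(...)`. Below is a generic sketch of that loop. A `Supplier<String>` stands in for the `describeStream(streamName).getStreamDescription().getStreamStatus()` call, and `StreamWaiter` is a hypothetical name.

```java
import java.util.function.Supplier;

final class StreamWaiter {
  private StreamWaiter() {}

  /**
   * Polls the status until it is "ACTIVE", sleeping between attempts, and fails after
   * maxAttempts checks. No sleep happens after the final failed check.
   */
  static void awaitActive(Supplier<String> status, int maxAttempts, long sleepMillis) {
    for (int attempt = 1; attempt <= maxAttempts; attempt++) {
      if ("ACTIVE".equals(status.get())) {
        return;
      }
      if (attempt < maxAttempts) {
        try {
          Thread.sleep(sleepMillis);
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt(); // preserve the interrupt for callers
          throw new IllegalStateException("Interrupted while waiting for stream", e);
        }
      }
    }
    throw new IllegalStateException(
        "Stream did not become ACTIVE after " + maxAttempts + " attempts");
  }
}
```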







[GitHub] [beam] piotr-szuberski commented on a change in pull request #12422: [BEAM-601] Run KinesisIOIT with localstack

Posted by GitBox <gi...@apache.org>.
piotr-szuberski commented on a change in pull request #12422:
URL: https://github.com/apache/beam/pull/12422#discussion_r465579262



##########
File path: sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisIOIT.java
##########
@@ -99,28 +116,101 @@ private void runRead() {
                 .withAWSClientsProvider(
                     options.getAwsAccessKey(),
                     options.getAwsSecretKey(),
-                    Regions.fromName(options.getAwsKinesisRegion()))
-                .withMaxNumRecords(numberOfRows)
+                    Regions.fromName(options.getAwsKinesisRegion()),
+                    options.getAwsServiceEndpoint(),
+                    options.getAwsVerifyCertificate())
+                .withMaxNumRecords(options.getNumberOfRecords())
                 // to prevent endless running in case of error
-                .withMaxReadTime(Duration.standardMinutes(10))
+                .withMaxReadTime(Duration.standardMinutes(10L))
                 .withInitialPositionInStream(InitialPositionInStream.AT_TIMESTAMP)
                 .withInitialTimestampInStream(now)
                 .withRequestRecordsLimit(1000));
 
     PAssert.thatSingleton(output.apply("Count All", Count.globally()))
-        .isEqualTo((long) numberOfRows);
+        .isEqualTo((long) options.getNumberOfRecords());
 
     PCollection<String> consolidatedHashcode =
         output
             .apply(ParDo.of(new ExtractDataValues()))
             .apply("Hash row contents", Combine.globally(new HashingFn()).withoutDefaults());
 
     PAssert.that(consolidatedHashcode)
-        .containsInAnyOrder(TestRow.getExpectedHashForRowCount(numberOfRows));
+        .containsInAnyOrder(TestRow.getExpectedHashForRowCount(options.getNumberOfRecords()));
 
     pipelineRead.run().waitUntilFinish();
   }
 
+  /** Necessary setup for localstack environment. */
+  private static void setupLocalstack() throws Exception {
+    System.setProperty(SDKGlobalConfiguration.DISABLE_CERT_CHECKING_SYSTEM_PROPERTY, "true");
+    System.setProperty(SDKGlobalConfiguration.AWS_CBOR_DISABLE_SYSTEM_PROPERTY, "true");
+
+    // For some unclear reason localstack requires the timestamp in seconds
+    now = Instant.ofEpochMilli(Long.divideUnsigned(Instant.now().getMillis(), 1000));
+
+    localstackContainer =
+        new LocalStackContainer("0.11.3")

Review comment:
       Not a problem at all, it's me who did it without consulting! ;) Done.







[GitHub] [beam] aromanenko-dev commented on a change in pull request #12422: [BEAM-601] Run KinesisIOIT with localstack

Posted by GitBox <gi...@apache.org>.
aromanenko-dev commented on a change in pull request #12422:
URL: https://github.com/apache/beam/pull/12422#discussion_r466351486



##########
File path: sdks/java/io/kinesis/build.gradle
##########
@@ -50,6 +50,7 @@ dependencies {
   testCompile library.java.powermock
   testCompile library.java.powermock_mockito
   testCompile "org.assertj:assertj-core:3.11.1"
+  testCompile 'org.testcontainers:localstack:1.11.2'

Review comment:
       Since `localstack` is going to be used in several modules, it would make sense to define it as a separate library in `BeamModulePlugin` (like `library.java.powermock`, for example).







[GitHub] [beam] piotr-szuberski commented on pull request #12422: [BEAM-601] Run KinesisIOIT with localstack

Posted by GitBox <gi...@apache.org>.
piotr-szuberski commented on pull request #12422:
URL: https://github.com/apache/beam/pull/12422#issuecomment-667084649


   Run Java PostCommit





[GitHub] [beam] piotr-szuberski commented on pull request #12422: [BEAM-601] Run KinesisIOIT with localstack

Posted by GitBox <gi...@apache.org>.
piotr-szuberski commented on pull request #12422:
URL: https://github.com/apache/beam/pull/12422#issuecomment-670143022


   @aromanenko-dev I've done what you suggested, except for one thing that is still left to discuss (the unresolved comment).





[GitHub] [beam] aromanenko-dev merged pull request #12422: [BEAM-601] Run KinesisIOIT with localstack

Posted by GitBox <gi...@apache.org>.
aromanenko-dev merged pull request #12422:
URL: https://github.com/apache/beam/pull/12422


   





[GitHub] [beam] piotr-szuberski commented on a change in pull request #12422: [BEAM-601] Run KinesisIOIT with localstack

Posted by GitBox <gi...@apache.org>.
piotr-szuberski commented on a change in pull request #12422:
URL: https://github.com/apache/beam/pull/12422#discussion_r464846401



##########
File path: sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisIOIT.java
##########
@@ -99,28 +116,101 @@ private void runRead() {
                 .withAWSClientsProvider(
                     options.getAwsAccessKey(),
                     options.getAwsSecretKey(),
-                    Regions.fromName(options.getAwsKinesisRegion()))
-                .withMaxNumRecords(numberOfRows)
+                    Regions.fromName(options.getAwsKinesisRegion()),
+                    options.getAwsServiceEndpoint(),
+                    options.getAwsVerifyCertificate())
+                .withMaxNumRecords(options.getNumberOfRecords())
                 // to prevent endless running in case of error
-                .withMaxReadTime(Duration.standardMinutes(10))
+                .withMaxReadTime(Duration.standardMinutes(10L))
                 .withInitialPositionInStream(InitialPositionInStream.AT_TIMESTAMP)
                 .withInitialTimestampInStream(now)
                 .withRequestRecordsLimit(1000));
 
     PAssert.thatSingleton(output.apply("Count All", Count.globally()))
-        .isEqualTo((long) numberOfRows);
+        .isEqualTo((long) options.getNumberOfRecords());
 
     PCollection<String> consolidatedHashcode =
         output
             .apply(ParDo.of(new ExtractDataValues()))
             .apply("Hash row contents", Combine.globally(new HashingFn()).withoutDefaults());
 
     PAssert.that(consolidatedHashcode)
-        .containsInAnyOrder(TestRow.getExpectedHashForRowCount(numberOfRows));
+        .containsInAnyOrder(TestRow.getExpectedHashForRowCount(options.getNumberOfRecords()));
 
     pipelineRead.run().waitUntilFinish();
   }
 
+  /** Necessary setup for localstack environment. */
+  private static void setupLocalstack() throws Exception {
+    System.setProperty(SDKGlobalConfiguration.DISABLE_CERT_CHECKING_SYSTEM_PROPERTY, "true");
+    System.setProperty(SDKGlobalConfiguration.AWS_CBOR_DISABLE_SYSTEM_PROPERTY, "true");
+
+    // For some unclear reason localstack requires the timestamp in seconds
+    now = Instant.ofEpochMilli(Long.divideUnsigned(Instant.now().getMillis(), 1000));
+
+    localstackContainer =
+        new LocalStackContainer("0.11.3")

Review comment:
       The default version (no-param constructor) is 0.8.6, but it is one of the versions that don't work.







[GitHub] [beam] piotr-szuberski commented on a change in pull request #12422: [BEAM-601] Run KinesisIOIT with localstack

Posted by GitBox <gi...@apache.org>.
piotr-szuberski commented on a change in pull request #12422:
URL: https://github.com/apache/beam/pull/12422#discussion_r466611126



##########
File path: sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisIOIT.java
##########
@@ -34,34 +42,52 @@
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.values.PCollection;
 import org.joda.time.Duration;
-import org.joda.time.Instant;
+import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
+import org.testcontainers.containers.localstack.LocalStackContainer;
 
 /**
  * Integration test, that writes and reads data to and from real Kinesis. You need to provide {@link
- * KinesisTestOptions} in order to run this.
+ * KinesisTestOptions} in order to run this if you want to test it with production setup. By default
+ * when no options are provided an instance of localstack is used.
  */
 @RunWith(JUnit4.class)
 public class KinesisIOIT implements Serializable {
-  private static int numberOfShards;
-  private static int numberOfRows;
+  private static final String LOCALSTACK_VERSION = "0.11.3";
 
   @Rule public TestPipeline pipelineWrite = TestPipeline.create();
   @Rule public TestPipeline pipelineRead = TestPipeline.create();
 
+  private static LocalStackContainer localstackContainer;
+  private static String streamName;
+  private static AmazonKinesis kinesisClient;
+
   private static KinesisTestOptions options;
-  private static final Instant now = Instant.now();
 
   @BeforeClass
-  public static void setup() {
+  public static void setup() throws Exception {
     PipelineOptionsFactory.register(KinesisTestOptions.class);
     options = TestPipeline.testingPipelineOptions().as(KinesisTestOptions.class);
-    numberOfShards = options.getNumberOfShards();
-    numberOfRows = options.getNumberOfRecords();
+    if (doUseLocalstack()) {
+      setupLocalstack();
+    }
+    kinesisClient = createKinesisClient();
+    streamName = "beam_test_kinesis" + UUID.randomUUID();

Review comment:
       OK then, I'll keep the stream creation/deletion in the localstack setup only. Thanks for the explanation!







[GitHub] [beam] piotr-szuberski removed a comment on pull request #12422: [BEAM-601] Run KinesisIOIT with localstack

Posted by GitBox <gi...@apache.org>.
piotr-szuberski removed a comment on pull request #12422:
URL: https://github.com/apache/beam/pull/12422#issuecomment-667088807


   Run Java PostCommit





[GitHub] [beam] piotr-szuberski commented on pull request #12422: [BEAM-601] Run KinesisIOIT with localstack

Posted by GitBox <gi...@apache.org>.
piotr-szuberski commented on pull request #12422:
URL: https://github.com/apache/beam/pull/12422#issuecomment-669108700


   Run Python2_PVR_Flink PreCommit





[GitHub] [beam] aromanenko-dev commented on a change in pull request #12422: [BEAM-601] Run KinesisIOIT with localstack

Posted by GitBox <gi...@apache.org>.
aromanenko-dev commented on a change in pull request #12422:
URL: https://github.com/apache/beam/pull/12422#discussion_r466299826



##########
File path: sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisIOIT.java
##########
@@ -34,34 +42,52 @@
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.values.PCollection;
 import org.joda.time.Duration;
-import org.joda.time.Instant;
+import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
+import org.testcontainers.containers.localstack.LocalStackContainer;
 
 /**
  * Integration test, that writes and reads data to and from real Kinesis. You need to provide {@link
- * KinesisTestOptions} in order to run this.
+ * KinesisTestOptions} in order to run this if you want to test it with production setup. By default
+ * when no options are provided an instance of localstack is used.
  */
 @RunWith(JUnit4.class)
 public class KinesisIOIT implements Serializable {
-  private static int numberOfShards;
-  private static int numberOfRows;
+  private static final String LOCALSTACK_VERSION = "0.11.3";
 
   @Rule public TestPipeline pipelineWrite = TestPipeline.create();
   @Rule public TestPipeline pipelineRead = TestPipeline.create();
 
+  private static LocalStackContainer localstackContainer;
+  private static String streamName;
+  private static AmazonKinesis kinesisClient;
+
   private static KinesisTestOptions options;
-  private static final Instant now = Instant.now();
 
   @BeforeClass
-  public static void setup() {
+  public static void setup() throws Exception {
     PipelineOptionsFactory.register(KinesisTestOptions.class);
     options = TestPipeline.testingPipelineOptions().as(KinesisTestOptions.class);
-    numberOfShards = options.getNumberOfShards();
-    numberOfRows = options.getNumberOfRecords();
+    if (doUseLocalstack()) {
+      setupLocalstack();
+    }
+    kinesisClient = createKinesisClient();
+    streamName = "beam_test_kinesis" + UUID.randomUUID();

Review comment:
       I believe it should be both. At my company, we run it in our own environment with our own privileges, since Beam has not had dedicated access to AWS Kinesis until now (though that could change in the future). In any case, I'd recommend separating the stream creation/deletion from the writing/reading parts since, as I mentioned before, they could require different privileges. So I'd move the stream management into the Jenkins script or make it optional in KinesisIOIT (for example, use it only when localstack is used).
   Also, we should allow the stream name to be configured.
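Here is a sketch of the split suggested above. It assumes stream management is exposed through a small hypothetical `StreamAdmin` interface; in the real test this would wrap `AmazonKinesis.createStream(name, shardCount)` and `deleteStream(name)`. When the test does not own the stream, only read/write access is needed. When it does own the stream, the stream is deleted even if the pipeline assertions fail.

```java
/** Hypothetical admin operations that may need broader privileges than read/write. */
interface StreamAdmin {
  void create(String streamName, int shards);

  void delete(String streamName);
}

final class ManagedStream {
  private ManagedStream() {}

  /** Runs the test body, creating and deleting the stream only if the test owns it. */
  static void run(
      StreamAdmin admin, String streamName, int shards, boolean ownsStream, Runnable body) {
    if (!ownsStream) {
      body.run(); // pre-created stream: no create/delete privileges required
      return;
    }
    admin.create(streamName, shards);
    try {
      body.run();
    } finally {
      admin.delete(streamName); // clean up even when the body throws
    }
  }
}
```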







[GitHub] [beam] TheNeuralBit commented on a change in pull request #12422: [BEAM-601] Run KinesisIOIT with localstack

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on a change in pull request #12422:
URL: https://github.com/apache/beam/pull/12422#discussion_r465387956



##########
File path: sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisIOIT.java
##########
@@ -35,33 +38,45 @@
 import org.apache.beam.sdk.values.PCollection;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
+import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
+import org.testcontainers.containers.localstack.LocalStackContainer;
 
 /**
  * Integration test, that writes and reads data to and from real Kinesis. You need to provide {@link
- * KinesisTestOptions} in order to run this.
+ * KinesisTestOptions} in order to run this if you want to test it with production setup. By default
+ * when no options are provided an instance of localstack is used.
  */
 @RunWith(JUnit4.class)
 public class KinesisIOIT implements Serializable {
-  private static int numberOfShards;
-  private static int numberOfRows;
-
   @Rule public TestPipeline pipelineWrite = TestPipeline.create();
   @Rule public TestPipeline pipelineRead = TestPipeline.create();
 
+  private static LocalStackContainer localstackContainer;
+
   private static KinesisTestOptions options;
-  private static final Instant now = Instant.now();

Review comment:
       Sorry, I know this is scope creep that doesn't directly relate to your change. We can keep it as-is for now and file a JIRA to follow up if it makes sense.
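For reference, in the `setupLocalstack()` hunks quoted in this thread, the `now` field later became a localstack-specific value: `Instant.ofEpochMilli(Long.divideUnsigned(Instant.now().getMillis(), 1000))`. The sketch below checks that arithmetic, using `java.time.Instant` in place of Joda's `Instant` (both provide `ofEpochMilli`). The division yields whole seconds, and passing that count back to `ofEpochMilli` interprets it as milliseconds. `SecondsTimestamp` is a hypothetical name, and the input value is just an example instant on the day of this thread.

```java
import java.time.Instant;

final class SecondsTimestamp {
  private SecondsTimestamp() {}

  /** Whole seconds since the epoch, computed the same way as in the quoted hunk. */
  static long epochSeconds(long epochMillis) {
    return Long.divideUnsigned(epochMillis, 1000);
  }

  public static void main(String[] args) {
    long millis = 1_596_103_522_123L; // 2020-07-30T10:05:22.123Z
    long seconds = epochSeconds(millis);
    System.out.println(seconds); // 1596103522
    // The seconds count reinterpreted as milliseconds lands about 18 days after the epoch.
    System.out.println(Instant.ofEpochMilli(seconds)); // 1970-01-19T11:21:43.522Z
  }
}
```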







[GitHub] [beam] piotr-szuberski commented on a change in pull request #12422: [BEAM-601] Run KinesisIOIT with localstack

Posted by GitBox <gi...@apache.org>.
piotr-szuberski commented on a change in pull request #12422:
URL: https://github.com/apache/beam/pull/12422#discussion_r464842213



##########
File path: sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisIOIT.java
##########
@@ -99,28 +116,101 @@ private void runRead() {
                 .withAWSClientsProvider(
                     options.getAwsAccessKey(),
                     options.getAwsSecretKey(),
-                    Regions.fromName(options.getAwsKinesisRegion()))
-                .withMaxNumRecords(numberOfRows)
+                    Regions.fromName(options.getAwsKinesisRegion()),
+                    options.getAwsServiceEndpoint(),
+                    options.getAwsVerifyCertificate())
+                .withMaxNumRecords(options.getNumberOfRecords())
                 // to prevent endless running in case of error
-                .withMaxReadTime(Duration.standardMinutes(10))
+                .withMaxReadTime(Duration.standardMinutes(10L))
                 .withInitialPositionInStream(InitialPositionInStream.AT_TIMESTAMP)
                 .withInitialTimestampInStream(now)
                 .withRequestRecordsLimit(1000));
 
     PAssert.thatSingleton(output.apply("Count All", Count.globally()))
-        .isEqualTo((long) numberOfRows);
+        .isEqualTo((long) options.getNumberOfRecords());
 
     PCollection<String> consolidatedHashcode =
         output
             .apply(ParDo.of(new ExtractDataValues()))
             .apply("Hash row contents", Combine.globally(new HashingFn()).withoutDefaults());
 
     PAssert.that(consolidatedHashcode)
-        .containsInAnyOrder(TestRow.getExpectedHashForRowCount(numberOfRows));
+        .containsInAnyOrder(TestRow.getExpectedHashForRowCount(options.getNumberOfRecords()));
 
     pipelineRead.run().waitUntilFinish();
   }
 
+  /** Necessary setup for localstack environment. */
+  private static void setupLocalstack() throws Exception {
+    System.setProperty(SDKGlobalConfiguration.DISABLE_CERT_CHECKING_SYSTEM_PROPERTY, "true");
+    System.setProperty(SDKGlobalConfiguration.AWS_CBOR_DISABLE_SYSTEM_PROPERTY, "true");
+
+    // For some unclear reason localstack requires the timestamp in seconds
+    now = Instant.ofEpochMilli(Long.divideUnsigned(Instant.now().getMillis(), 1000));
+
+    localstackContainer =
+        new LocalStackContainer("0.11.3")

Review comment:
       I hardcoded it because some previous versions didn't work (it was impossible to create a stream using the Kinesis SDK). It could be changed to "latest", but then the test might stop working after a localstack image update.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] aromanenko-dev commented on pull request #12422: [BEAM-601] Run KinesisIOIT with localstack

Posted by GitBox <gi...@apache.org>.
aromanenko-dev commented on pull request #12422:
URL: https://github.com/apache/beam/pull/12422#issuecomment-671385731


   Run Java PostCommit



[GitHub] [beam] piotr-szuberski commented on a change in pull request #12422: [BEAM-601] Run KinesisIOIT with localstack

piotr-szuberski commented on a change in pull request #12422:
URL: https://github.com/apache/beam/pull/12422#discussion_r464840886



##########
File path: sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisIOIT.java
##########
@@ -35,33 +38,45 @@
 import org.apache.beam.sdk.values.PCollection;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
+import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
+import org.testcontainers.containers.localstack.LocalStackContainer;
 
 /**
  * Integration test, that writes and reads data to and from real Kinesis. You need to provide {@link
- * KinesisTestOptions} in order to run this.
+ * KinesisTestOptions} in order to run this if you want to test it with production setup. By default
+ * when no options are provided an instance of localstack is used.
  */
 @RunWith(JUnit4.class)
 public class KinesisIOIT implements Serializable {
-  private static int numberOfShards;
-  private static int numberOfRows;
-
   @Rule public TestPipeline pipelineWrite = TestPipeline.create();
   @Rule public TestPipeline pipelineRead = TestPipeline.create();
 
+  private static LocalStackContainer localstackContainer;
+
   private static KinesisTestOptions options;
-  private static final Instant now = Instant.now();

Review comment:
       It's for testing against real AWS. When a user tests Beam's KinesisIO with an existing stream, it's best to read only the records written while the test is running. Otherwise the write transform could be broken while the read still passes (by reading data from the past), and we get a false positive. That can still happen if the stream also receives records from elsewhere, but it cannot be helped.
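A stdlib-only sketch of the false-positive argument (not Beam or Kinesis code): `Rec` and `readAtTimestamp` are hypothetical stand-ins for records carrying an arrival time and for a reader started `AT_TIMESTAMP`. A record left in a reused stream by an earlier run is skipped, so it cannot satisfy the count and hash assertions on its own.

```java
import java.time.Instant;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class AtTimestampSketch {
  /** Hypothetical record: a payload plus the time the stream accepted it. */
  static final class Rec {
    final String data;
    final Instant arrival;

    Rec(String data, Instant arrival) {
      this.data = data;
      this.arrival = arrival;
    }

    @Override
    public String toString() {
      return data;
    }
  }

  /** Mimics a reader started AT_TIMESTAMP: keeps records that arrived at or after {@code start}. */
  static List<Rec> readAtTimestamp(List<Rec> stream, Instant start) {
    return stream.stream().filter(r -> !r.arrival.isBefore(start)).collect(Collectors.toList());
  }

  public static void main(String[] args) {
    Instant testStart = Instant.parse("2020-07-30T10:00:00Z");
    List<Rec> existingStream =
        Arrays.asList(
            new Rec("old-1", Instant.parse("2020-07-29T09:00:00Z")), // left over from an earlier run
            new Rec("new-1", Instant.parse("2020-07-30T10:00:05Z"))); // written by this run
    // The stale record is skipped, so it cannot make a broken write look successful.
    System.out.println(readAtTimestamp(existingStream, testStart)); // [new-1]
  }
}
```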





[GitHub] [beam] piotr-szuberski commented on a change in pull request #12422: [BEAM-601] Run KinesisIOIT with localstack

piotr-szuberski commented on a change in pull request #12422:
URL: https://github.com/apache/beam/pull/12422#discussion_r465580319



##########
File path: sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisIOIT.java
##########
@@ -35,33 +38,45 @@
 import org.apache.beam.sdk.values.PCollection;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
+import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
+import org.testcontainers.containers.localstack.LocalStackContainer;
 
 /**
  * Integration test, that writes and reads data to and from real Kinesis. You need to provide {@link
- * KinesisTestOptions} in order to run this.
+ * KinesisTestOptions} in order to run this if you want to test it with production setup. By default
+ * when no options are provided an instance of localstack is used.
  */
 @RunWith(JUnit4.class)
 public class KinesisIOIT implements Serializable {
-  private static int numberOfShards;
-  private static int numberOfRows;
-
   @Rule public TestPipeline pipelineWrite = TestPipeline.create();
   @Rule public TestPipeline pipelineRead = TestPipeline.create();
 
+  private static LocalStackContainer localstackContainer;
+
   private static KinesisTestOptions options;
-  private static final Instant now = Instant.now();

Review comment:
       Not a problem, it's already done. Thanks for your watchful eye; of course it's better to create a new stream for the test, but I stuck to the existing code. I also changed `InitialPositionInStream` to `TRIM_HORIZON`, which reads the stream from the beginning instead of from a given timestamp.
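For a stream created fresh for each run, the start position stops mattering: nothing older than the test exists, so `TRIM_HORIZON` and `AT_TIMESTAMP` return the same records. A minimal in-memory sketch of that reasoning; `STREAMS`, `createUniqueStream` and `countReadable` are hypothetical, and the random-suffix naming mirrors the `"beam_test_kinesis" + UUID.randomUUID()` line from a later revision of the test.

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;

public class FreshStreamSketch {
  enum StartPosition { TRIM_HORIZON, AT_TIMESTAMP }

  // Hypothetical in-memory "service": stream name -> arrival times of its records.
  static final Map<String, List<Instant>> STREAMS = new HashMap<>();

  static String createUniqueStream() {
    // A random suffix per run, so no earlier run can have written to this stream.
    String name = "beam_test_kinesis" + UUID.randomUUID();
    STREAMS.put(name, new ArrayList<>());
    return name;
  }

  static int countReadable(String stream, StartPosition pos, Instant ts) {
    int n = 0;
    for (Instant arrival : STREAMS.get(stream)) {
      // TRIM_HORIZON starts at the oldest retained record; AT_TIMESTAMP skips earlier ones.
      if (pos == StartPosition.TRIM_HORIZON || !arrival.isBefore(ts)) {
        n++;
      }
    }
    return n;
  }

  public static void main(String[] args) {
    Instant testStart = Instant.now();
    String stream = createUniqueStream();
    for (int i = 0; i < 3; i++) {
      STREAMS.get(stream).add(testStart.plusMillis(i)); // records written by this run only
    }
    // On a stream nobody else has written to, both start positions see the same records.
    System.out.println(countReadable(stream, StartPosition.TRIM_HORIZON, testStart)); // 3
    System.out.println(countReadable(stream, StartPosition.AT_TIMESTAMP, testStart)); // 3
  }
}
```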





[GitHub] [beam] aromanenko-dev commented on a change in pull request #12422: [BEAM-601] Run KinesisIOIT with localstack

aromanenko-dev commented on a change in pull request #12422:
URL: https://github.com/apache/beam/pull/12422#discussion_r466351486



##########
File path: sdks/java/io/kinesis/build.gradle
##########
@@ -50,6 +50,7 @@ dependencies {
   testCompile library.java.powermock
   testCompile library.java.powermock_mockito
   testCompile "org.assertj:assertj-core:3.11.1"
+  testCompile 'org.testcontainers:localstack:1.11.2'

Review comment:
       Since `localstack` is going to be used in several modules, it would make sense to define it as a separate library in `BeamModulePlugin.groovy` (like `library.java.powermock`, for example).





[GitHub] [beam] piotr-szuberski commented on pull request #12422: [BEAM-601] Run KinesisIOIT with localstack

piotr-szuberski commented on pull request #12422:
URL: https://github.com/apache/beam/pull/12422#issuecomment-667076611


   Run Java PostCommit



[GitHub] [beam] piotr-szuberski commented on a change in pull request #12422: [BEAM-601] Run KinesisIOIT with localstack

piotr-szuberski commented on a change in pull request #12422:
URL: https://github.com/apache/beam/pull/12422#discussion_r466611126



##########
File path: sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisIOIT.java
##########
@@ -34,34 +42,52 @@
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.values.PCollection;
 import org.joda.time.Duration;
-import org.joda.time.Instant;
+import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
+import org.testcontainers.containers.localstack.LocalStackContainer;
 
 /**
  * Integration test, that writes and reads data to and from real Kinesis. You need to provide {@link
- * KinesisTestOptions} in order to run this.
+ * KinesisTestOptions} in order to run this if you want to test it with production setup. By default
+ * when no options are provided an instance of localstack is used.
  */
 @RunWith(JUnit4.class)
 public class KinesisIOIT implements Serializable {
-  private static int numberOfShards;
-  private static int numberOfRows;
+  private static final String LOCALSTACK_VERSION = "0.11.3";
 
   @Rule public TestPipeline pipelineWrite = TestPipeline.create();
   @Rule public TestPipeline pipelineRead = TestPipeline.create();
 
+  private static LocalStackContainer localstackContainer;
+  private static String streamName;
+  private static AmazonKinesis kinesisClient;
+
   private static KinesisTestOptions options;
-  private static final Instant now = Instant.now();
 
   @BeforeClass
-  public static void setup() {
+  public static void setup() throws Exception {
     PipelineOptionsFactory.register(KinesisTestOptions.class);
     options = TestPipeline.testingPipelineOptions().as(KinesisTestOptions.class);
-    numberOfShards = options.getNumberOfShards();
-    numberOfRows = options.getNumberOfRecords();
+    if (doUseLocalstack()) {
+      setupLocalstack();
+    }
+    kinesisClient = createKinesisClient();
+    streamName = "beam_test_kinesis" + UUID.randomUUID();

Review comment:
       Ok then, I'll leave the stream creation/deletion in localstack setup only.
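The localstack setup also has to wait for a newly created stream to become `ACTIVE` before the pipelines use it, which `createStream` in the diff does with a retry loop. A generic sketch of that bounded polling, assuming a `Supplier<String>` in place of `describeStream(...).getStreamDescription().getStreamStatus()`; `waitForStatus` is a hypothetical helper, not part of the PR.

```java
import java.util.function.Supplier;

public class StreamStatusPoller {
  /**
   * Polls {@code status} until it returns {@code expected}, checking at most {@code maxChecks}
   * times and sleeping {@code sleepMillis} between checks.
   */
  static void waitForStatus(
      Supplier<String> status, String expected, int maxChecks, long sleepMillis)
      throws InterruptedException {
    for (int i = 1; i <= maxChecks; i++) {
      if (expected.equals(status.get())) {
        return;
      }
      if (i < maxChecks) {
        Thread.sleep(sleepMillis); // no pointless sleep after the last check
      }
    }
    throw new IllegalStateException(
        "Stream did not reach " + expected + " after " + maxChecks + " checks");
  }

  public static void main(String[] args) throws InterruptedException {
    // Fake status source: CREATING twice, then ACTIVE (stands in for describeStream).
    int[] calls = {0};
    Supplier<String> fakeStatus = () -> ++calls[0] < 3 ? "CREATING" : "ACTIVE";
    waitForStatus(fakeStatus, "ACTIVE", 10, 10L);
    System.out.println("ACTIVE after " + calls[0] + " checks"); // ACTIVE after 3 checks
  }
}
```

Passing the number of checks explicitly makes the bound obvious; the loop in the diff (`i <= repeats` with `repeats = 10`) performs 11 checks.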





[GitHub] [beam] piotr-szuberski commented on a change in pull request #12422: [BEAM-601] Run KinesisIOIT with localstack

piotr-szuberski commented on a change in pull request #12422:
URL: https://github.com/apache/beam/pull/12422#discussion_r467962690



##########
File path: sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisIOIT.java
##########
@@ -35,33 +42,51 @@
 import org.apache.beam.sdk.values.PCollection;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
+import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
+import org.testcontainers.containers.localstack.LocalStackContainer;
 
 /**
  * Integration test, that writes and reads data to and from real Kinesis. You need to provide {@link
- * KinesisTestOptions} in order to run this.
+ * KinesisTestOptions} in order to run this if you want to test it with production setup. By default
+ * when no options are provided an instance of localstack is used.
  */
 @RunWith(JUnit4.class)
 public class KinesisIOIT implements Serializable {
-  private static int numberOfShards;
-  private static int numberOfRows;
+  private static final String LOCALSTACK_VERSION = "0.11.3";

Review comment:
       Not at all. 0.11.3 is the latest localstack image available on Docker Hub.





[GitHub] [beam] piotr-szuberski commented on pull request #12422: [BEAM-601] Run KinesisIOIT with localstack

piotr-szuberski commented on pull request #12422:
URL: https://github.com/apache/beam/pull/12422#issuecomment-669074844


   Run Java PostCommit



[GitHub] [beam] piotr-szuberski commented on a change in pull request #12422: [BEAM-601] Run KinesisIOIT with localstack

piotr-szuberski commented on a change in pull request #12422:
URL: https://github.com/apache/beam/pull/12422#discussion_r464842213



##########
File path: sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisIOIT.java
##########
@@ -99,28 +116,101 @@ private void runRead() {
                 .withAWSClientsProvider(
                     options.getAwsAccessKey(),
                     options.getAwsSecretKey(),
-                    Regions.fromName(options.getAwsKinesisRegion()))
-                .withMaxNumRecords(numberOfRows)
+                    Regions.fromName(options.getAwsKinesisRegion()),
+                    options.getAwsServiceEndpoint(),
+                    options.getAwsVerifyCertificate())
+                .withMaxNumRecords(options.getNumberOfRecords())
                 // to prevent endless running in case of error
-                .withMaxReadTime(Duration.standardMinutes(10))
+                .withMaxReadTime(Duration.standardMinutes(10L))
                 .withInitialPositionInStream(InitialPositionInStream.AT_TIMESTAMP)
                 .withInitialTimestampInStream(now)
                 .withRequestRecordsLimit(1000));
 
     PAssert.thatSingleton(output.apply("Count All", Count.globally()))
-        .isEqualTo((long) numberOfRows);
+        .isEqualTo((long) options.getNumberOfRecords());
 
     PCollection<String> consolidatedHashcode =
         output
             .apply(ParDo.of(new ExtractDataValues()))
             .apply("Hash row contents", Combine.globally(new HashingFn()).withoutDefaults());
 
     PAssert.that(consolidatedHashcode)
-        .containsInAnyOrder(TestRow.getExpectedHashForRowCount(numberOfRows));
+        .containsInAnyOrder(TestRow.getExpectedHashForRowCount(options.getNumberOfRecords()));
 
     pipelineRead.run().waitUntilFinish();
   }
 
+  /** Necessary setup for localstack environment. */
+  private static void setupLocalstack() throws Exception {
+    System.setProperty(SDKGlobalConfiguration.DISABLE_CERT_CHECKING_SYSTEM_PROPERTY, "true");
+    System.setProperty(SDKGlobalConfiguration.AWS_CBOR_DISABLE_SYSTEM_PROPERTY, "true");
+
+    // For some unclear reason localstack requires the timestamp in seconds
+    now = Instant.ofEpochMilli(Long.divideUnsigned(Instant.now().getMillis(), 1000));
+
+    localstackContainer =
+        new LocalStackContainer("0.11.3")

Review comment:
       I hardcoded it because some previous versions didn't work (it was impossible to create a stream using the Kinesis SDK). It could be changed to "latest", but then the test might stop working after a localstack image update.





[GitHub] [beam] aromanenko-dev commented on a change in pull request #12422: [BEAM-601] Run KinesisIOIT with localstack

aromanenko-dev commented on a change in pull request #12422:
URL: https://github.com/apache/beam/pull/12422#discussion_r466361678



##########
File path: sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/BasicKinesisProvider.java
##########
@@ -39,16 +40,27 @@
   private final String secretKey;
   private final Regions region;
   private final @Nullable String serviceEndpoint;
+  private final boolean verifyCertificate;
 
   BasicKinesisProvider(
-      String accessKey, String secretKey, Regions region, @Nullable String serviceEndpoint) {
+      String accessKey,
+      String secretKey,
+      Regions region,
+      @Nullable String serviceEndpoint,
+      boolean verifyCertificate) {
     checkArgument(accessKey != null, "accessKey can not be null");
     checkArgument(secretKey != null, "secretKey can not be null");
     checkArgument(region != null, "region can not be null");
     this.accessKey = accessKey;
     this.secretKey = secretKey;
     this.region = region;
     this.serviceEndpoint = serviceEndpoint;
+    this.verifyCertificate = verifyCertificate;
+  }
+
+  BasicKinesisProvider(

Review comment:
       I'm not sure that the service endpoint is used by `com.amazonaws.services.kinesis.producer.KinesisProducer`, which is what `KinesisIO.Write` actually uses. Am I mistaken?
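If the write path goes through the Kinesis Producer Library, the emulator endpoint would have to be handed to it separately: KPL's `KinesisProducerConfiguration` takes the host and the port as separate settings (`setKinesisEndpoint`, `setKinesisPort`) rather than one URL. A stdlib-only sketch of deriving those two values from a full `serviceEndpoint` string; `EndpointSplitter` and `HostPort` are hypothetical, and falling back to the scheme's default port is an assumption.

```java
import java.net.URI;

public class EndpointSplitter {
  /** Host and port derived from a full endpoint URL (hypothetical helper type). */
  static final class HostPort {
    final String host;
    final int port;

    HostPort(String host, int port) {
      this.host = host;
      this.port = port;
    }
  }

  static HostPort split(String serviceEndpoint) {
    URI uri = URI.create(serviceEndpoint);
    int port = uri.getPort();
    if (port == -1) {
      // No explicit port in the URL: use the scheme's default (assumption).
      port = "https".equalsIgnoreCase(uri.getScheme()) ? 443 : 80;
    }
    return new HostPort(uri.getHost(), port);
  }

  public static void main(String[] args) {
    // Example endpoint only; the real value would come from the localstack container.
    HostPort hp = split("https://localhost:4566");
    System.out.println(hp.host + " " + hp.port); // localhost 4566
  }
}
```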





[GitHub] [beam] TheNeuralBit commented on a change in pull request #12422: [BEAM-601] Run KinesisIOIT with localstack

TheNeuralBit commented on a change in pull request #12422:
URL: https://github.com/apache/beam/pull/12422#discussion_r464702259



##########
File path: sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisIOIT.java
##########
@@ -99,28 +116,101 @@ private void runRead() {
                 .withAWSClientsProvider(
                     options.getAwsAccessKey(),
                     options.getAwsSecretKey(),
-                    Regions.fromName(options.getAwsKinesisRegion()))
-                .withMaxNumRecords(numberOfRows)
+                    Regions.fromName(options.getAwsKinesisRegion()),
+                    options.getAwsServiceEndpoint(),
+                    options.getAwsVerifyCertificate())
+                .withMaxNumRecords(options.getNumberOfRecords())
                 // to prevent endless running in case of error
-                .withMaxReadTime(Duration.standardMinutes(10))
+                .withMaxReadTime(Duration.standardMinutes(10L))
                 .withInitialPositionInStream(InitialPositionInStream.AT_TIMESTAMP)
                 .withInitialTimestampInStream(now)
                 .withRequestRecordsLimit(1000));
 
     PAssert.thatSingleton(output.apply("Count All", Count.globally()))
-        .isEqualTo((long) numberOfRows);
+        .isEqualTo((long) options.getNumberOfRecords());
 
     PCollection<String> consolidatedHashcode =
         output
             .apply(ParDo.of(new ExtractDataValues()))
             .apply("Hash row contents", Combine.globally(new HashingFn()).withoutDefaults());
 
     PAssert.that(consolidatedHashcode)
-        .containsInAnyOrder(TestRow.getExpectedHashForRowCount(numberOfRows));
+        .containsInAnyOrder(TestRow.getExpectedHashForRowCount(options.getNumberOfRecords()));
 
     pipelineRead.run().waitUntilFinish();
   }
 
+  /** Necessary setup for localstack environment. */
+  private static void setupLocalstack() throws Exception {
+    System.setProperty(SDKGlobalConfiguration.DISABLE_CERT_CHECKING_SYSTEM_PROPERTY, "true");
+    System.setProperty(SDKGlobalConfiguration.AWS_CBOR_DISABLE_SYSTEM_PROPERTY, "true");
+
+    // For some unclear reason localstack requires the timestamp in seconds
+    now = Instant.ofEpochMilli(Long.divideUnsigned(Instant.now().getMillis(), 1000));
+
+    localstackContainer =
+        new LocalStackContainer("0.11.3")
+            .withServices(LocalStackContainer.Service.KINESIS)
+            .withEnv("USE_SSL", "true")
+            .withStartupAttempts(3);
+    localstackContainer.start();
+
+    options.setAwsServiceEndpoint(
+        localstackContainer
+            .getEndpointConfiguration(LocalStackContainer.Service.KINESIS)
+            .getServiceEndpoint()
+            .replace("http", "https"));
+    options.setAwsKinesisRegion(
+        localstackContainer
+            .getEndpointConfiguration(LocalStackContainer.Service.KINESIS)
+            .getSigningRegion());
+    options.setAwsAccessKey(
+        localstackContainer.getDefaultCredentialsProvider().getCredentials().getAWSAccessKeyId());
+    options.setAwsSecretKey(
+        localstackContainer.getDefaultCredentialsProvider().getCredentials().getAWSSecretKey());
+    options.setNumberOfRecords(1000);
+    options.setNumberOfShards(1);
+    options.setAwsKinesisStream("beam_kinesis_test");
+    options.setAwsVerifyCertificate(false);
+    createStream(options.getAwsKinesisStream());
+  }
+
+  private static void createStream(String streamName) throws Exception {
+    AmazonKinesisClientBuilder clientBuilder = AmazonKinesisClientBuilder.standard();
+
+    clientBuilder.setCredentials(localstackContainer.getDefaultCredentialsProvider());
+    clientBuilder.setEndpointConfiguration(
+        localstackContainer.getEndpointConfiguration(LocalStackContainer.Service.KINESIS));
+
+    AmazonKinesis client = clientBuilder.build();
+
+    client.createStream(streamName, 1);
+    int repeats = 10;
+    for (int i = 0; i <= repeats; ++i) {
+      String streamStatus =
+          client.describeStream(streamName).getStreamDescription().getStreamStatus();
+      if ("ACTIVE".equals(streamStatus)) {
+        break;
+      }
+      if (i == repeats) {
+        throw new RuntimeException("Unable to initialize stream");
+      }
+      Thread.sleep(1000L);
+    }
+  }
+
+  /** Check whether pipeline options were provided. If not, use localstack container. */
+  private static boolean doUseLocalstack() {
+    return "aws-access-key".equals(options.getAwsAccessKey())
+        && "aws-secret-key".equals(options.getAwsSecretKey())
+        && "aws-kinesis-stream".equals(options.getAwsKinesisStream())
+        && "aws-kinesis-region".equals(options.getAwsKinesisRegion())
+        && options.getNumberOfShards() == 2
+        && options.getNumberOfRecords() == 1000
+        && options.getAwsServiceEndpoint() == null
+        && options.getAwsVerifyCertificate();
+  }

Review comment:
       Rather than repeating the default values here, you could create a fresh instance using `PipelineOptionsFactory.fromArgs().as(KinesisTestOptions.class)` and compare each field against it.
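The idea is to keep each default in one place: create a fresh, unconfigured options object and compare field by field. In Beam that object would come from `PipelineOptionsFactory.fromArgs().as(KinesisTestOptions.class)` as suggested; the sketch below uses a hypothetical plain-Java `TestOpts` class instead so it runs without Beam, with its default values copied from the `doUseLocalstack()` check in the diff purely for illustration.

```java
import java.util.Objects;

public class DefaultOptionsCheck {
  /** Hypothetical stand-in for KinesisTestOptions; the field initializers play the role of defaults. */
  static final class TestOpts {
    String awsAccessKey = "aws-access-key";
    String awsSecretKey = "aws-secret-key";
    String awsKinesisStream = "aws-kinesis-stream";
    String awsKinesisRegion = "aws-kinesis-region";
    int numberOfShards = 2;
    int numberOfRecords = 1000;
    String awsServiceEndpoint = null;
    boolean awsVerifyCertificate = true;
  }

  /** True if nothing was overridden, i.e. every field matches a freshly created instance. */
  static boolean doUseLocalstack(TestOpts options) {
    TestOpts defaults = new TestOpts(); // the default values live in one place only
    return Objects.equals(options.awsAccessKey, defaults.awsAccessKey)
        && Objects.equals(options.awsSecretKey, defaults.awsSecretKey)
        && Objects.equals(options.awsKinesisStream, defaults.awsKinesisStream)
        && Objects.equals(options.awsKinesisRegion, defaults.awsKinesisRegion)
        && options.numberOfShards == defaults.numberOfShards
        && options.numberOfRecords == defaults.numberOfRecords
        && Objects.equals(options.awsServiceEndpoint, defaults.awsServiceEndpoint)
        && options.awsVerifyCertificate == defaults.awsVerifyCertificate;
  }

  public static void main(String[] args) {
    TestOpts untouched = new TestOpts();
    TestOpts real = new TestOpts();
    real.awsKinesisStream = "my-existing-stream"; // user pointed the test at a real stream
    System.out.println(doUseLocalstack(untouched)); // true
    System.out.println(doUseLocalstack(real)); // false
  }
}
```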

##########
File path: sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisIOIT.java
##########
@@ -99,28 +116,101 @@ private void runRead() {
                 .withAWSClientsProvider(
                     options.getAwsAccessKey(),
                     options.getAwsSecretKey(),
-                    Regions.fromName(options.getAwsKinesisRegion()))
-                .withMaxNumRecords(numberOfRows)
+                    Regions.fromName(options.getAwsKinesisRegion()),
+                    options.getAwsServiceEndpoint(),
+                    options.getAwsVerifyCertificate())
+                .withMaxNumRecords(options.getNumberOfRecords())
                 // to prevent endless running in case of error
-                .withMaxReadTime(Duration.standardMinutes(10))
+                .withMaxReadTime(Duration.standardMinutes(10L))
                 .withInitialPositionInStream(InitialPositionInStream.AT_TIMESTAMP)
                 .withInitialTimestampInStream(now)
                 .withRequestRecordsLimit(1000));
 
     PAssert.thatSingleton(output.apply("Count All", Count.globally()))
-        .isEqualTo((long) numberOfRows);
+        .isEqualTo((long) options.getNumberOfRecords());
 
     PCollection<String> consolidatedHashcode =
         output
             .apply(ParDo.of(new ExtractDataValues()))
             .apply("Hash row contents", Combine.globally(new HashingFn()).withoutDefaults());
 
     PAssert.that(consolidatedHashcode)
-        .containsInAnyOrder(TestRow.getExpectedHashForRowCount(numberOfRows));
+        .containsInAnyOrder(TestRow.getExpectedHashForRowCount(options.getNumberOfRecords()));
 
     pipelineRead.run().waitUntilFinish();
   }
 
+  /** Necessary setup for localstack environment. */
+  private static void setupLocalstack() throws Exception {
+    System.setProperty(SDKGlobalConfiguration.DISABLE_CERT_CHECKING_SYSTEM_PROPERTY, "true");
+    System.setProperty(SDKGlobalConfiguration.AWS_CBOR_DISABLE_SYSTEM_PROPERTY, "true");
+
+    // For some unclear reason localstack requires the timestamp in seconds
+    now = Instant.ofEpochMilli(Long.divideUnsigned(Instant.now().getMillis(), 1000));
+
+    localstackContainer =
+        new LocalStackContainer("0.11.3")

Review comment:
       Is this something we're going to have to keep up-to-date? Is it possible to just omit it?

##########
File path: sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisIOIT.java
##########
@@ -99,28 +116,101 @@ private void runRead() {
                 .withAWSClientsProvider(
                     options.getAwsAccessKey(),
                     options.getAwsSecretKey(),
-                    Regions.fromName(options.getAwsKinesisRegion()))
-                .withMaxNumRecords(numberOfRows)
+                    Regions.fromName(options.getAwsKinesisRegion()),
+                    options.getAwsServiceEndpoint(),
+                    options.getAwsVerifyCertificate())
+                .withMaxNumRecords(options.getNumberOfRecords())
                 // to prevent endless running in case of error
-                .withMaxReadTime(Duration.standardMinutes(10))
+                .withMaxReadTime(Duration.standardMinutes(10L))
                 .withInitialPositionInStream(InitialPositionInStream.AT_TIMESTAMP)
                 .withInitialTimestampInStream(now)
                 .withRequestRecordsLimit(1000));
 
     PAssert.thatSingleton(output.apply("Count All", Count.globally()))
-        .isEqualTo((long) numberOfRows);
+        .isEqualTo((long) options.getNumberOfRecords());
 
     PCollection<String> consolidatedHashcode =
         output
             .apply(ParDo.of(new ExtractDataValues()))
             .apply("Hash row contents", Combine.globally(new HashingFn()).withoutDefaults());
 
     PAssert.that(consolidatedHashcode)
-        .containsInAnyOrder(TestRow.getExpectedHashForRowCount(numberOfRows));
+        .containsInAnyOrder(TestRow.getExpectedHashForRowCount(options.getNumberOfRecords()));
 
     pipelineRead.run().waitUntilFinish();
   }
 
+  /** Necessary setup for localstack environment. */
+  private static void setupLocalstack() throws Exception {
+    System.setProperty(SDKGlobalConfiguration.DISABLE_CERT_CHECKING_SYSTEM_PROPERTY, "true");
+    System.setProperty(SDKGlobalConfiguration.AWS_CBOR_DISABLE_SYSTEM_PROPERTY, "true");
+
+    // For some unclear reason localstack requires the timestamp in seconds
+    now = Instant.ofEpochMilli(Long.divideUnsigned(Instant.now().getMillis(), 1000));

Review comment:
       :(
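For reference, the expression in the diff does not round the current time to whole seconds; it builds an instant whose millisecond value equals the current epoch-second count, i.e. a moment in January 1970. A `java.time` sketch (not the Joda-Time `Instant` the test uses) of the same arithmetic, with a fixed input so the result is reproducible:

```java
import java.time.Instant;

public class EpochSecondsAsMillis {
  /** Same arithmetic as the diff: the epoch-second count reused as an epoch-millisecond value. */
  static Instant secondsAsMillis(Instant now) {
    long seconds = Long.divideUnsigned(now.toEpochMilli(), 1000);
    return Instant.ofEpochMilli(seconds);
  }

  public static void main(String[] args) {
    Instant now = Instant.parse("2020-07-30T10:45:22Z");
    Instant shifted = secondsAsMillis(now);
    System.out.println(shifted.toEpochMilli()); // 1596105922
    System.out.println(shifted); // 1970-01-19T11:21:45.922Z
  }
}
```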

##########
File path: sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisIOIT.java
##########
@@ -35,33 +38,45 @@
 import org.apache.beam.sdk.values.PCollection;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
+import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
+import org.testcontainers.containers.localstack.LocalStackContainer;
 
 /**
  * Integration test, that writes and reads data to and from real Kinesis. You need to provide {@link
- * KinesisTestOptions} in order to run this.
+ * KinesisTestOptions} in order to run this if you want to test it with production setup. By default
+ * when no options are provided an instance of localstack is used.
  */
 @RunWith(JUnit4.class)
 public class KinesisIOIT implements Serializable {
-  private static int numberOfShards;
-  private static int numberOfRows;
-
   @Rule public TestPipeline pipelineWrite = TestPipeline.create();
   @Rule public TestPipeline pipelineRead = TestPipeline.create();
 
+  private static LocalStackContainer localstackContainer;
+
   private static KinesisTestOptions options;
-  private static final Instant now = Instant.now();

Review comment:
       Does this need to be `Instant.now()`? Might be preferable to just make it some constant to make it more repeatable.





[GitHub] [beam] aromanenko-dev commented on a change in pull request #12422: [BEAM-601] Run KinesisIOIT with localstack

aromanenko-dev commented on a change in pull request #12422:
URL: https://github.com/apache/beam/pull/12422#discussion_r466363355



##########
File path: sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisIO.java
##########
@@ -413,6 +413,28 @@ public Read withAWSClientsProvider(
           new BasicKinesisProvider(awsAccessKey, awsSecretKey, region, serviceEndpoint));
     }
 
+    /**
+     * Specify credential details and region to be used to read from Kinesis. If you need more
+     * sophisticated credential protocol, then you should look at {@link
+     * Read#withAWSClientsProvider(AWSClientsProvider)}.
+     *
+     * <p>The {@code serviceEndpoint} sets an alternative service host. This is useful to execute
+     * the tests with Kinesis service emulator.
+     *
+     * <p>The {@code veriftCertificate} disables or enables certificate verification. Never set it

Review comment:
       typo: verif**y**Certificate







[GitHub] [beam] piotr-szuberski commented on a change in pull request #12422: [BEAM-601] Run KinesisIOIT with localstack

Posted by GitBox <gi...@apache.org>.
piotr-szuberski commented on a change in pull request #12422:
URL: https://github.com/apache/beam/pull/12422#discussion_r466620915



##########
File path: sdks/java/io/kinesis/build.gradle
##########
@@ -50,6 +50,7 @@ dependencies {
   testCompile library.java.powermock
   testCompile library.java.powermock_mockito
   testCompile "org.assertj:assertj-core:3.11.1"
+  testCompile 'org.testcontainers:localstack:1.11.2'

Review comment:
       Yeah, I'll do that.







[GitHub] [beam] piotr-szuberski commented on a change in pull request #12422: [BEAM-601] Run KinesisIOIT with localstack

Posted by GitBox <gi...@apache.org>.
piotr-szuberski commented on a change in pull request #12422:
URL: https://github.com/apache/beam/pull/12422#discussion_r463559333



##########
File path: sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisIOIT.java
##########
@@ -34,34 +35,65 @@
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.values.PCollection;
 import org.joda.time.Duration;
-import org.joda.time.Instant;
-import org.junit.BeforeClass;
+import org.junit.After;
+import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
+import org.testcontainers.containers.localstack.LocalStackContainer;
 
 /**
  * Integration test, that writes and reads data to and from real Kinesis. You need to provide {@link
  * KinesisTestOptions} in order to run this.
  */
 @RunWith(JUnit4.class)
 public class KinesisIOIT implements Serializable {
-  private static int numberOfShards;
-  private static int numberOfRows;
+  private static final String STREAM_NAME = "beam_kinesis";
+  private static final int NUM_RECORDS = 1000;
 
   @Rule public TestPipeline pipelineWrite = TestPipeline.create();
   @Rule public TestPipeline pipelineRead = TestPipeline.create();
 
-  private static KinesisTestOptions options;
-  private static final Instant now = Instant.now();
+  static {
+    System.setProperty(SDKGlobalConfiguration.DISABLE_CERT_CHECKING_SYSTEM_PROPERTY, "true");
+    System.setProperty(SDKGlobalConfiguration.AWS_CBOR_DISABLE_SYSTEM_PROPERTY, "true");
+  }
+
+  private final LocalStackContainer localstackContainer =
+      new LocalStackContainer("0.11.3")
+          .withServices(LocalStackContainer.Service.KINESIS)
+          .withEnv("USE_SSL", "true")
+          .withStartupAttempts(3);
+
+  private String endpoint;
+  private String region;
+  private String accessKey;
+  private String secretKey;
+
+  @Before
+  public void setup() throws Exception {
+    localstackContainer.start();
+    endpoint =
+        localstackContainer
+            .getEndpointConfiguration(LocalStackContainer.Service.KINESIS)
+            .getServiceEndpoint()
+            .replace("http", "https");
+    region =
+        localstackContainer
+            .getEndpointConfiguration(LocalStackContainer.Service.KINESIS)
+            .getSigningRegion();
+    accessKey =
+        localstackContainer.getDefaultCredentialsProvider().getCredentials().getAWSAccessKeyId();
+    secretKey =
+        localstackContainer.getDefaultCredentialsProvider().getCredentials().getAWSSecretKey();
+
+    createStream();
+  }
 
-  @BeforeClass
-  public static void setup() {
-    PipelineOptionsFactory.register(KinesisTestOptions.class);
-    options = TestPipeline.testingPipelineOptions().as(KinesisTestOptions.class);
-    numberOfShards = options.getNumberOfShards();
-    numberOfRows = options.getNumberOfRecords();

Review comment:
       Done.
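The setup in this revision derives the HTTPS endpoint with `String.replace("http", "https")`. That call replaces every occurrence of `http`, which works for a plain `http://host:port` URL but would turn an endpoint that is already `https://...` into `httpss://...`. A hedged alternative sketch that rewrites only the URI scheme using `java.net.URI`; the class and method names are hypothetical and not part of the PR.

```java
import java.net.URI;
import java.net.URISyntaxException;

/** Sketch: switch an endpoint to HTTPS by rewriting only its URI scheme. */
public class HttpsEndpoint {

  static String toHttps(String endpoint) {
    try {
      URI uri = new URI(endpoint);
      // Keep user info, host, port, path, query and fragment; replace only the scheme.
      return new URI(
              "https",
              uri.getUserInfo(),
              uri.getHost(),
              uri.getPort(),
              uri.getPath(),
              uri.getQuery(),
              uri.getFragment())
          .toString();
    } catch (URISyntaxException e) {
      throw new IllegalArgumentException("Not a valid endpoint URI: " + endpoint, e);
    }
  }

  public static void main(String[] args) {
    System.out.println(toHttps("http://127.0.0.1:4566")); // prints https://127.0.0.1:4566
    System.out.println(toHttps("https://127.0.0.1:4566")); // prints https://127.0.0.1:4566
    // For comparison, plain string replacement double-patches an HTTPS URL:
    System.out.println("https://127.0.0.1:4566".replace("http", "https")); // prints httpss://127.0.0.1:4566
  }
}
```

Rebuilding through the seven-argument `URI` constructor leaves host, port and path untouched, so the conversion is idempotent.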







[GitHub] [beam] aromanenko-dev commented on a change in pull request #12422: [BEAM-601] Run KinesisIOIT with localstack

Posted by GitBox <gi...@apache.org>.
aromanenko-dev commented on a change in pull request #12422:
URL: https://github.com/apache/beam/pull/12422#discussion_r466299826



##########
File path: sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisIOIT.java
##########
@@ -34,34 +42,52 @@
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.values.PCollection;
 import org.joda.time.Duration;
-import org.joda.time.Instant;
+import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
+import org.testcontainers.containers.localstack.LocalStackContainer;
 
 /**
  * Integration test, that writes and reads data to and from real Kinesis. You need to provide {@link
- * KinesisTestOptions} in order to run this.
+ * KinesisTestOptions} in order to run this if you want to test it with production setup. By default
+ * when no options are provided an instance of localstack is used.
  */
 @RunWith(JUnit4.class)
 public class KinesisIOIT implements Serializable {
-  private static int numberOfShards;
-  private static int numberOfRows;
+  private static final String LOCALSTACK_VERSION = "0.11.3";
 
   @Rule public TestPipeline pipelineWrite = TestPipeline.create();
   @Rule public TestPipeline pipelineRead = TestPipeline.create();
 
+  private static LocalStackContainer localstackContainer;
+  private static String streamName;
+  private static AmazonKinesis kinesisClient;
+
   private static KinesisTestOptions options;
-  private static final Instant now = Instant.now();
 
   @BeforeClass
-  public static void setup() {
+  public static void setup() throws Exception {
     PipelineOptionsFactory.register(KinesisTestOptions.class);
     options = TestPipeline.testingPipelineOptions().as(KinesisTestOptions.class);
-    numberOfShards = options.getNumberOfShards();
-    numberOfRows = options.getNumberOfRecords();
+    if (doUseLocalstack()) {
+      setupLocalstack();
+    }
+    kinesisClient = createKinesisClient();
+    streamName = "beam_test_kinesis" + UUID.randomUUID();

Review comment:
       I believe it should be both. In my company, we run it in our own environment with our own privileges, since Beam has not had dedicated access to AWS Kinesis so far (though that could change in the future). In any case, I'd recommend splitting the stream creation/deletion part from the writing/reading part since, as I mentioned before, they may require different privileges. So I'd move stream management into the Jenkins script and/or make it optional in KinesisIOIT (for example, do it only when localstack is used).
  Also, we should allow configuring the stream name.
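A sketch of this suggestion under stated assumptions: the stream name is configurable, and the test creates and deletes the stream only when it runs against LocalStack. On real AWS the stream is assumed to be provisioned outside the test (for example by the Jenkins job), so the test only writes and reads. `StreamSetupPlan` and `choose` are hypothetical names, not options from the PR; the `beam_test_kinesis` + UUID default follows the diff above.

```java
import java.util.UUID;

/** Sketch: decide which stream to use and whether the test owns its lifecycle. */
public class StreamSetupPlan {
  final String streamName;
  /** True when the test itself creates the stream in setup and deletes it in teardown. */
  final boolean manageStream;

  private StreamSetupPlan(String streamName, boolean manageStream) {
    this.streamName = streamName;
    this.manageStream = manageStream;
  }

  static StreamSetupPlan choose(String configuredStreamName, boolean useLocalstack) {
    boolean hasName = configuredStreamName != null && !configuredStreamName.isEmpty();
    if (useLocalstack) {
      // Throwaway emulator: the test owns the stream; a random suffix avoids name clashes.
      String name = hasName ? configuredStreamName : "beam_test_kinesis" + UUID.randomUUID();
      return new StreamSetupPlan(name, true);
    }
    // Real AWS: the stream is managed elsewhere (possibly with different privileges),
    // so the test only writes to and reads from it and must be told its name.
    if (!hasName) {
      throw new IllegalArgumentException("A stream name must be configured for real AWS runs");
    }
    return new StreamSetupPlan(configuredStreamName, false);
  }

  public static void main(String[] args) {
    StreamSetupPlan local = choose(null, true);
    System.out.println(local.streamName.startsWith("beam_test_kinesis") + " " + local.manageStream); // prints "true true"
    StreamSetupPlan aws = choose("my-existing-stream", false);
    System.out.println(aws.streamName + " " + aws.manageStream); // prints "my-existing-stream false"
  }
}
```

Keeping the decision in one place means the create/delete calls can simply be skipped when `manageStream` is false, so the write/read path needs no stream-administration privileges.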







[GitHub] [beam] aromanenko-dev commented on pull request #12422: [BEAM-601] Run KinesisIOIT with localstack

Posted by GitBox <gi...@apache.org>.
aromanenko-dev commented on pull request #12422:
URL: https://github.com/apache/beam/pull/12422#issuecomment-671386509


   @piotr-szuberski Thanks, almost LGTM, just a minor question above.





[GitHub] [beam] piotr-szuberski commented on a change in pull request #12422: [BEAM-601] Run KinesisIOIT with localstack

Posted by GitBox <gi...@apache.org>.
piotr-szuberski commented on a change in pull request #12422:
URL: https://github.com/apache/beam/pull/12422#discussion_r466253240



##########
File path: sdks/java/io/kinesis/build.gradle
##########
@@ -50,6 +50,7 @@ dependencies {
   testCompile library.java.powermock
   testCompile library.java.powermock_mockito
   testCompile "org.assertj:assertj-core:3.11.1"
+  testCompile 'org.testcontainers:localstack:1.11.2'

Review comment:
       Very good point. I copied it from DynamoDB tests. Thanks!







[GitHub] [beam] TheNeuralBit commented on pull request #12422: [BEAM-601] Run KinesisIOIT with localstack

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on pull request #12422:
URL: https://github.com/apache/beam/pull/12422#issuecomment-671638673


   @aromanenko-dev I'll leave it up to you to wrap up this review and merge since you're more familiar with this code :)





[GitHub] [beam] piotr-szuberski commented on pull request #12422: [BEAM-601] Run KinesisIOIT with localstack

Posted by GitBox <gi...@apache.org>.
piotr-szuberski commented on pull request #12422:
URL: https://github.com/apache/beam/pull/12422#issuecomment-667088807


   Run Java PostCommit

