You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/08/05 15:16:28 UTC

[GitHub] [beam] aromanenko-dev commented on a change in pull request #12422: [BEAM-601] Run KinesisIOIT with localstack

aromanenko-dev commented on a change in pull request #12422:
URL: https://github.com/apache/beam/pull/12422#discussion_r465702081



##########
File path: sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/BasicKinesisProvider.java
##########
@@ -39,16 +40,27 @@
   private final String secretKey;
   private final Regions region;
   private final @Nullable String serviceEndpoint;
+  private final boolean verifyCertificate;
 
   BasicKinesisProvider(
-      String accessKey, String secretKey, Regions region, @Nullable String serviceEndpoint) {
+      String accessKey,
+      String secretKey,
+      Regions region,
+      @Nullable String serviceEndpoint,
+      boolean verifyCertificate) {
     checkArgument(accessKey != null, "accessKey can not be null");
     checkArgument(secretKey != null, "secretKey can not be null");
     checkArgument(region != null, "region can not be null");
     this.accessKey = accessKey;
     this.secretKey = secretKey;
     this.region = region;
     this.serviceEndpoint = serviceEndpoint;
+    this.verifyCertificate = verifyCertificate;
+  }
+
+  BasicKinesisProvider(

Review comment:
       Why did you decide to modify `BasicKinesisProvider` rather than just create a new provider class for testing that extends the basic one?

##########
File path: sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisIOIT.java
##########
@@ -95,32 +123,114 @@ private void runRead() {
     PCollection<KinesisRecord> output =
         pipelineRead.apply(
             KinesisIO.read()
-                .withStreamName(options.getAwsKinesisStream())
+                .withStreamName(streamName)
                 .withAWSClientsProvider(
                     options.getAwsAccessKey(),
                     options.getAwsSecretKey(),
-                    Regions.fromName(options.getAwsKinesisRegion()))
-                .withMaxNumRecords(numberOfRows)
+                    Regions.fromName(options.getAwsKinesisRegion()),
+                    options.getAwsServiceEndpoint(),
+                    options.getAwsVerifyCertificate())
+                .withMaxNumRecords(options.getNumberOfRecords())
                 // to prevent endless running in case of error
-                .withMaxReadTime(Duration.standardMinutes(10))
-                .withInitialPositionInStream(InitialPositionInStream.AT_TIMESTAMP)
-                .withInitialTimestampInStream(now)
+                .withMaxReadTime(Duration.standardMinutes(10L))
+                .withInitialPositionInStream(InitialPositionInStream.TRIM_HORIZON)
                 .withRequestRecordsLimit(1000));
 
     PAssert.thatSingleton(output.apply("Count All", Count.globally()))
-        .isEqualTo((long) numberOfRows);
+        .isEqualTo((long) options.getNumberOfRecords());
 
     PCollection<String> consolidatedHashcode =
         output
             .apply(ParDo.of(new ExtractDataValues()))
             .apply("Hash row contents", Combine.globally(new HashingFn()).withoutDefaults());
 
     PAssert.that(consolidatedHashcode)
-        .containsInAnyOrder(TestRow.getExpectedHashForRowCount(numberOfRows));
+        .containsInAnyOrder(TestRow.getExpectedHashForRowCount(options.getNumberOfRecords()));
 
     pipelineRead.run().waitUntilFinish();
   }
 
+  /** Necessary setup for localstack environment. */
+  private static void setupLocalstack() {
+    System.setProperty(SDKGlobalConfiguration.DISABLE_CERT_CHECKING_SYSTEM_PROPERTY, "true");
+    System.setProperty(SDKGlobalConfiguration.AWS_CBOR_DISABLE_SYSTEM_PROPERTY, "true");
+
+    localstackContainer =
+        new LocalStackContainer(LOCALSTACK_VERSION)
+            .withServices(LocalStackContainer.Service.KINESIS)
+            .withEnv("USE_SSL", "true")
+            .withStartupAttempts(3);
+    localstackContainer.start();
+
+    options.setAwsServiceEndpoint(
+        localstackContainer
+            .getEndpointConfiguration(LocalStackContainer.Service.KINESIS)
+            .getServiceEndpoint()
+            .replace("http", "https"));
+    options.setAwsKinesisRegion(
+        localstackContainer
+            .getEndpointConfiguration(LocalStackContainer.Service.KINESIS)
+            .getSigningRegion());
+    options.setAwsAccessKey(
+        localstackContainer.getDefaultCredentialsProvider().getCredentials().getAWSAccessKeyId());
+    options.setAwsSecretKey(
+        localstackContainer.getDefaultCredentialsProvider().getCredentials().getAWSSecretKey());
+    options.setNumberOfRecords(1000);
+    options.setNumberOfShards(1);
+    options.setAwsKinesisStream("beam_kinesis_test");
+    options.setAwsVerifyCertificate(false);
+  }
+
+  private static AmazonKinesis createKinesisClient() {
+    AmazonKinesisClientBuilder clientBuilder = AmazonKinesisClientBuilder.standard();
+
+    AWSCredentialsProvider credentialsProvider =
+        new AWSStaticCredentialsProvider(
+            new BasicAWSCredentials(options.getAwsAccessKey(), options.getAwsSecretKey()));
+    clientBuilder.setCredentials(credentialsProvider);
+
+    if (options.getAwsServiceEndpoint() != null) {
+      AwsClientBuilder.EndpointConfiguration endpointConfiguration =
+          new AwsClientBuilder.EndpointConfiguration(
+              options.getAwsServiceEndpoint(), options.getAwsKinesisRegion());
+      clientBuilder.setEndpointConfiguration(endpointConfiguration);
+    } else {
+      clientBuilder.setRegion(options.getAwsKinesisRegion());
+    }
+
+    return clientBuilder.build();
+  }
+
+  private static void createStream() throws Exception {
+    kinesisClient.createStream(streamName, 1);
+    int repeats = 10;
+    for (int i = 0; i <= repeats; ++i) {
+      String streamStatus =
+          kinesisClient.describeStream(streamName).getStreamDescription().getStreamStatus();
+      if ("ACTIVE".equals(streamStatus)) {
+        break;
+      }
+      if (i == repeats) {
+        throw new RuntimeException("Unable to initialize stream");
+      }
+      Thread.sleep(1000L);
+    }
+  }
+
+  /** Check whether pipeline options were provided. If not, use localstack container. */
+  private static boolean doUseLocalstack() {

Review comment:
       Wouldn't it be enough to just add a `useLocalstack` option to `KinesisTestOptions`?

##########
File path: sdks/java/io/kinesis/build.gradle
##########
@@ -50,6 +50,7 @@ dependencies {
   testCompile library.java.powermock
   testCompile library.java.powermock_mockito
   testCompile "org.assertj:assertj-core:3.11.1"
+  testCompile 'org.testcontainers:localstack:1.11.2'

Review comment:
       Why not use a more recent version, like `1.14.x`?

##########
File path: sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisIOIT.java
##########
@@ -34,34 +42,52 @@
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.values.PCollection;
 import org.joda.time.Duration;
-import org.joda.time.Instant;
+import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
+import org.testcontainers.containers.localstack.LocalStackContainer;
 
 /**
  * Integration test, that writes and reads data to and from real Kinesis. You need to provide {@link
- * KinesisTestOptions} in order to run this.
+ * KinesisTestOptions} in order to run this if you want to test it with production setup. By default
+ * when no options are provided an instance of localstack is used.
  */
 @RunWith(JUnit4.class)
 public class KinesisIOIT implements Serializable {
-  private static int numberOfShards;
-  private static int numberOfRows;
+  private static final String LOCALSTACK_VERSION = "0.11.3";
 
   @Rule public TestPipeline pipelineWrite = TestPipeline.create();
   @Rule public TestPipeline pipelineRead = TestPipeline.create();
 
+  private static LocalStackContainer localstackContainer;
+  private static String streamName;
+  private static AmazonKinesis kinesisClient;
+
   private static KinesisTestOptions options;
-  private static final Instant now = Instant.now();
 
   @BeforeClass
-  public static void setup() {
+  public static void setup() throws Exception {
     PipelineOptionsFactory.register(KinesisTestOptions.class);
     options = TestPipeline.testingPipelineOptions().as(KinesisTestOptions.class);
-    numberOfShards = options.getNumberOfShards();
-    numberOfRows = options.getNumberOfRecords();
+    if (doUseLocalstack()) {
+      setupLocalstack();
+    }
+    kinesisClient = createKinesisClient();
+    streamName = "beam_test_kinesis" + UUID.randomUUID();

Review comment:
       Could you elaborate on why the stream name is now partly hardcoded rather than configurable, as it was before? We run KinesisIOIT in our "in-house" environment against a real Kinesis instance, using a pre-created stream that has different privileges. 
   We should probably generate the stream name only when testing against LocalStack.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org