You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/03/30 12:22:18 UTC

[GitHub] [pulsar] cbornet opened a new pull request #14948: Fix Kinesis integration test

cbornet opened a new pull request #14948:
URL: https://github.com/apache/pulsar/pull/14948


   <!--
   ### Contribution Checklist
     
     - Name the pull request in the form "[Issue XYZ][component] Title of the pull request", where *XYZ* should be replaced by the actual issue number.
       Skip *Issue XYZ* if there is no associated github issue for this pull request.
       Skip *component* if you are unsure about which is the best component. E.g. `[docs] Fix typo in produce method`.
   
     - Fill out the template below to describe the changes contributed by the pull request. That will give reviewers the context they need to do the review.
     
     - Each pull request should address only one issue, not mix up code from multiple issues.
     
     - Each commit in the pull request has a meaningful commit message
   
     - Once all items of the checklist are addressed, remove the above text and this checklist, leaving only the filled out template below.
   
   **(The sections below can be removed for hotfixes of typos)**
   -->
   
   ### Motivation
   The current `PulsarSinksTest::testKinesis` is incorrect:
   * If the `GetRecordsResponse` doesn't contain any record, it passes
   * It configures the Sink to use the testcontainer mapped port instead of the internal one so the Sink can't connect to Localstack
   
   ### Modifications
   * Use port 4566 for `awsEndpointPort` which is the unmapped Localstack service port
   * Verify that all records received match exactly the one sent
   
   ### Verifying this change
   
   - [ ] Make sure that the change passes the CI checks.
   
   This change added tests and can be verified as follows:
   * Run test `PulsarSinksTest::testKinesis`
   
   ### Does this pull request potentially affect one of the following parts:
   
   *If `yes` was chosen, please highlight the changes*
   
     - Dependencies (does it add or upgrade a dependency): (yes / no) no
     - The public API: (yes / no) no
     - The schema: (yes / no / don't know) no
     - The default values of configurations: (yes / no) no
     - The wire protocol: (yes / no) no
     - The rest endpoints: (yes / no) no
     - The admin cli options: (yes / no) no
     - Anything that affects deployment: (yes / no / don't know) no
   
   ### Documentation
   
   Check the box below or label this PR directly (if you have committer privilege).
   
   Need to update docs? 
   
   - [ ] `doc-required` 
     
     (If you need help on updating docs, create a doc issue)
     
   - [x] `no-need-doc` 
     
     (Please explain why) it's a test
     
   - [ ] `doc` 
     
     (If this PR contains doc changes)
   
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] cbornet commented on a change in pull request #14948: Fix Kinesis integration test

Posted by GitBox <gi...@apache.org>.
cbornet commented on a change in pull request #14948:
URL: https://github.com/apache/pulsar/pull/14948#discussion_r839442045



##########
File path: tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/KinesisSinkTester.java
##########
@@ -18,35 +18,34 @@
  */
 package org.apache.pulsar.tests.integration.io.sinks;
 
+import java.util.LinkedHashMap;
 import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
 import org.awaitility.Awaitility;
 import org.testcontainers.containers.localstack.LocalStackContainer;
 import org.testcontainers.utility.DockerImageName;
 import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
-import software.amazon.awssdk.auth.credentials.AwsCredentials;
-import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
 import software.amazon.awssdk.regions.Region;
 import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
 import software.amazon.awssdk.services.kinesis.model.CreateStreamRequest;
 import software.amazon.awssdk.services.kinesis.model.GetRecordsRequest;
 import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse;
 import software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest;
 import software.amazon.awssdk.services.kinesis.model.ListShardsRequest;
-import software.amazon.awssdk.services.kinesis.model.Record;
 import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
 
 import java.net.URI;
 import java.nio.charset.StandardCharsets;
 import java.util.Map;
 
-import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.assertEquals;
 
 @Slf4j
 public class KinesisSinkTester extends SinkTester<LocalStackContainer> {
 
     private static final String NAME = "kinesis";
+    private static final int LOCALSTACK_SERVICE_PORT = 4566;

Review comment:
       It is this port : https://github.com/testcontainers/testcontainers-java/blob/0408db806f6d57b8117881adcd10b6bd6c9dcd9c/modules/localstack/src/main/java/org/testcontainers/containers/localstack/LocalStackContainer.java#L38
   I didn't find an easy way to get it from Testcontainers.
   It's not dynamic as we use the internal container port




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] lhotari commented on a change in pull request #14948: Fix Kinesis integration test

Posted by GitBox <gi...@apache.org>.
lhotari commented on a change in pull request #14948:
URL: https://github.com/apache/pulsar/pull/14948#discussion_r839396804



##########
File path: tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/KinesisSinkTester.java
##########
@@ -18,35 +18,34 @@
  */
 package org.apache.pulsar.tests.integration.io.sinks;
 
+import java.util.LinkedHashMap;
 import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
 import org.awaitility.Awaitility;
 import org.testcontainers.containers.localstack.LocalStackContainer;
 import org.testcontainers.utility.DockerImageName;
 import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
-import software.amazon.awssdk.auth.credentials.AwsCredentials;
-import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
 import software.amazon.awssdk.regions.Region;
 import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
 import software.amazon.awssdk.services.kinesis.model.CreateStreamRequest;
 import software.amazon.awssdk.services.kinesis.model.GetRecordsRequest;
 import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse;
 import software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest;
 import software.amazon.awssdk.services.kinesis.model.ListShardsRequest;
-import software.amazon.awssdk.services.kinesis.model.Record;
 import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
 
 import java.net.URI;
 import java.nio.charset.StandardCharsets;
 import java.util.Map;
 
-import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.assertEquals;
 
 @Slf4j
 public class KinesisSinkTester extends SinkTester<LocalStackContainer> {
 
     private static final String NAME = "kinesis";
+    private static final int LOCALSTACK_SERVICE_PORT = 4566;

Review comment:
       Just wondering about the fixed port number. Usually Testcontainers uses dynamic ports for everything so I'd assume that also in this case.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] cbornet commented on pull request #14948: Fix Kinesis integration test

Posted by GitBox <gi...@apache.org>.
cbornet commented on pull request #14948:
URL: https://github.com/apache/pulsar/pull/14948#issuecomment-1086873011


   /pulsarbot rerun-failure-checks


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] cbornet commented on a change in pull request #14948: Fix Kinesis integration test

Posted by GitBox <gi...@apache.org>.
cbornet commented on a change in pull request #14948:
URL: https://github.com/apache/pulsar/pull/14948#discussion_r840434745



##########
File path: tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/KinesisSinkTester.java
##########
@@ -64,16 +63,11 @@ public void prepareSink() throws Exception {
         final LocalStackContainer localStackContainer = getServiceContainer();
         final URI endpointOverride = localStackContainer.getEndpointOverride(LocalStackContainer.Service.KINESIS);
         sinkConfig.put("awsEndpoint", NAME);
-        sinkConfig.put("awsEndpointPort", endpointOverride.getPort());
+        sinkConfig.put("awsEndpointPort", LOCALSTACK_SERVICE_PORT);
         sinkConfig.put("skipCertificateValidation", true);
-        client = KinesisAsyncClient.builder().credentialsProvider(new AwsCredentialsProvider() {
-                    @Override
-                    public AwsCredentials resolveCredentials() {
-                        return AwsBasicCredentials.create(
-                                "access",
-                                "secret");
-                    }
-                })
+        client = KinesisAsyncClient.builder().credentialsProvider(() -> AwsBasicCredentials.create(

Review comment:
       done




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] cbornet commented on pull request #14948: Fix Kinesis integration test

Posted by GitBox <gi...@apache.org>.
cbornet commented on pull request #14948:
URL: https://github.com/apache/pulsar/pull/14948#issuecomment-1085997505


   /pulsarbot rerun-failure-checks


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] cbornet commented on pull request #14948: Fix Kinesis integration test

Posted by GitBox <gi...@apache.org>.
cbornet commented on pull request #14948:
URL: https://github.com/apache/pulsar/pull/14948#issuecomment-1083183521


   True but there's currently no method to do that in `SinkTester`


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] nicoloboschi commented on a change in pull request #14948: Fix Kinesis integration test

Posted by GitBox <gi...@apache.org>.
nicoloboschi commented on a change in pull request #14948:
URL: https://github.com/apache/pulsar/pull/14948#discussion_r838482050



##########
File path: tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/KinesisSinkTester.java
##########
@@ -64,16 +63,11 @@ public void prepareSink() throws Exception {
         final LocalStackContainer localStackContainer = getServiceContainer();
         final URI endpointOverride = localStackContainer.getEndpointOverride(LocalStackContainer.Service.KINESIS);
         sinkConfig.put("awsEndpoint", NAME);
-        sinkConfig.put("awsEndpointPort", endpointOverride.getPort());
+        sinkConfig.put("awsEndpointPort", LOCALSTACK_SERVICE_PORT);
         sinkConfig.put("skipCertificateValidation", true);
-        client = KinesisAsyncClient.builder().credentialsProvider(new AwsCredentialsProvider() {
-                    @Override
-                    public AwsCredentials resolveCredentials() {
-                        return AwsBasicCredentials.create(
-                                "access",
-                                "secret");
-                    }
-                })
+        client = KinesisAsyncClient.builder().credentialsProvider(() -> AwsBasicCredentials.create(

Review comment:
       do we close this client? 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] lhotari commented on a change in pull request #14948: Fix Kinesis integration test

Posted by GitBox <gi...@apache.org>.
lhotari commented on a change in pull request #14948:
URL: https://github.com/apache/pulsar/pull/14948#discussion_r839549045



##########
File path: tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/KinesisSinkTester.java
##########
@@ -18,35 +18,34 @@
  */
 package org.apache.pulsar.tests.integration.io.sinks;
 
+import java.util.LinkedHashMap;
 import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
 import org.awaitility.Awaitility;
 import org.testcontainers.containers.localstack.LocalStackContainer;
 import org.testcontainers.utility.DockerImageName;
 import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
-import software.amazon.awssdk.auth.credentials.AwsCredentials;
-import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
 import software.amazon.awssdk.regions.Region;
 import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
 import software.amazon.awssdk.services.kinesis.model.CreateStreamRequest;
 import software.amazon.awssdk.services.kinesis.model.GetRecordsRequest;
 import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse;
 import software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest;
 import software.amazon.awssdk.services.kinesis.model.ListShardsRequest;
-import software.amazon.awssdk.services.kinesis.model.Record;
 import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
 
 import java.net.URI;
 import java.nio.charset.StandardCharsets;
 import java.util.Map;
 
-import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.assertEquals;
 
 @Slf4j
 public class KinesisSinkTester extends SinkTester<LocalStackContainer> {
 
     private static final String NAME = "kinesis";
+    private static final int LOCALSTACK_SERVICE_PORT = 4566;

Review comment:
       makes sense




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] lhotari commented on a change in pull request #14948: Fix Kinesis integration test

Posted by GitBox <gi...@apache.org>.
lhotari commented on a change in pull request #14948:
URL: https://github.com/apache/pulsar/pull/14948#discussion_r839549669



##########
File path: tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/KinesisSinkTester.java
##########
@@ -64,16 +63,11 @@ public void prepareSink() throws Exception {
         final LocalStackContainer localStackContainer = getServiceContainer();
         final URI endpointOverride = localStackContainer.getEndpointOverride(LocalStackContainer.Service.KINESIS);
         sinkConfig.put("awsEndpoint", NAME);
-        sinkConfig.put("awsEndpointPort", endpointOverride.getPort());
+        sinkConfig.put("awsEndpointPort", LOCALSTACK_SERVICE_PORT);

Review comment:
       ok makes sense. Thanks for explaining.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] eolivelli commented on a change in pull request #14948: Fix Kinesis integration test

Posted by GitBox <gi...@apache.org>.
eolivelli commented on a change in pull request #14948:
URL: https://github.com/apache/pulsar/pull/14948#discussion_r839399992



##########
File path: tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/KinesisSinkTester.java
##########
@@ -64,16 +63,11 @@ public void prepareSink() throws Exception {
         final LocalStackContainer localStackContainer = getServiceContainer();
         final URI endpointOverride = localStackContainer.getEndpointOverride(LocalStackContainer.Service.KINESIS);
         sinkConfig.put("awsEndpoint", NAME);
-        sinkConfig.put("awsEndpointPort", endpointOverride.getPort());
+        sinkConfig.put("awsEndpointPort", LOCALSTACK_SERVICE_PORT);

Review comment:
       it is the core of the problem :-)




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] lhotari commented on a change in pull request #14948: Fix Kinesis integration test

Posted by GitBox <gi...@apache.org>.
lhotari commented on a change in pull request #14948:
URL: https://github.com/apache/pulsar/pull/14948#discussion_r839398941



##########
File path: tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/KinesisSinkTester.java
##########
@@ -64,16 +63,11 @@ public void prepareSink() throws Exception {
         final LocalStackContainer localStackContainer = getServiceContainer();
         final URI endpointOverride = localStackContainer.getEndpointOverride(LocalStackContainer.Service.KINESIS);
         sinkConfig.put("awsEndpoint", NAME);
-        sinkConfig.put("awsEndpointPort", endpointOverride.getPort());
+        sinkConfig.put("awsEndpointPort", LOCALSTACK_SERVICE_PORT);

Review comment:
       Is this change necessary?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] cbornet commented on a change in pull request #14948: Fix Kinesis integration test

Posted by GitBox <gi...@apache.org>.
cbornet commented on a change in pull request #14948:
URL: https://github.com/apache/pulsar/pull/14948#discussion_r839437368



##########
File path: tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/KinesisSinkTester.java
##########
@@ -64,16 +63,11 @@ public void prepareSink() throws Exception {
         final LocalStackContainer localStackContainer = getServiceContainer();
         final URI endpointOverride = localStackContainer.getEndpointOverride(LocalStackContainer.Service.KINESIS);
         sinkConfig.put("awsEndpoint", NAME);
-        sinkConfig.put("awsEndpointPort", endpointOverride.getPort());
+        sinkConfig.put("awsEndpointPort", LOCALSTACK_SERVICE_PORT);

Review comment:
       yes it is.
   We were using the port forwarded to the host. But the sink is not on the host but in a container so it must use the internal container port.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] nicoloboschi commented on pull request #14948: Fix Kinesis integration test

Posted by GitBox <gi...@apache.org>.
nicoloboschi commented on pull request #14948:
URL: https://github.com/apache/pulsar/pull/14948#issuecomment-1083189224


   I can do it today. I'm ok to merge this pr


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] cbornet commented on pull request #14948: Fix Kinesis integration test

Posted by GitBox <gi...@apache.org>.
cbornet commented on pull request #14948:
URL: https://github.com/apache/pulsar/pull/14948#issuecomment-1083225562


   I had missed that there's a `stopServiceContainer` method that can be overridden. I'll do it.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] eolivelli commented on a change in pull request #14948: Fix Kinesis integration test

Posted by GitBox <gi...@apache.org>.
eolivelli commented on a change in pull request #14948:
URL: https://github.com/apache/pulsar/pull/14948#discussion_r839399992



##########
File path: tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/KinesisSinkTester.java
##########
@@ -64,16 +63,11 @@ public void prepareSink() throws Exception {
         final LocalStackContainer localStackContainer = getServiceContainer();
         final URI endpointOverride = localStackContainer.getEndpointOverride(LocalStackContainer.Service.KINESIS);
         sinkConfig.put("awsEndpoint", NAME);
-        sinkConfig.put("awsEndpointPort", endpointOverride.getPort());
+        sinkConfig.put("awsEndpointPort", LOCALSTACK_SERVICE_PORT);

Review comment:
       it is the core of the problem IIUC




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] cbornet edited a comment on pull request #14948: Fix Kinesis integration test

Posted by GitBox <gi...@apache.org>.
cbornet edited a comment on pull request #14948:
URL: https://github.com/apache/pulsar/pull/14948#issuecomment-1083183521


   > I observed now that we don't close the KinesisClient in this test, can you fix it?
   
   True but there's currently no method to do that in `SinkTester`. Should we add one in this PR ?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] cbornet edited a comment on pull request #14948: Fix Kinesis integration test

Posted by GitBox <gi...@apache.org>.
cbornet edited a comment on pull request #14948:
URL: https://github.com/apache/pulsar/pull/14948#issuecomment-1083183521


   > I observed now that we don't close the KinesisClient in this test, can you fix it?
   
   True but there's currently no method to do that in `SinkTester`


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org