You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by da...@apache.org on 2022/02/21 15:34:03 UTC

[flink] branch master updated: [FLINK-26064][connector/firehose][connector/kinesis] Using separate event loop groups for firehose and kinesis IT tests.

This is an automated email from the ASF dual-hosted git repository.

dannycranmer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 3eca2ba  [FLINK-26064][connector/firehose][connector/kinesis] Using separate event loop groups for firehose and kinesis IT tests.
3eca2ba is described below

commit 3eca2ba3abb9a02109fe3d21066ea23eb869d6ff
Author: Ahmed Hamdy <va...@amazon.com>
AuthorDate: Mon Feb 21 10:30:31 2022 +0000

    [FLINK-26064][connector/firehose][connector/kinesis] Using separate event loop groups for firehose and kinesis IT tests.
---
 .../java/org/apache/flink/connector/aws/util/AWSGeneralUtil.java | 9 +++++++--
 .../flink/connector/aws/testutils/AWSServicesTestUtils.java      | 7 ++++++-
 .../flink/connectors/kinesis/testutils/KinesaliteContainer.java  | 2 ++
 .../flink/connector/firehose/sink/KinesisFirehoseSinkITCase.java | 2 --
 4 files changed, 15 insertions(+), 5 deletions(-)

diff --git a/flink-connectors/flink-connector-aws-base/src/main/java/org/apache/flink/connector/aws/util/AWSGeneralUtil.java b/flink-connectors/flink-connector-aws-base/src/main/java/org/apache/flink/connector/aws/util/AWSGeneralUtil.java
index 989f56e..dfc78d8 100644
--- a/flink-connectors/flink-connector-aws-base/src/main/java/org/apache/flink/connector/aws/util/AWSGeneralUtil.java
+++ b/flink-connectors/flink-connector-aws-base/src/main/java/org/apache/flink/connector/aws/util/AWSGeneralUtil.java
@@ -227,6 +227,12 @@ public class AWSGeneralUtil {
     }
 
     public static SdkAsyncHttpClient createAsyncHttpClient(final Properties configProperties) {
+        return createAsyncHttpClient(configProperties, NettyNioAsyncHttpClient.builder());
+    }
+
+    public static SdkAsyncHttpClient createAsyncHttpClient(
+            final Properties configProperties,
+            final NettyNioAsyncHttpClient.Builder httpClientBuilder) {
         final AttributeMap.Builder clientConfiguration =
                 AttributeMap.builder().put(SdkHttpConfigurationOption.TCP_KEEPALIVE, true);
 
@@ -262,8 +268,7 @@ public class AWSGeneralUtil {
                         protocol ->
                                 clientConfiguration.put(
                                         SdkHttpConfigurationOption.PROTOCOL, protocol));
-        return createAsyncHttpClient(
-                clientConfiguration.build(), NettyNioAsyncHttpClient.builder());
+        return createAsyncHttpClient(clientConfiguration.build(), httpClientBuilder);
     }
 
     public static SdkAsyncHttpClient createAsyncHttpClient(
diff --git a/flink-connectors/flink-connector-aws-base/src/test/java/org/apache/flink/connector/aws/testutils/AWSServicesTestUtils.java b/flink-connectors/flink-connector-aws-base/src/test/java/org/apache/flink/connector/aws/testutils/AWSServicesTestUtils.java
index 79fbe70..e0b53fd 100644
--- a/flink-connectors/flink-connector-aws-base/src/test/java/org/apache/flink/connector/aws/testutils/AWSServicesTestUtils.java
+++ b/flink-connectors/flink-connector-aws-base/src/test/java/org/apache/flink/connector/aws/testutils/AWSServicesTestUtils.java
@@ -26,6 +26,8 @@ import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
 import software.amazon.awssdk.core.ResponseBytes;
 import software.amazon.awssdk.core.async.AsyncResponseTransformer;
 import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
+import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient;
+import software.amazon.awssdk.http.nio.netty.SdkEventLoopGroup;
 import software.amazon.awssdk.regions.Region;
 import software.amazon.awssdk.services.iam.IamAsyncClient;
 import software.amazon.awssdk.services.iam.model.CreateRoleRequest;
@@ -96,7 +98,10 @@ public class AWSServicesTestUtils {
     }
 
     public static SdkAsyncHttpClient createHttpClient(String endpoint) {
-        return AWSGeneralUtil.createAsyncHttpClient(createConfig(endpoint));
+        return AWSGeneralUtil.createAsyncHttpClient(
+                createConfig(endpoint),
+                NettyNioAsyncHttpClient.builder()
+                        .eventLoopGroupBuilder(SdkEventLoopGroup.builder()));
     }
 
     public static void createBucket(S3AsyncClient s3Client, String bucketName)
diff --git a/flink-connectors/flink-connector-aws-kinesis-data-streams/src/test/java/org/apache/flink/connectors/kinesis/testutils/KinesaliteContainer.java b/flink-connectors/flink-connector-aws-kinesis-data-streams/src/test/java/org/apache/flink/connectors/kinesis/testutils/KinesaliteContainer.java
index 934e6bc..f76555b 100644
--- a/flink-connectors/flink-connector-aws-kinesis-data-streams/src/test/java/org/apache/flink/connectors/kinesis/testutils/KinesaliteContainer.java
+++ b/flink-connectors/flink-connector-aws-kinesis-data-streams/src/test/java/org/apache/flink/connectors/kinesis/testutils/KinesaliteContainer.java
@@ -30,6 +30,7 @@ import software.amazon.awssdk.core.SdkSystemSetting;
 import software.amazon.awssdk.http.SdkHttpConfigurationOption;
 import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
 import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient;
+import software.amazon.awssdk.http.nio.netty.SdkEventLoopGroup;
 import software.amazon.awssdk.regions.Region;
 import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
 import software.amazon.awssdk.services.kinesis.model.ListStreamsResponse;
@@ -176,6 +177,7 @@ public class KinesaliteContainer extends GenericContainer<KinesaliteContainer> {
 
     private SdkAsyncHttpClient buildSdkAsyncHttpClient() {
         return NettyNioAsyncHttpClient.builder()
+                .eventLoopGroupBuilder(SdkEventLoopGroup.builder())
                 .buildWithDefaults(
                         AttributeMap.builder()
                                 .put(SdkHttpConfigurationOption.TRUST_ALL_CERTIFICATES, true)
diff --git a/flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkITCase.java b/flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkITCase.java
index 5170470..0e37f15 100644
--- a/flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkITCase.java
+++ b/flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkITCase.java
@@ -30,7 +30,6 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMap
 import org.junit.After;
 import org.junit.Before;
 import org.junit.ClassRule;
-import org.junit.Ignore;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -58,7 +57,6 @@ import static org.apache.flink.connector.firehose.sink.testutils.KinesisFirehose
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Integration test suite for the {@code KinesisFirehoseSink} using a localstack container. */
-@Ignore("FLINK-26064")
 public class KinesisFirehoseSinkITCase {
 
     private static final Logger LOG = LoggerFactory.getLogger(KinesisFirehoseSinkITCase.class);