You are viewing a plain text version of this content; the canonical link is not included in this copy.
Posted to commits@flink.apache.org by da...@apache.org on 2022/02/21 15:34:03 UTC
[flink] branch master updated: [FLINK-26064][connector/firehose][connector/kinesis] Using separate event loop groups for firehose and kinesis IT tests.
This is an automated email from the ASF dual-hosted git repository.
dannycranmer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 3eca2ba [FLINK-26064][connector/firehose][connector/kinesis] Using separate event loop groups for firehose and kinesis IT tests.
3eca2ba is described below
commit 3eca2ba3abb9a02109fe3d21066ea23eb869d6ff
Author: Ahmed Hamdy <va...@amazon.com>
AuthorDate: Mon Feb 21 10:30:31 2022 +0000
[FLINK-26064][connector/firehose][connector/kinesis] Using separate event loop groups for firehose and kinesis IT tests.
---
.../java/org/apache/flink/connector/aws/util/AWSGeneralUtil.java | 9 +++++++--
.../flink/connector/aws/testutils/AWSServicesTestUtils.java | 7 ++++++-
.../flink/connectors/kinesis/testutils/KinesaliteContainer.java | 2 ++
.../flink/connector/firehose/sink/KinesisFirehoseSinkITCase.java | 2 --
4 files changed, 15 insertions(+), 5 deletions(-)
diff --git a/flink-connectors/flink-connector-aws-base/src/main/java/org/apache/flink/connector/aws/util/AWSGeneralUtil.java b/flink-connectors/flink-connector-aws-base/src/main/java/org/apache/flink/connector/aws/util/AWSGeneralUtil.java
index 989f56e..dfc78d8 100644
--- a/flink-connectors/flink-connector-aws-base/src/main/java/org/apache/flink/connector/aws/util/AWSGeneralUtil.java
+++ b/flink-connectors/flink-connector-aws-base/src/main/java/org/apache/flink/connector/aws/util/AWSGeneralUtil.java
@@ -227,6 +227,12 @@ public class AWSGeneralUtil {
}
public static SdkAsyncHttpClient createAsyncHttpClient(final Properties configProperties) {
+ return createAsyncHttpClient(configProperties, NettyNioAsyncHttpClient.builder());
+ }
+
+ public static SdkAsyncHttpClient createAsyncHttpClient(
+ final Properties configProperties,
+ final NettyNioAsyncHttpClient.Builder httpClientBuilder) {
final AttributeMap.Builder clientConfiguration =
AttributeMap.builder().put(SdkHttpConfigurationOption.TCP_KEEPALIVE, true);
@@ -262,8 +268,7 @@ public class AWSGeneralUtil {
protocol ->
clientConfiguration.put(
SdkHttpConfigurationOption.PROTOCOL, protocol));
- return createAsyncHttpClient(
- clientConfiguration.build(), NettyNioAsyncHttpClient.builder());
+ return createAsyncHttpClient(clientConfiguration.build(), httpClientBuilder);
}
public static SdkAsyncHttpClient createAsyncHttpClient(
diff --git a/flink-connectors/flink-connector-aws-base/src/test/java/org/apache/flink/connector/aws/testutils/AWSServicesTestUtils.java b/flink-connectors/flink-connector-aws-base/src/test/java/org/apache/flink/connector/aws/testutils/AWSServicesTestUtils.java
index 79fbe70..e0b53fd 100644
--- a/flink-connectors/flink-connector-aws-base/src/test/java/org/apache/flink/connector/aws/testutils/AWSServicesTestUtils.java
+++ b/flink-connectors/flink-connector-aws-base/src/test/java/org/apache/flink/connector/aws/testutils/AWSServicesTestUtils.java
@@ -26,6 +26,8 @@ import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.core.ResponseBytes;
import software.amazon.awssdk.core.async.AsyncResponseTransformer;
import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
+import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient;
+import software.amazon.awssdk.http.nio.netty.SdkEventLoopGroup;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.iam.IamAsyncClient;
import software.amazon.awssdk.services.iam.model.CreateRoleRequest;
@@ -96,7 +98,10 @@ public class AWSServicesTestUtils {
}
public static SdkAsyncHttpClient createHttpClient(String endpoint) {
- return AWSGeneralUtil.createAsyncHttpClient(createConfig(endpoint));
+ return AWSGeneralUtil.createAsyncHttpClient(
+ createConfig(endpoint),
+ NettyNioAsyncHttpClient.builder()
+ .eventLoopGroupBuilder(SdkEventLoopGroup.builder()));
}
public static void createBucket(S3AsyncClient s3Client, String bucketName)
diff --git a/flink-connectors/flink-connector-aws-kinesis-data-streams/src/test/java/org/apache/flink/connectors/kinesis/testutils/KinesaliteContainer.java b/flink-connectors/flink-connector-aws-kinesis-data-streams/src/test/java/org/apache/flink/connectors/kinesis/testutils/KinesaliteContainer.java
index 934e6bc..f76555b 100644
--- a/flink-connectors/flink-connector-aws-kinesis-data-streams/src/test/java/org/apache/flink/connectors/kinesis/testutils/KinesaliteContainer.java
+++ b/flink-connectors/flink-connector-aws-kinesis-data-streams/src/test/java/org/apache/flink/connectors/kinesis/testutils/KinesaliteContainer.java
@@ -30,6 +30,7 @@ import software.amazon.awssdk.core.SdkSystemSetting;
import software.amazon.awssdk.http.SdkHttpConfigurationOption;
import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient;
+import software.amazon.awssdk.http.nio.netty.SdkEventLoopGroup;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.awssdk.services.kinesis.model.ListStreamsResponse;
@@ -176,6 +177,7 @@ public class KinesaliteContainer extends GenericContainer<KinesaliteContainer> {
private SdkAsyncHttpClient buildSdkAsyncHttpClient() {
return NettyNioAsyncHttpClient.builder()
+ .eventLoopGroupBuilder(SdkEventLoopGroup.builder())
.buildWithDefaults(
AttributeMap.builder()
.put(SdkHttpConfigurationOption.TRUST_ALL_CERTIFICATES, true)
diff --git a/flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkITCase.java b/flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkITCase.java
index 5170470..0e37f15 100644
--- a/flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkITCase.java
+++ b/flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkITCase.java
@@ -30,7 +30,6 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMap
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
-import org.junit.Ignore;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -58,7 +57,6 @@ import static org.apache.flink.connector.firehose.sink.testutils.KinesisFirehose
import static org.assertj.core.api.Assertions.assertThat;
/** Integration test suite for the {@code KinesisFirehoseSink} using a localstack container. */
-@Ignore("FLINK-26064")
public class KinesisFirehoseSinkITCase {
private static final Logger LOG = LoggerFactory.getLogger(KinesisFirehoseSinkITCase.class);