You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2019/11/15 17:44:48 UTC
[flink] 02/02: [FLINK-14635][e2e tests] Use non-relocated imports
for AWS SDK Kinesis classes in tests
This is an automated email from the ASF dual-hosted git repository.
sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 2b0a8ceeb131c938d2e41dfee66099bfa5f366ae
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Mon Nov 11 10:04:49 2019 +0100
[FLINK-14635][e2e tests] Use non-relocated imports for AWS SDK Kinesis classes in tests
- Moving pubsub client testutil to the main project's test jar
---
flink-connectors/flink-connector-kinesis/pom.xml | 5 ++++
.../kinesis/testutils}/KinesisPubsubClient.java | 35 +++++++++++-----------
.../flink-streaming-kinesis-test/pom.xml | 8 +++++
.../streaming/kinesis/test/KinesisExampleTest.java | 15 ++--------
4 files changed, 33 insertions(+), 30 deletions(-)
diff --git a/flink-connectors/flink-connector-kinesis/pom.xml b/flink-connectors/flink-connector-kinesis/pom.xml
index 415fbb8..652a510 100644
--- a/flink-connectors/flink-connector-kinesis/pom.xml
+++ b/flink-connectors/flink-connector-kinesis/pom.xml
@@ -169,6 +169,11 @@ under the License.
<goals>
<goal>test-jar</goal>
</goals>
+ <configuration>
+ <includes>
+ <include>**/org/apache/flink/streaming/connectors/kinesis/testutils/**</include>
+ </includes>
+ </configuration>
</execution>
</executions>
</plugin>
diff --git a/flink-end-to-end-tests/flink-streaming-kinesis-test/src/main/java/org/apache/flink/streaming/kinesis/test/KinesisPubsubClient.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/KinesisPubsubClient.java
similarity index 78%
rename from flink-end-to-end-tests/flink-streaming-kinesis-test/src/main/java/org/apache/flink/streaming/kinesis/test/KinesisPubsubClient.java
rename to flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/KinesisPubsubClient.java
index 486b565..30eeb2c 100644
--- a/flink-end-to-end-tests/flink-streaming-kinesis-test/src/main/java/org/apache/flink/streaming/kinesis/test/KinesisPubsubClient.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/KinesisPubsubClient.java
@@ -15,26 +15,26 @@
* limitations under the License.
*/
-package org.apache.flink.streaming.kinesis.test;
+package org.apache.flink.streaming.connectors.kinesis.testutils;
import org.apache.flink.api.common.time.Deadline;
-import org.apache.flink.kinesis.shaded.com.amazonaws.AmazonClientException;
-import org.apache.flink.kinesis.shaded.com.amazonaws.auth.AWSCredentialsProvider;
-import org.apache.flink.kinesis.shaded.com.amazonaws.auth.EnvironmentVariableCredentialsProvider;
-import org.apache.flink.kinesis.shaded.com.amazonaws.client.builder.AwsClientBuilder;
-import org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.AmazonKinesis;
-import org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.AmazonKinesisClientBuilder;
-import org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.model.GetRecordsResult;
-import org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.model.PutRecordRequest;
-import org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.model.PutRecordResult;
-import org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.model.Record;
-import org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.model.ResourceNotFoundException;
import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
import org.apache.flink.streaming.connectors.kinesis.proxy.GetShardListResult;
import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy;
import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyInterface;
+import com.amazonaws.AmazonClientException;
+import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.auth.EnvironmentVariableCredentialsProvider;
+import com.amazonaws.client.builder.AwsClientBuilder;
+import com.amazonaws.services.kinesis.AmazonKinesis;
+import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder;
+import com.amazonaws.services.kinesis.model.GetRecordsResult;
+import com.amazonaws.services.kinesis.model.PutRecordRequest;
+import com.amazonaws.services.kinesis.model.PutRecordResult;
+import com.amazonaws.services.kinesis.model.Record;
+import com.amazonaws.services.kinesis.model.ResourceNotFoundException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -46,18 +46,21 @@ import java.util.List;
import java.util.Map;
import java.util.Properties;
-class KinesisPubsubClient implements KinesisExampleTest.PubsubClient {
+/**
+ * Simple client to publish and retrieve messages, using the AWS Kinesis SDK and the
* Flink Kinesis Connector classes.
+ */
+public class KinesisPubsubClient {
private static final Logger LOG = LoggerFactory.getLogger(KinesisPubsubClient.class);
private final AmazonKinesis kinesisClient;
private final Properties properties;
- KinesisPubsubClient(Properties properties) {
+ public KinesisPubsubClient(Properties properties) {
this.kinesisClient = createClientWithCredentials(properties);
this.properties = properties;
}
- @Override
public void createTopic(String stream, int shards, Properties props) throws Exception {
try {
kinesisClient.describeStream(stream);
@@ -83,7 +86,6 @@ class KinesisPubsubClient implements KinesisExampleTest.PubsubClient {
}
}
- @Override
public void sendMessage(String topic, String msg) {
PutRecordRequest putRecordRequest = new PutRecordRequest();
putRecordRequest.setStreamName(topic);
@@ -93,7 +95,6 @@ class KinesisPubsubClient implements KinesisExampleTest.PubsubClient {
LOG.info("added record: {}", putRecordResult.getSequenceNumber());
}
- @Override
public List<String> readAllMessages(String streamName) throws Exception {
KinesisProxyInterface kinesisProxy = KinesisProxy.create(properties);
Map<String, String> streamNamesWithLastSeenShardIds = new HashMap<>();
diff --git a/flink-end-to-end-tests/flink-streaming-kinesis-test/pom.xml b/flink-end-to-end-tests/flink-streaming-kinesis-test/pom.xml
index b1b66e9..88b9354 100644
--- a/flink-end-to-end-tests/flink-streaming-kinesis-test/pom.xml
+++ b/flink-end-to-end-tests/flink-streaming-kinesis-test/pom.xml
@@ -52,6 +52,14 @@ under the License.
</dependency>
<dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-connector-kinesis_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>compile</scope>
+ </dependency>
+
+ <dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>${junit.version}</version>
diff --git a/flink-end-to-end-tests/flink-streaming-kinesis-test/src/main/java/org/apache/flink/streaming/kinesis/test/KinesisExampleTest.java b/flink-end-to-end-tests/flink-streaming-kinesis-test/src/main/java/org/apache/flink/streaming/kinesis/test/KinesisExampleTest.java
index 1a6d6d7..6f8f20a 100644
--- a/flink-end-to-end-tests/flink-streaming-kinesis-test/src/main/java/org/apache/flink/streaming/kinesis/test/KinesisExampleTest.java
+++ b/flink-end-to-end-tests/flink-streaming-kinesis-test/src/main/java/org/apache/flink/streaming/kinesis/test/KinesisExampleTest.java
@@ -19,6 +19,7 @@ package org.apache.flink.streaming.kinesis.test;
import org.apache.flink.api.common.time.Deadline;
import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.streaming.connectors.kinesis.testutils.KinesisPubsubClient;
import org.junit.Assert;
import org.slf4j.Logger;
@@ -26,7 +27,6 @@ import org.slf4j.LoggerFactory;
import java.time.Duration;
import java.util.List;
-import java.util.Properties;
import java.util.concurrent.atomic.AtomicReference;
/**
@@ -35,17 +35,6 @@ import java.util.concurrent.atomic.AtomicReference;
public class KinesisExampleTest {
private static final Logger LOG = LoggerFactory.getLogger(KinesisExampleTest.class);
- /**
- * Interface to the pubsub system for this test.
- */
- interface PubsubClient {
- void createTopic(String topic, int partitions, Properties props) throws Exception;
-
- void sendMessage(String topic, String msg);
-
- List<String> readAllMessages(String streamName) throws Exception;
- }
-
public static void main(String[] args) throws Exception {
LOG.info("System properties: {}", System.getProperties());
final ParameterTool parameterTool = ParameterTool.fromArgs(args);
@@ -53,7 +42,7 @@ public class KinesisExampleTest {
String inputStream = parameterTool.getRequired("input-stream");
String outputStream = parameterTool.getRequired("output-stream");
- PubsubClient pubsub = new KinesisPubsubClient(parameterTool.getProperties());
+ KinesisPubsubClient pubsub = new KinesisPubsubClient(parameterTool.getProperties());
pubsub.createTopic(inputStream, 2, parameterTool.getProperties());
pubsub.createTopic(outputStream, 2, parameterTool.getProperties());