You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2019/11/15 17:44:48 UTC

[flink] 02/02: [FLINK-14635][e2e tests] Use non-relocated imports for AWS SDK Kinesis classes in tests

This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 2b0a8ceeb131c938d2e41dfee66099bfa5f366ae
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Mon Nov 11 10:04:49 2019 +0100

    [FLINK-14635][e2e tests] Use non-relocated imports for AWS SDK Kinesis classes in tests
    
      - Moving pubsub client testutil to the main project's test jar
---
 flink-connectors/flink-connector-kinesis/pom.xml   |  5 ++++
 .../kinesis/testutils}/KinesisPubsubClient.java    | 35 +++++++++++-----------
 .../flink-streaming-kinesis-test/pom.xml           |  8 +++++
 .../streaming/kinesis/test/KinesisExampleTest.java | 15 ++--------
 4 files changed, 33 insertions(+), 30 deletions(-)

diff --git a/flink-connectors/flink-connector-kinesis/pom.xml b/flink-connectors/flink-connector-kinesis/pom.xml
index 415fbb8..652a510 100644
--- a/flink-connectors/flink-connector-kinesis/pom.xml
+++ b/flink-connectors/flink-connector-kinesis/pom.xml
@@ -169,6 +169,11 @@ under the License.
 						<goals>
 							<goal>test-jar</goal>
 						</goals>
+						<configuration>
+							<includes>
+								<include>**/org/apache/flink/streaming/connectors/kinesis/testutils/**</include>
+							</includes>
+						</configuration>
 					</execution>
 				</executions>
 			</plugin>
diff --git a/flink-end-to-end-tests/flink-streaming-kinesis-test/src/main/java/org/apache/flink/streaming/kinesis/test/KinesisPubsubClient.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/KinesisPubsubClient.java
similarity index 78%
rename from flink-end-to-end-tests/flink-streaming-kinesis-test/src/main/java/org/apache/flink/streaming/kinesis/test/KinesisPubsubClient.java
rename to flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/KinesisPubsubClient.java
index 486b565..30eeb2c 100644
--- a/flink-end-to-end-tests/flink-streaming-kinesis-test/src/main/java/org/apache/flink/streaming/kinesis/test/KinesisPubsubClient.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/KinesisPubsubClient.java
@@ -15,26 +15,26 @@
  * limitations under the License.
  */
 
-package org.apache.flink.streaming.kinesis.test;
+package org.apache.flink.streaming.connectors.kinesis.testutils;
 
 import org.apache.flink.api.common.time.Deadline;
-import org.apache.flink.kinesis.shaded.com.amazonaws.AmazonClientException;
-import org.apache.flink.kinesis.shaded.com.amazonaws.auth.AWSCredentialsProvider;
-import org.apache.flink.kinesis.shaded.com.amazonaws.auth.EnvironmentVariableCredentialsProvider;
-import org.apache.flink.kinesis.shaded.com.amazonaws.client.builder.AwsClientBuilder;
-import org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.AmazonKinesis;
-import org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.AmazonKinesisClientBuilder;
-import org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.model.GetRecordsResult;
-import org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.model.PutRecordRequest;
-import org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.model.PutRecordResult;
-import org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.model.Record;
-import org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.model.ResourceNotFoundException;
 import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
 import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
 import org.apache.flink.streaming.connectors.kinesis.proxy.GetShardListResult;
 import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy;
 import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyInterface;
 
+import com.amazonaws.AmazonClientException;
+import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.auth.EnvironmentVariableCredentialsProvider;
+import com.amazonaws.client.builder.AwsClientBuilder;
+import com.amazonaws.services.kinesis.AmazonKinesis;
+import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder;
+import com.amazonaws.services.kinesis.model.GetRecordsResult;
+import com.amazonaws.services.kinesis.model.PutRecordRequest;
+import com.amazonaws.services.kinesis.model.PutRecordResult;
+import com.amazonaws.services.kinesis.model.Record;
+import com.amazonaws.services.kinesis.model.ResourceNotFoundException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -46,18 +46,21 @@ import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 
-class KinesisPubsubClient implements KinesisExampleTest.PubsubClient {
+/**
+ * Simple client to publish and retrieve messages, using the AWS Kinesis SDK and the
+ * Flink Kinesis Connector classes.
+ */
+public class KinesisPubsubClient {
 	private static final Logger LOG = LoggerFactory.getLogger(KinesisPubsubClient.class);
 
 	private final AmazonKinesis kinesisClient;
 	private final Properties properties;
 
-	KinesisPubsubClient(Properties properties) {
+	public KinesisPubsubClient(Properties properties) {
 		this.kinesisClient = createClientWithCredentials(properties);
 		this.properties = properties;
 	}
 
-	@Override
 	public void createTopic(String stream, int shards, Properties props) throws Exception {
 		try {
 			kinesisClient.describeStream(stream);
@@ -83,7 +86,6 @@ class KinesisPubsubClient implements KinesisExampleTest.PubsubClient {
 		}
 	}
 
-	@Override
 	public void sendMessage(String topic, String msg) {
 		PutRecordRequest putRecordRequest = new PutRecordRequest();
 		putRecordRequest.setStreamName(topic);
@@ -93,7 +95,6 @@ class KinesisPubsubClient implements KinesisExampleTest.PubsubClient {
 		LOG.info("added record: {}", putRecordResult.getSequenceNumber());
 	}
 
-	@Override
 	public List<String> readAllMessages(String streamName) throws Exception {
 		KinesisProxyInterface kinesisProxy = KinesisProxy.create(properties);
 		Map<String, String> streamNamesWithLastSeenShardIds = new HashMap<>();
diff --git a/flink-end-to-end-tests/flink-streaming-kinesis-test/pom.xml b/flink-end-to-end-tests/flink-streaming-kinesis-test/pom.xml
index b1b66e9..88b9354 100644
--- a/flink-end-to-end-tests/flink-streaming-kinesis-test/pom.xml
+++ b/flink-end-to-end-tests/flink-streaming-kinesis-test/pom.xml
@@ -52,6 +52,14 @@ under the License.
 		</dependency>
 
 		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-connector-kinesis_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+			<type>test-jar</type>
+			<scope>compile</scope>
+		</dependency>
+
+		<dependency>
 			<groupId>junit</groupId>
 			<artifactId>junit</artifactId>
 			<version>${junit.version}</version>
diff --git a/flink-end-to-end-tests/flink-streaming-kinesis-test/src/main/java/org/apache/flink/streaming/kinesis/test/KinesisExampleTest.java b/flink-end-to-end-tests/flink-streaming-kinesis-test/src/main/java/org/apache/flink/streaming/kinesis/test/KinesisExampleTest.java
index 1a6d6d7..6f8f20a 100644
--- a/flink-end-to-end-tests/flink-streaming-kinesis-test/src/main/java/org/apache/flink/streaming/kinesis/test/KinesisExampleTest.java
+++ b/flink-end-to-end-tests/flink-streaming-kinesis-test/src/main/java/org/apache/flink/streaming/kinesis/test/KinesisExampleTest.java
@@ -19,6 +19,7 @@ package org.apache.flink.streaming.kinesis.test;
 
 import org.apache.flink.api.common.time.Deadline;
 import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.streaming.connectors.kinesis.testutils.KinesisPubsubClient;
 
 import org.junit.Assert;
 import org.slf4j.Logger;
@@ -26,7 +27,6 @@ import org.slf4j.LoggerFactory;
 
 import java.time.Duration;
 import java.util.List;
-import java.util.Properties;
 import java.util.concurrent.atomic.AtomicReference;
 
 /**
@@ -35,17 +35,6 @@ import java.util.concurrent.atomic.AtomicReference;
 public class KinesisExampleTest {
 	private static final Logger LOG = LoggerFactory.getLogger(KinesisExampleTest.class);
 
-	/**
-	 * Interface to the pubsub system for this test.
-	 */
-	interface PubsubClient {
-		void createTopic(String topic, int partitions, Properties props) throws Exception;
-
-		void sendMessage(String topic, String msg);
-
-		List<String> readAllMessages(String streamName) throws Exception;
-	}
-
 	public static void main(String[] args) throws Exception {
 		LOG.info("System properties: {}", System.getProperties());
 		final ParameterTool parameterTool = ParameterTool.fromArgs(args);
@@ -53,7 +42,7 @@ public class KinesisExampleTest {
 		String inputStream = parameterTool.getRequired("input-stream");
 		String outputStream = parameterTool.getRequired("output-stream");
 
-		PubsubClient pubsub = new KinesisPubsubClient(parameterTool.getProperties());
+		KinesisPubsubClient pubsub = new KinesisPubsubClient(parameterTool.getProperties());
 		pubsub.createTopic(inputStream, 2, parameterTool.getProperties());
 		pubsub.createTopic(outputStream, 2, parameterTool.getProperties());