You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by rm...@apache.org on 2020/05/13 14:28:56 UTC

[flink] 02/08: [FLINK-11086][e2e] Properly add Avro JARs in SQLClientKafkaITCase

This is an automated email from the ASF dual-hosted git repository.

rmetzger pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 07330dd6aeabf84d450440614dcb0117bfd1c9c7
Author: Robert Metzger <rm...@apache.org>
AuthorDate: Mon May 4 10:11:58 2020 +0200

    [FLINK-11086][e2e] Properly add Avro JARs in SQLClientKafkaITCase
---
 .../flink/tests/util/kafka/SQLClientKafkaITCase.java   | 18 +++++++++++++++---
 .../flink/tests/util/flink/SQLJobSubmission.java       |  5 +++++
 2 files changed, 20 insertions(+), 3 deletions(-)

diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/SQLClientKafkaITCase.java b/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/SQLClientKafkaITCase.java
index 923377e..df3dec3 100644
--- a/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/SQLClientKafkaITCase.java
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/SQLClientKafkaITCase.java
@@ -19,7 +19,7 @@
 package org.apache.flink.tests.util.kafka;
 
 import org.apache.flink.tests.util.TestUtils;
-import org.apache.flink.tests.util.categories.Hadoop;
+import org.apache.flink.tests.util.cache.DownloadCache;
 import org.apache.flink.tests.util.categories.TravisGroup1;
 import org.apache.flink.tests.util.flink.ClusterController;
 import org.apache.flink.tests.util.flink.FlinkResource;
@@ -51,9 +51,11 @@ import java.net.URL;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.StandardOpenOption;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 import static org.hamcrest.Matchers.arrayContainingInAnyOrder;
@@ -63,7 +65,7 @@ import static org.junit.Assert.assertThat;
  * End-to-end test for the kafka SQL connectors.
  */
 @RunWith(Parameterized.class)
-@Category(value = {TravisGroup1.class, FailsOnJava11.class, Hadoop.class})
+@Category(value = {TravisGroup1.class, FailsOnJava11.class})
 public class SQLClientKafkaITCase extends TestLogger {
 
 	private static final Logger LOG = LoggerFactory.getLogger(SQLClientKafkaITCase.class);
@@ -93,9 +95,12 @@ public class SQLClientKafkaITCase extends TestLogger {
 	private Path result;
 	private Path sqlClientSessionConf;
 
+	private static final DownloadCache downloadCache = DownloadCache.get();
+
 	private static final Path sqlAvroJar = TestUtils.getResourceJar(".*avro.jar");
 	private static final Path sqlJsonJar = TestUtils.getResourceJar(".*json.jar");
 	private static final Path sqlToolBoxJar = TestUtils.getResourceJar(".*SqlToolbox.jar");
+	private final List<Path> apacheAvroJars = new ArrayList<>();
 	private final Path sqlConnectorKafkaJar;
 
 	public SQLClientKafkaITCase(String kafkaVersion, String kafkaSQLVersion, String kafkaSQLJarPattern) {
@@ -106,11 +111,16 @@ public class SQLClientKafkaITCase extends TestLogger {
 	}
 
 	@Before
-	public void before() {
+	public void before() throws Exception {
+		downloadCache.before();
 		Path tmpPath = tmp.getRoot().toPath();
 		LOG.info("The current temporary path: {}", tmpPath);
 		this.sqlClientSessionConf = tmpPath.resolve("sql-client-session.conf");
 		this.result = tmpPath.resolve("result");
+
+		apacheAvroJars.add(downloadCache.getOrDownload("https://repo1.maven.org/maven2/org/apache/avro/avro/1.8.2/avro-1.8.2.jar", tmpPath));
+		apacheAvroJars.add(downloadCache.getOrDownload("https://repo1.maven.org/maven2/org/codehaus/jackson/jackson-core-asl/1.9.13/jackson-core-asl-1.9.13.jar", tmpPath));
+		apacheAvroJars.add(downloadCache.getOrDownload("https://repo1.maven.org/maven2/org/codehaus/jackson/jackson-mapper-asl/1.9.13/jackson-mapper-asl-1.9.13.jar", tmpPath));
 	}
 
 	@Test
@@ -179,6 +189,7 @@ public class SQLClientKafkaITCase extends TestLogger {
 
 		clusterController.submitSQLJob(new SQLJobSubmission.SQLJobSubmissionBuilder(sqlStatement1)
 				.addJar(sqlAvroJar)
+				.addJars(apacheAvroJars)
 				.addJar(sqlJsonJar)
 				.addJar(sqlConnectorKafkaJar)
 				.addJar(sqlToolBoxJar)
@@ -194,6 +205,7 @@ public class SQLClientKafkaITCase extends TestLogger {
 
 		clusterController.submitSQLJob(new SQLJobSubmission.SQLJobSubmissionBuilder(sqlStatement2)
 				.addJar(sqlAvroJar)
+				.addJars(apacheAvroJars)
 				.addJar(sqlJsonJar)
 				.addJar(sqlConnectorKafkaJar)
 				.addJar(sqlToolBoxJar)
diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/SQLJobSubmission.java b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/SQLJobSubmission.java
index cfb3e39..1f4013a 100644
--- a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/SQLJobSubmission.java
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/SQLJobSubmission.java
@@ -90,6 +90,11 @@ public class SQLJobSubmission {
 			return this;
 		}
 
+		public SQLJobSubmissionBuilder addJars(List<Path> jarFiles) {
+			jarFiles.forEach(this::addJar);
+			return this;
+		}
+
 		public SQLJobSubmission build() {
 			return new SQLJobSubmission(sql, jars, defaultEnvFile, sessionEnvFile);
 		}