You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by rm...@apache.org on 2020/07/02 16:31:36 UTC

[flink] 01/02: [FLINK-16572] Clean up PubSub connector e2e test

This is an automated email from the ASF dual-hosted git repository.

rmetzger pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 1170b2fda49841df1908eb7d02dc7f2bb0c65104
Author: Robert Metzger <rm...@apache.org>
AuthorDate: Wed Jun 3 13:26:50 2020 +0200

    [FLINK-16572] Clean up PubSub connector e2e test
    
    - execute as regular test to have proper logging
    - document copied code
    - fix typos
---
 NOTICE                                                   |  6 ++++++
 .../flink-connector-gcp-pubsub-emulator-tests/pom.xml    |  2 +-
 .../connectors/gcp/pubsub/CheckPubSubEmulatorTest.java   | 13 ++++++-------
 .../gcp/pubsub/emulator/GCloudEmulatorManager.java       | 16 +++++++++++++---
 .../gcp/pubsub/emulator/GCloudUnitTestBase.java          |  4 ++++
 .../connectors/gcp/pubsub/emulator/PubsubHelper.java     |  3 +++
 .../src/test/resources/log4j2-test.properties            |  5 ++---
 .../test-scripts/test_streaming_gcp_pubsub.sh            |  2 ++
 8 files changed, 37 insertions(+), 14 deletions(-)

diff --git a/NOTICE b/NOTICE
index 2179dc8..b4769c5 100644
--- a/NOTICE
+++ b/NOTICE
@@ -34,3 +34,9 @@ ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUT
 DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS,
 WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE
 USE OR PERFORMANCE OF THIS SOFTWARE.
+
+The Apache Flink project contains or reuses code that is licensed under the Apache 2.0 license from the following projects:
+- Google Cloud Client Library for Java (https://github.com/googleapis/google-cloud-java) Copyright 2017 Google LLC
+
+  See: flink-end-to-end-tests/flink-connector-gcp-pubsub-emulator-tests/src/test/java/org/apache/flink/streaming/connectors/gcp/pubsub/emulator/PubsubHelper.java
+
diff --git a/flink-end-to-end-tests/flink-connector-gcp-pubsub-emulator-tests/pom.xml b/flink-end-to-end-tests/flink-connector-gcp-pubsub-emulator-tests/pom.xml
index 52b1146..6da5a42 100644
--- a/flink-end-to-end-tests/flink-connector-gcp-pubsub-emulator-tests/pom.xml
+++ b/flink-end-to-end-tests/flink-connector-gcp-pubsub-emulator-tests/pom.xml
@@ -93,9 +93,9 @@ under the License.
 			<plugin>
 				<groupId>org.apache.maven.plugins</groupId>
 				<artifactId>maven-surefire-plugin</artifactId>
-				<version>2.12.4</version>
 				<configuration>
 					<skipTests>${skipTests}</skipTests>
+					<forkCount>1</forkCount> <!-- Enforce single test execution -->
 				</configuration>
 				<executions>
 					<execution>
diff --git a/flink-end-to-end-tests/flink-connector-gcp-pubsub-emulator-tests/src/test/java/org/apache/flink/streaming/connectors/gcp/pubsub/CheckPubSubEmulatorTest.java b/flink-end-to-end-tests/flink-connector-gcp-pubsub-emulator-tests/src/test/java/org/apache/flink/streaming/connectors/gcp/pubsub/CheckPubSubEmulatorTest.java
index 92a2bc8..5a8e433 100644
--- a/flink-end-to-end-tests/flink-connector-gcp-pubsub-emulator-tests/src/test/java/org/apache/flink/streaming/connectors/gcp/pubsub/CheckPubSubEmulatorTest.java
+++ b/flink-end-to-end-tests/flink-connector-gcp-pubsub-emulator-tests/src/test/java/org/apache/flink/streaming/connectors/gcp/pubsub/CheckPubSubEmulatorTest.java
@@ -50,10 +50,11 @@ public class CheckPubSubEmulatorTest extends GCloudUnitTestBase {
 	private static final String TOPIC_NAME = "Topic";
 	private static final String SUBSCRIPTION_NAME = "Subscription";
 
-	private static PubsubHelper pubsubHelper = getPubsubHelper();
+	private static PubsubHelper pubsubHelper;
 
 	@BeforeClass
 	public static void setUp() throws Exception {
+		pubsubHelper = getPubsubHelper();
 		pubsubHelper.createTopic(PROJECT_NAME, TOPIC_NAME);
 		pubsubHelper.createSubscription(PROJECT_NAME, SUBSCRIPTION_NAME, PROJECT_NAME, TOPIC_NAME);
 	}
@@ -101,7 +102,7 @@ public class CheckPubSubEmulatorTest extends GCloudUnitTestBase {
 
 		LOG.info("Waiting a while to receive the message...");
 
-		waitUntill(() -> receivedMessages.size() > 0);
+		waitUntil(() -> receivedMessages.size() > 0);
 
 		assertEquals(1, receivedMessages.size());
 		assertEquals("Hello World", receivedMessages.get(0).getData().toStringUtf8());
@@ -109,7 +110,7 @@ public class CheckPubSubEmulatorTest extends GCloudUnitTestBase {
 		try {
 			subscriber.stopAsync().awaitTerminated(100, MILLISECONDS);
 		} catch (TimeoutException tme) {
-			// Yeah, whatever. Don't care about clean shutdown here.
+			LOG.info("Timeout during shutdown", tme);
 		}
 		publisher.shutdown();
 	}
@@ -117,14 +118,12 @@ public class CheckPubSubEmulatorTest extends GCloudUnitTestBase {
 	/*
 	 * Returns when predicate returns true or if 10 seconds have passed
 	 */
-	private void waitUntill(Supplier<Boolean> predicate) {
+	private void waitUntil(Supplier<Boolean> predicate) throws InterruptedException {
 		int retries = 0;
 
 		while (!predicate.get() && retries < 100) {
 			retries++;
-			try {
-				Thread.sleep(10);
-			} catch (InterruptedException e) { }
+			Thread.sleep(10);
 		}
 	}
 
diff --git a/flink-end-to-end-tests/flink-connector-gcp-pubsub-emulator-tests/src/test/java/org/apache/flink/streaming/connectors/gcp/pubsub/emulator/GCloudEmulatorManager.java b/flink-end-to-end-tests/flink-connector-gcp-pubsub-emulator-tests/src/test/java/org/apache/flink/streaming/connectors/gcp/pubsub/emulator/GCloudEmulatorManager.java
index 7254f09..d4a3a38 100644
--- a/flink-end-to-end-tests/flink-connector-gcp-pubsub-emulator-tests/src/test/java/org/apache/flink/streaming/connectors/gcp/pubsub/emulator/GCloudEmulatorManager.java
+++ b/flink-end-to-end-tests/flink-connector-gcp-pubsub-emulator-tests/src/test/java/org/apache/flink/streaming/connectors/gcp/pubsub/emulator/GCloudEmulatorManager.java
@@ -19,6 +19,7 @@ package org.apache.flink.streaming.connectors.gcp.pubsub.emulator;
 
 import com.spotify.docker.client.DefaultDockerClient;
 import com.spotify.docker.client.DockerClient;
+import com.spotify.docker.client.LogStream;
 import com.spotify.docker.client.exceptions.ContainerNotFoundException;
 import com.spotify.docker.client.exceptions.DockerCertificateException;
 import com.spotify.docker.client.exceptions.DockerException;
@@ -116,7 +117,7 @@ public class GCloudEmulatorManager {
 			.hostConfig(hostConfig)
 			.exposedPorts(INTERNAL_PUBSUB_PORT)
 			.image(DOCKER_IMAGE_NAME)
-			.cmd("sh", "-c", "mkdir -p /opt/data/pubsub ; gcloud beta emulators pubsub start --data-dir=/opt/data/pubsub  --host-port=0.0.0.0:" + INTERNAL_PUBSUB_PORT)
+			.cmd("sh", "-c", "mkdir -p /opt/data/pubsub ; gcloud beta emulators pubsub start --data-dir=/opt/data/pubsub --host-port=0.0.0.0:" + INTERNAL_PUBSUB_PORT)
 			.build();
 
 		final ContainerCreation creation = docker.createContainer(containerConfig, CONTAINER_NAME_JUNIT);
@@ -220,7 +221,7 @@ public class GCloudEmulatorManager {
 			containerInfo = docker.inspectContainer(CONTAINER_NAME_JUNIT);
 			// Already have this container running.
 
-			assertNotNull("We should either we get containerInfo or we get an exception", containerInfo);
+			assertNotNull("We should either get a containerInfo or we get an exception", containerInfo);
 
 			LOG.info("");
 			LOG.info("/===========================================");
@@ -228,7 +229,16 @@ public class GCloudEmulatorManager {
 				LOG.warn("|    >>> FOUND OLD EMULATOR INSTANCE RUNNING <<< ");
 				LOG.warn("| Destroying that one to keep tests running smoothly.");
 			}
-			LOG.info("| Cleanup of GCloud Emulator");
+			LOG.info("| Cleanup of GCloud Emulator. Log output of container: ");
+
+			if (LOG.isInfoEnabled()) {
+				try (LogStream stream = docker.logs(
+					containerInfo.id(),
+					DockerClient.LogsParam.stdout(),
+					DockerClient.LogsParam.stderr())) {
+					LOG.info("| > {}", stream.readFully());
+				}
+			}
 
 			// We REQUIRE 100% accurate side effect free unit tests
 			// So we completely discard this one.
diff --git a/flink-end-to-end-tests/flink-connector-gcp-pubsub-emulator-tests/src/test/java/org/apache/flink/streaming/connectors/gcp/pubsub/emulator/GCloudUnitTestBase.java b/flink-end-to-end-tests/flink-connector-gcp-pubsub-emulator-tests/src/test/java/org/apache/flink/streaming/connectors/gcp/pubsub/emulator/GCloudUnitTestBase.java
index 6fdc0117..e050323 100644
--- a/flink-end-to-end-tests/flink-connector-gcp-pubsub-emulator-tests/src/test/java/org/apache/flink/streaming/connectors/gcp/pubsub/emulator/GCloudUnitTestBase.java
+++ b/flink-end-to-end-tests/flink-connector-gcp-pubsub-emulator-tests/src/test/java/org/apache/flink/streaming/connectors/gcp/pubsub/emulator/GCloudUnitTestBase.java
@@ -27,6 +27,7 @@ import org.junit.AfterClass;
 import org.junit.BeforeClass;
 
 import java.io.Serializable;
+import java.util.concurrent.TimeUnit;
 
 import static java.util.concurrent.TimeUnit.SECONDS;
 import static org.apache.flink.streaming.connectors.gcp.pubsub.emulator.GCloudEmulatorManager.getDockerIpAddress;
@@ -44,6 +45,9 @@ public class GCloudUnitTestBase implements Serializable {
 
 	@AfterClass
 	public static void terminateGCloudEmulator() throws DockerException, InterruptedException {
+		channel.shutdownNow();
+		channel.awaitTermination(1, TimeUnit.MINUTES);
+		channel = null;
 		GCloudEmulatorManager.terminateDocker();
 	}
 
diff --git a/flink-end-to-end-tests/flink-connector-gcp-pubsub-emulator-tests/src/test/java/org/apache/flink/streaming/connectors/gcp/pubsub/emulator/PubsubHelper.java b/flink-end-to-end-tests/flink-connector-gcp-pubsub-emulator-tests/src/test/java/org/apache/flink/streaming/connectors/gcp/pubsub/emulator/PubsubHelper.java
index bd2d2c5..5b51558 100644
--- a/flink-end-to-end-tests/flink-connector-gcp-pubsub-emulator-tests/src/test/java/org/apache/flink/streaming/connectors/gcp/pubsub/emulator/PubsubHelper.java
+++ b/flink-end-to-end-tests/flink-connector-gcp-pubsub-emulator-tests/src/test/java/org/apache/flink/streaming/connectors/gcp/pubsub/emulator/PubsubHelper.java
@@ -153,7 +153,10 @@ public class PubsubHelper {
 		}
 	}
 
+	//
 	// Mostly copied from the example on https://cloud.google.com/pubsub/docs/pull
+	// Licensed under the Apache 2.0 License to "Google LLC" from https://github.com/googleapis/google-cloud-java/blob/master/google-cloud-examples/src/main/java/com/google/cloud/examples/pubsub/snippets/SubscriberSnippets.java.
+	//
 	public List<ReceivedMessage> pullMessages(String projectId, String subscriptionId, int maxNumberOfMessages) throws Exception {
 		SubscriberStubSettings subscriberStubSettings =
 			SubscriberStubSettings.newBuilder()
diff --git a/flink-end-to-end-tests/flink-connector-gcp-pubsub-emulator-tests/src/test/resources/log4j2-test.properties b/flink-end-to-end-tests/flink-connector-gcp-pubsub-emulator-tests/src/test/resources/log4j2-test.properties
index 835c2ec..6ecf8e1 100644
--- a/flink-end-to-end-tests/flink-connector-gcp-pubsub-emulator-tests/src/test/resources/log4j2-test.properties
+++ b/flink-end-to-end-tests/flink-connector-gcp-pubsub-emulator-tests/src/test/resources/log4j2-test.properties
@@ -16,9 +16,8 @@
 # limitations under the License.
 ################################################################################
 
-# Set root logger level to OFF to not flood build logs
-# set manually to INFO for debugging purposes
-rootLogger.level = OFF
+# Set logging level to INFO: Tests are executed as a Bash e2e test.
+rootLogger.level = INFO
 rootLogger.appenderRef.test.ref = TestLogger
 
 appender.testlogger.name = TestLogger
diff --git a/flink-end-to-end-tests/test-scripts/test_streaming_gcp_pubsub.sh b/flink-end-to-end-tests/test-scripts/test_streaming_gcp_pubsub.sh
index 8fa9e5a..625d818 100755
--- a/flink-end-to-end-tests/test-scripts/test_streaming_gcp_pubsub.sh
+++ b/flink-end-to-end-tests/test-scripts/test_streaming_gcp_pubsub.sh
@@ -17,6 +17,8 @@
 # limitations under the License.
 ################################################################################
 
+# This test is a Java end to end test, but it requires Docker. Therefore, we run it from bash.
+
 cd "${END_TO_END_DIR}/flink-connector-gcp-pubsub-emulator-tests"
 
 run_mvn test -DskipTests=false