You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by rm...@apache.org on 2020/07/02 16:31:37 UTC

[flink] 02/02: [FLINK-16572][e2e][pubsub] Acknowledge message in previous test

This is an automated email from the ASF dual-hosted git repository.

rmetzger pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 7c440f39c2afb6d89ffd72d9901d999f1e11e553
Author: Robert Metzger <rm...@apache.org>
AuthorDate: Mon Jun 8 11:40:43 2020 +0200

    [FLINK-16572][e2e][pubsub] Acknowledge message in previous test
    
    The test that runs before the failing test did not properly acknowledge
    receipt of the message.
    That is also why that earlier test always logged a timeout exception.
    
    With this change, the test fails on a timeout exception instead of merely
    logging it, and this may also improve the overall test stability.
---
 .../connectors/gcp/pubsub/CheckPubSubEmulatorTest.java | 18 ++++++++++--------
 1 file changed, 10 insertions(+), 8 deletions(-)

diff --git a/flink-end-to-end-tests/flink-connector-gcp-pubsub-emulator-tests/src/test/java/org/apache/flink/streaming/connectors/gcp/pubsub/CheckPubSubEmulatorTest.java b/flink-end-to-end-tests/flink-connector-gcp-pubsub-emulator-tests/src/test/java/org/apache/flink/streaming/connectors/gcp/pubsub/CheckPubSubEmulatorTest.java
index 5a8e433..532c2c1 100644
--- a/flink-end-to-end-tests/flink-connector-gcp-pubsub-emulator-tests/src/test/java/org/apache/flink/streaming/connectors/gcp/pubsub/CheckPubSubEmulatorTest.java
+++ b/flink-end-to-end-tests/flink-connector-gcp-pubsub-emulator-tests/src/test/java/org/apache/flink/streaming/connectors/gcp/pubsub/CheckPubSubEmulatorTest.java
@@ -33,10 +33,9 @@ import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.List;
-import java.util.concurrent.TimeoutException;
 import java.util.function.Supplier;
 
-import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.concurrent.TimeUnit.MINUTES;
 import static org.junit.Assert.assertEquals;
 
 /**
@@ -76,6 +75,7 @@ public class CheckPubSubEmulatorTest extends GCloudUnitTestBase {
 			.get();
 
 		List<ReceivedMessage> receivedMessages = pubsubHelper.pullMessages(PROJECT_NAME, SUBSCRIPTION_NAME, 1);
+
 		assertEquals(1, receivedMessages.size());
 		assertEquals("Hello World PULL", receivedMessages.get(0).getMessage().getData().toStringUtf8());
 
@@ -89,8 +89,12 @@ public class CheckPubSubEmulatorTest extends GCloudUnitTestBase {
 			subscribeToSubscription(
 				PROJECT_NAME,
 				SUBSCRIPTION_NAME,
-				(message, consumer) -> receivedMessages.add(message)
+				(message, consumer) -> {
+					receivedMessages.add(message);
+					consumer.ack();
+				}
 			);
+		subscriber.awaitRunning(5, MINUTES);
 
 		Publisher publisher = pubsubHelper.createPublisher(PROJECT_NAME, TOPIC_NAME);
 		publisher
@@ -107,11 +111,9 @@ public class CheckPubSubEmulatorTest extends GCloudUnitTestBase {
 		assertEquals(1, receivedMessages.size());
 		assertEquals("Hello World", receivedMessages.get(0).getData().toStringUtf8());
 
-		try {
-			subscriber.stopAsync().awaitTerminated(100, MILLISECONDS);
-		} catch (TimeoutException tme) {
-			LOG.info("Timeout during shutdown", tme);
-		}
+		LOG.info("Received message. Shutting down ...");
+
+		subscriber.stopAsync().awaitTerminated(5, MINUTES);
 		publisher.shutdown();
 	}