You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by rm...@apache.org on 2020/07/02 16:31:37 UTC
[flink] 02/02: [FLINK-16572][e2e][pubsub] Acknowledge message in
previous test
This is an automated email from the ASF dual-hosted git repository.
rmetzger pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 7c440f39c2afb6d89ffd72d9901d999f1e11e553
Author: Robert Metzger <rm...@apache.org>
AuthorDate: Mon Jun 8 11:40:43 2020 +0200
[FLINK-16572][e2e][pubsub] Acknowledge message in previous test
The test running before the failing test did not properly acknowledge the
reception of the message.
That's also the reason why this test always logged a timeout exception.
With this change, the test will fail with timeout exceptions, and maybe this
improves the overall test stability.
---
.../connectors/gcp/pubsub/CheckPubSubEmulatorTest.java | 18 ++++++++++--------
1 file changed, 10 insertions(+), 8 deletions(-)
diff --git a/flink-end-to-end-tests/flink-connector-gcp-pubsub-emulator-tests/src/test/java/org/apache/flink/streaming/connectors/gcp/pubsub/CheckPubSubEmulatorTest.java b/flink-end-to-end-tests/flink-connector-gcp-pubsub-emulator-tests/src/test/java/org/apache/flink/streaming/connectors/gcp/pubsub/CheckPubSubEmulatorTest.java
index 5a8e433..532c2c1 100644
--- a/flink-end-to-end-tests/flink-connector-gcp-pubsub-emulator-tests/src/test/java/org/apache/flink/streaming/connectors/gcp/pubsub/CheckPubSubEmulatorTest.java
+++ b/flink-end-to-end-tests/flink-connector-gcp-pubsub-emulator-tests/src/test/java/org/apache/flink/streaming/connectors/gcp/pubsub/CheckPubSubEmulatorTest.java
@@ -33,10 +33,9 @@ import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
-import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;
-import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.concurrent.TimeUnit.MINUTES;
import static org.junit.Assert.assertEquals;
/**
@@ -76,6 +75,7 @@ public class CheckPubSubEmulatorTest extends GCloudUnitTestBase {
.get();
List<ReceivedMessage> receivedMessages = pubsubHelper.pullMessages(PROJECT_NAME, SUBSCRIPTION_NAME, 1);
+
assertEquals(1, receivedMessages.size());
assertEquals("Hello World PULL", receivedMessages.get(0).getMessage().getData().toStringUtf8());
@@ -89,8 +89,12 @@ public class CheckPubSubEmulatorTest extends GCloudUnitTestBase {
subscribeToSubscription(
PROJECT_NAME,
SUBSCRIPTION_NAME,
- (message, consumer) -> receivedMessages.add(message)
+ (message, consumer) -> {
+ receivedMessages.add(message);
+ consumer.ack();
+ }
);
+ subscriber.awaitRunning(5, MINUTES);
Publisher publisher = pubsubHelper.createPublisher(PROJECT_NAME, TOPIC_NAME);
publisher
@@ -107,11 +111,9 @@ public class CheckPubSubEmulatorTest extends GCloudUnitTestBase {
assertEquals(1, receivedMessages.size());
assertEquals("Hello World", receivedMessages.get(0).getData().toStringUtf8());
- try {
- subscriber.stopAsync().awaitTerminated(100, MILLISECONDS);
- } catch (TimeoutException tme) {
- LOG.info("Timeout during shutdown", tme);
- }
+ LOG.info("Received message. Shutting down ...");
+
+ subscriber.stopAsync().awaitTerminated(5, MINUTES);
publisher.shutdown();
}