You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by dw...@apache.org on 2021/11/12 12:58:29 UTC

[flink] branch release-1.14 updated: [FLINK-24800][runtime] Changed the assert condition for checking buffer timeout disabling test

This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.14 by this push:
     new 39866ce  [FLINK-24800][runtime] Changed the assert condition for checking buffer timeout disabling test
39866ce is described below

commit 39866ce3418974c9f9b746a3fb8b8e5c30a5f5db
Author: Anton Kalashnikov <ka...@yandex.ru>
AuthorDate: Wed Nov 10 17:58:54 2021 +0100

    [FLINK-24800][runtime] Changed the assert condition for checking buffer timeout disabling test
---
 .../streaming/runtime/BufferTimeoutITCase.java     | 24 ++++++++++++++--------
 1 file changed, 15 insertions(+), 9 deletions(-)

diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/BufferTimeoutITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/BufferTimeoutITCase.java
index 8597820..992acbb 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/BufferTimeoutITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/BufferTimeoutITCase.java
@@ -18,8 +18,8 @@
 
 package org.apache.flink.test.streaming.runtime;
 
-import org.apache.flink.configuration.ExecutionOptions;
 import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
 import org.apache.flink.runtime.testutils.CommonTestUtils;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
@@ -33,8 +33,7 @@ import org.junit.Test;
 
 import java.util.ArrayList;
 
-import static org.hamcrest.CoreMatchers.equalTo;
-import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertTrue;
 
 /** Tests for {@link StreamExecutionEnvironment#setBufferTimeout(long)}. */
 public class BufferTimeoutITCase extends AbstractTestBase {
@@ -42,9 +41,9 @@ public class BufferTimeoutITCase extends AbstractTestBase {
     @Rule public final SharedObjects sharedObjects = SharedObjects.create();
 
     /**
-     * The test verifies that it is possible to disable buffer flushing. It emits a single record,
-     * which should not fill an entire buffer, thus it should not never reach the sink. We check the
-     * sink has not seen any records after 2 times the default buffer timeout.
+     * The test verifies that it is possible to disable explicit buffer flushing. It checks that
+     * the OutputFlusher thread is not started while the task is running. This does not, however,
+     * guarantee that unfinished buffers cannot be flushed by other events.
      */
     @Test
     public void testDisablingBufferTimeout() throws Exception {
@@ -71,7 +70,7 @@ public class BufferTimeoutITCase extends AbstractTestBase {
                         new SinkFunction<Integer>() {
 
                             @Override
-                            public void invoke(Integer value, Context context) throws Exception {
+                            public void invoke(Integer value, Context context) {
                                 results.get().add(value);
                             }
                         })
@@ -81,7 +80,14 @@ public class BufferTimeoutITCase extends AbstractTestBase {
         CommonTestUtils.waitForAllTaskRunning(
                 miniClusterResource.getMiniCluster(), jobClient.getJobID(), false);
 
-        Thread.sleep(2 * ExecutionOptions.BUFFER_TIMEOUT.defaultValue().toMillis());
-        assertThat(results.get().size(), equalTo(0));
+        assertTrue(
+                RecordWriter.DEFAULT_OUTPUT_FLUSH_THREAD_NAME + " thread is unexpectedly running",
+                Thread.getAllStackTraces().keySet().stream()
+                        .noneMatch(
+                                thread ->
+                                        thread.getName()
+                                                .startsWith(
+                                                        RecordWriter
+                                                                .DEFAULT_OUTPUT_FLUSH_THREAD_NAME)));
     }
 }