You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by dw...@apache.org on 2021/11/12 12:58:29 UTC
[flink] branch release-1.14 updated: [FLINK-24800][runtime] Changed
the assert condition for checking buffer timeout disabling test
This is an automated email from the ASF dual-hosted git repository.
dwysakowicz pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.14 by this push:
new 39866ce [FLINK-24800][runtime] Changed the assert condition for checking buffer timeout disabling test
39866ce is described below
commit 39866ce3418974c9f9b746a3fb8b8e5c30a5f5db
Author: Anton Kalashnikov <ka...@yandex.ru>
AuthorDate: Wed Nov 10 17:58:54 2021 +0100
[FLINK-24800][runtime] Changed the assert condition for checking buffer timeout disabling test
---
.../streaming/runtime/BufferTimeoutITCase.java | 24 ++++++++++++++--------
1 file changed, 15 insertions(+), 9 deletions(-)
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/BufferTimeoutITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/BufferTimeoutITCase.java
index 8597820..992acbb 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/BufferTimeoutITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/BufferTimeoutITCase.java
@@ -18,8 +18,8 @@
package org.apache.flink.test.streaming.runtime;
-import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
@@ -33,8 +33,7 @@ import org.junit.Test;
import java.util.ArrayList;
-import static org.hamcrest.CoreMatchers.equalTo;
-import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertTrue;
/** Tests for {@link StreamExecutionEnvironment#setBufferTimeout(long)}. */
public class BufferTimeoutITCase extends AbstractTestBase {
@@ -42,9 +41,9 @@ public class BufferTimeoutITCase extends AbstractTestBase {
@Rule public final SharedObjects sharedObjects = SharedObjects.create();
/**
- * The test verifies that it is possible to disable buffer flushing. It emits a single record,
- * which should not fill an entire buffer, thus it should not never reach the sink. We check the
- * sink has not seen any records after 2 times the default buffer timeout.
+ * The test verifies that it is possible to disable explicit buffer flushing. It checks that
+ * OutputFlasher thread would not be started when the task is running. But this doesn't
+ * guarantee that the unfinished buffers can not be flushed by another events.
*/
@Test
public void testDisablingBufferTimeout() throws Exception {
@@ -71,7 +70,7 @@ public class BufferTimeoutITCase extends AbstractTestBase {
new SinkFunction<Integer>() {
@Override
- public void invoke(Integer value, Context context) throws Exception {
+ public void invoke(Integer value, Context context) {
results.get().add(value);
}
})
@@ -81,7 +80,14 @@ public class BufferTimeoutITCase extends AbstractTestBase {
CommonTestUtils.waitForAllTaskRunning(
miniClusterResource.getMiniCluster(), jobClient.getJobID(), false);
- Thread.sleep(2 * ExecutionOptions.BUFFER_TIMEOUT.defaultValue().toMillis());
- assertThat(results.get().size(), equalTo(0));
+ assertTrue(
+ RecordWriter.DEFAULT_OUTPUT_FLUSH_THREAD_NAME + " thread is unexpectedly running",
+ Thread.getAllStackTraces().keySet().stream()
+ .noneMatch(
+ thread ->
+ thread.getName()
+ .startsWith(
+ RecordWriter
+ .DEFAULT_OUTPUT_FLUSH_THREAD_NAME)));
}
}