You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2022/07/06 07:11:51 UTC
[flink] 02/02: [FLINK-26979][core][tests] Add thread-safety test
This is an automated email from the ASF dual-hosted git repository.
chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 342dfd16e575ee23ef6058869a93b44deb0b4681
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Tue Jul 5 10:46:40 2022 +0200
[FLINK-26979][core][tests] Add thread-safety test
---
.../apache/flink/api/dag/TransformationTest.java | 45 ++++++++++++++++++++++
1 file changed, 45 insertions(+)
diff --git a/flink-core/src/test/java/org/apache/flink/api/dag/TransformationTest.java b/flink-core/src/test/java/org/apache/flink/api/dag/TransformationTest.java
index 494fc9d3465..c14ab0d8e3d 100644
--- a/flink-core/src/test/java/org/apache/flink/api/dag/TransformationTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/dag/TransformationTest.java
@@ -20,16 +20,22 @@ package org.apache.flink.api.dag;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.core.memory.ManagedMemoryUseCase;
+import org.apache.flink.core.testutils.CheckedThread;
+import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.util.TestLogger;
import org.junit.Before;
import org.junit.Test;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
/** Tests for {@link Transformation}. */
@@ -42,6 +48,45 @@ public class TransformationTest extends TestLogger {
transformation = new TestTransformation<>("t", null, 1);
}
+ @Test
+ public void testGetNewNodeIdIsThreadSafe() throws Exception {
+ final int numThreads = 10;
+ final int numIdsPerThread = 100;
+
+ final List<CheckedThread> threads = new ArrayList<>();
+
+ final OneShotLatch startLatch = new OneShotLatch();
+
+ final List<List<Integer>> idLists = Collections.synchronizedList(new ArrayList<>());
+ for (int x = 0; x < numThreads; x++) {
+ threads.add(
+ new CheckedThread() {
+ @Override
+ public void go() throws Exception {
+ startLatch.await();
+
+ final List<Integer> ids = new ArrayList<>();
+ for (int c = 0; c < numIdsPerThread; c++) {
+ ids.add(Transformation.getNewNodeId());
+ }
+ idLists.add(ids);
+ }
+ });
+ }
+ threads.forEach(Thread::start);
+
+ startLatch.trigger();
+
+ for (CheckedThread thread : threads) {
+ thread.sync();
+ }
+
+ final Set<Integer> deduplicatedIds =
+ idLists.stream().flatMap(List::stream).collect(Collectors.toSet());
+
+ assertEquals(numThreads * numIdsPerThread, deduplicatedIds.size());
+ }
+
@Test
public void testDeclareManagedMemoryUseCase() {
transformation.declareManagedMemoryUseCaseAtOperatorScope(