You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2022/07/06 07:11:51 UTC

[flink] 02/02: [FLINK-26979][core][tests] Add thread-safety test

This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 342dfd16e575ee23ef6058869a93b44deb0b4681
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Tue Jul 5 10:46:40 2022 +0200

    [FLINK-26979][core][tests] Add thread-safety test
---
 .../apache/flink/api/dag/TransformationTest.java   | 45 ++++++++++++++++++++++
 1 file changed, 45 insertions(+)

diff --git a/flink-core/src/test/java/org/apache/flink/api/dag/TransformationTest.java b/flink-core/src/test/java/org/apache/flink/api/dag/TransformationTest.java
index 494fc9d3465..c14ab0d8e3d 100644
--- a/flink-core/src/test/java/org/apache/flink/api/dag/TransformationTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/dag/TransformationTest.java
@@ -20,16 +20,22 @@ package org.apache.flink.api.dag;
 
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.core.memory.ManagedMemoryUseCase;
+import org.apache.flink.core.testutils.CheckedThread;
+import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.Before;
 import org.junit.Test;
 
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
 
 import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThat;
 
 /** Tests for {@link Transformation}. */
@@ -42,6 +48,45 @@ public class TransformationTest extends TestLogger {
         transformation = new TestTransformation<>("t", null, 1);
     }
 
+    @Test // Checks that concurrent Transformation.getNewNodeId() calls never yield a duplicate id.
+    public void testGetNewNodeIdIsThreadSafe() throws Exception {
+        final int numThreads = 10;
+        final int numIdsPerThread = 100;
+        // 10 worker threads x 100 ids each = 1000 ids that must all be distinct.
+        final List<CheckedThread> threads = new ArrayList<>();
+        // Gate the workers wait on; it is only triggered after all of them have been started.
+        final OneShotLatch startLatch = new OneShotLatch();
+        // Synchronized wrapper because every worker thread appends its own result list to it.
+        final List<List<Integer>> idLists = Collections.synchronizedList(new ArrayList<>());
+        for (int x = 0; x < numThreads; x++) {
+            threads.add(
+                    new CheckedThread() {
+                        @Override
+                        public void go() throws Exception {
+                            startLatch.await();
+                            // Ids go into a list private to this worker, published once it is full.
+                            final List<Integer> ids = new ArrayList<>();
+                            for (int c = 0; c < numIdsPerThread; c++) {
+                                ids.add(Transformation.getNewNodeId());
+                            }
+                            idLists.add(ids);
+                        }
+                    });
+        }
+        threads.forEach(Thread::start);
+        // Trigger only now, so the workers request their ids at (roughly) the same time.
+        startLatch.trigger();
+        // NOTE(review): sync() presumably joins the thread and rethrows errors from go() - confirm.
+        for (CheckedThread thread : threads) {
+            thread.sync();
+        }
+        // Merging into a Set drops duplicates, so a smaller size would reveal a repeated id.
+        final Set<Integer> deduplicatedIds =
+                idLists.stream().flatMap(List::stream).collect(Collectors.toSet());
+
+        assertEquals(numThreads * numIdsPerThread, deduplicatedIds.size());
+    }
+
     @Test
     public void testDeclareManagedMemoryUseCase() {
         transformation.declareManagedMemoryUseCaseAtOperatorScope(