You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mb...@apache.org on 2023/01/26 10:26:24 UTC

[flink] branch master updated: [FLINK-30754][tests] Fix ExceptionThrowingDelegationTokenProvider/Receiver multi-threaded test issues

This is an automated email from the ASF dual-hosted git repository.

mbalassi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 3a64648b431 [FLINK-30754][tests] Fix ExceptionThrowingDelegationTokenProvider/Receiver multi-threaded test issues
3a64648b431 is described below

commit 3a64648b4316fb5283bade5132d8fba1c2211d9b
Author: Gabor Somogyi <ga...@apple.com>
AuthorDate: Thu Jan 19 10:54:12 2023 +0100

    [FLINK-30754][tests] Fix ExceptionThrowingDelegationTokenProvider/Receiver multi-threaded test issues
---
 .../token/DefaultDelegationTokenManagerTest.java   | 12 ++++----
 .../DelegationTokenReceiverRepositoryTest.java     |  4 +--
 .../ExceptionThrowingDelegationTokenProvider.java  | 32 ++++++++++++----------
 .../ExceptionThrowingDelegationTokenReceiver.java  | 21 ++++++++------
 4 files changed, 38 insertions(+), 31 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/DefaultDelegationTokenManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/DefaultDelegationTokenManagerTest.java
index e211389573b..b1b19109cbe 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/DefaultDelegationTokenManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/DefaultDelegationTokenManagerTest.java
@@ -84,7 +84,7 @@ public class DefaultDelegationTokenManagerTest {
         assertThrows(
                 Exception.class,
                 () -> {
-                    ExceptionThrowingDelegationTokenProvider.throwInInit = true;
+                    ExceptionThrowingDelegationTokenProvider.throwInInit.set(true);
                     new DefaultDelegationTokenManager(new Configuration(), null, null, null);
                 });
     }
@@ -107,8 +107,8 @@ public class DefaultDelegationTokenManagerTest {
         assertTrue(delegationTokenManager.isProviderLoaded("test"));
         assertTrue(delegationTokenManager.isReceiverLoaded("test"));
 
-        assertTrue(ExceptionThrowingDelegationTokenProvider.constructed);
-        assertTrue(ExceptionThrowingDelegationTokenReceiver.constructed);
+        assertTrue(ExceptionThrowingDelegationTokenProvider.constructed.get());
+        assertTrue(ExceptionThrowingDelegationTokenReceiver.constructed.get());
         assertFalse(delegationTokenManager.isProviderLoaded("throw"));
         assertFalse(delegationTokenManager.isReceiverLoaded("throw"));
     }
@@ -169,7 +169,7 @@ public class DefaultDelegationTokenManagerTest {
         final ManuallyTriggeredScheduledExecutorService scheduler =
                 new ManuallyTriggeredScheduledExecutorService();
 
-        ExceptionThrowingDelegationTokenProvider.addToken = true;
+        ExceptionThrowingDelegationTokenProvider.addToken.set(true);
         Configuration configuration = new Configuration();
         configuration.setBoolean(CONFIG_PREFIX + ".throw.enabled", true);
         AtomicInteger startTokensUpdateCallCount = new AtomicInteger(0);
@@ -184,10 +184,10 @@ public class DefaultDelegationTokenManagerTest {
                 };
 
         delegationTokenManager.startTokensUpdate();
-        ExceptionThrowingDelegationTokenProvider.throwInUsage = true;
+        ExceptionThrowingDelegationTokenProvider.throwInUsage.set(true);
         scheduledExecutor.triggerScheduledTasks();
         scheduler.triggerAll();
-        ExceptionThrowingDelegationTokenProvider.throwInUsage = false;
+        ExceptionThrowingDelegationTokenProvider.throwInUsage.set(false);
         scheduledExecutor.triggerScheduledTasks();
         scheduler.triggerAll();
         delegationTokenManager.stopTokensUpdate();
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/DelegationTokenReceiverRepositoryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/DelegationTokenReceiverRepositoryTest.java
index b1605226bd8..4b997fa81e6 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/DelegationTokenReceiverRepositoryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/DelegationTokenReceiverRepositoryTest.java
@@ -53,7 +53,7 @@ class DelegationTokenReceiverRepositoryTest {
         assertThrows(
                 Exception.class,
                 () -> {
-                    ExceptionThrowingDelegationTokenReceiver.throwInInit = true;
+                    ExceptionThrowingDelegationTokenReceiver.throwInInit.set(true);
                     new DelegationTokenReceiverRepository(new Configuration(), null);
                 });
     }
@@ -69,7 +69,7 @@ class DelegationTokenReceiverRepositoryTest {
         assertTrue(delegationTokenReceiverRepository.isReceiverLoaded("hadoopfs"));
         assertTrue(delegationTokenReceiverRepository.isReceiverLoaded("hbase"));
         assertTrue(delegationTokenReceiverRepository.isReceiverLoaded("test"));
-        assertTrue(ExceptionThrowingDelegationTokenReceiver.constructed);
+        assertTrue(ExceptionThrowingDelegationTokenReceiver.constructed.get());
         assertFalse(delegationTokenReceiverRepository.isReceiverLoaded("throw"));
     }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/ExceptionThrowingDelegationTokenProvider.java b/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/ExceptionThrowingDelegationTokenProvider.java
index 93a78420056..3a4300423c0 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/ExceptionThrowingDelegationTokenProvider.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/ExceptionThrowingDelegationTokenProvider.java
@@ -28,20 +28,24 @@ import java.util.Optional;
  */
 public class ExceptionThrowingDelegationTokenProvider implements DelegationTokenProvider {
 
-    public static volatile boolean throwInInit = false;
-    public static volatile boolean throwInUsage = false;
-    public static volatile boolean addToken = false;
-    public static volatile boolean constructed = false;
+    public static volatile ThreadLocal<Boolean> throwInInit =
+            ThreadLocal.withInitial(() -> Boolean.FALSE);
+    public static volatile ThreadLocal<Boolean> throwInUsage =
+            ThreadLocal.withInitial(() -> Boolean.FALSE);
+    public static volatile ThreadLocal<Boolean> addToken =
+            ThreadLocal.withInitial(() -> Boolean.FALSE);
+    public static volatile ThreadLocal<Boolean> constructed =
+            ThreadLocal.withInitial(() -> Boolean.FALSE);
 
     public static void reset() {
-        throwInInit = false;
-        throwInUsage = false;
-        addToken = false;
-        constructed = false;
+        throwInInit.set(false);
+        throwInUsage.set(false);
+        addToken.set(false);
+        constructed.set(false);
     }
 
     public ExceptionThrowingDelegationTokenProvider() {
-        constructed = true;
+        constructed.set(true);
     }
 
     @Override
@@ -51,25 +55,25 @@ public class ExceptionThrowingDelegationTokenProvider implements DelegationToken
 
     @Override
     public void init(Configuration configuration) {
-        if (throwInInit) {
+        if (throwInInit.get()) {
             throw new IllegalArgumentException();
         }
     }
 
     @Override
     public boolean delegationTokensRequired() {
-        if (throwInUsage) {
+        if (throwInUsage.get()) {
             throw new IllegalArgumentException();
         }
-        return addToken;
+        return addToken.get();
     }
 
     @Override
     public ObtainedDelegationTokens obtainDelegationTokens() {
-        if (throwInUsage) {
+        if (throwInUsage.get()) {
             throw new IllegalArgumentException();
         }
-        if (addToken) {
+        if (addToken.get()) {
             return new ObtainedDelegationTokens("TEST_TOKEN_VALUE".getBytes(), Optional.empty());
         } else {
             return null;
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/ExceptionThrowingDelegationTokenReceiver.java b/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/ExceptionThrowingDelegationTokenReceiver.java
index f1d919f34dc..bf0329f9595 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/ExceptionThrowingDelegationTokenReceiver.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/ExceptionThrowingDelegationTokenReceiver.java
@@ -26,18 +26,21 @@ import org.apache.flink.core.security.token.DelegationTokenReceiver;
  */
 public class ExceptionThrowingDelegationTokenReceiver implements DelegationTokenReceiver {
 
-    public static volatile boolean throwInInit = false;
-    public static volatile boolean throwInUsage = false;
-    public static volatile boolean constructed = false;
+    public static volatile ThreadLocal<Boolean> throwInInit =
+            ThreadLocal.withInitial(() -> Boolean.FALSE);
+    public static volatile ThreadLocal<Boolean> throwInUsage =
+            ThreadLocal.withInitial(() -> Boolean.FALSE);
+    public static volatile ThreadLocal<Boolean> constructed =
+            ThreadLocal.withInitial(() -> Boolean.FALSE);
 
     public static void reset() {
-        throwInInit = false;
-        throwInUsage = false;
-        constructed = false;
+        throwInInit.set(false);
+        throwInUsage.set(false);
+        constructed.set(false);
     }
 
     public ExceptionThrowingDelegationTokenReceiver() {
-        constructed = true;
+        constructed.set(true);
     }
 
     @Override
@@ -47,14 +50,14 @@ public class ExceptionThrowingDelegationTokenReceiver implements DelegationToken
 
     @Override
     public void init(Configuration configuration) {
-        if (throwInInit) {
+        if (throwInInit.get()) {
             throw new IllegalArgumentException();
         }
     }
 
     @Override
     public void onNewTokensObtained(byte[] tokens) throws Exception {
-        if (throwInUsage) {
+        if (throwInUsage.get()) {
             throw new IllegalArgumentException();
         }
     }