You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mb...@apache.org on 2023/01/26 10:26:24 UTC
[flink] branch master updated: [FLINK-30754][tests] Fix ExceptionThrowingDelegationTokenProvider/Receiver multi-threaded test issues
This is an automated email from the ASF dual-hosted git repository.
mbalassi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 3a64648b431 [FLINK-30754][tests] Fix ExceptionThrowingDelegationTokenProvider/Receiver multi-threaded test issues
3a64648b431 is described below
commit 3a64648b4316fb5283bade5132d8fba1c2211d9b
Author: Gabor Somogyi <ga...@apple.com>
AuthorDate: Thu Jan 19 10:54:12 2023 +0100
[FLINK-30754][tests] Fix ExceptionThrowingDelegationTokenProvider/Receiver multi-threaded test issues
---
.../token/DefaultDelegationTokenManagerTest.java | 12 ++++----
.../DelegationTokenReceiverRepositoryTest.java | 4 +--
.../ExceptionThrowingDelegationTokenProvider.java | 32 ++++++++++++----------
.../ExceptionThrowingDelegationTokenReceiver.java | 21 ++++++++------
4 files changed, 38 insertions(+), 31 deletions(-)
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/DefaultDelegationTokenManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/DefaultDelegationTokenManagerTest.java
index e211389573b..b1b19109cbe 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/DefaultDelegationTokenManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/DefaultDelegationTokenManagerTest.java
@@ -84,7 +84,7 @@ public class DefaultDelegationTokenManagerTest {
assertThrows(
Exception.class,
() -> {
- ExceptionThrowingDelegationTokenProvider.throwInInit = true;
+ ExceptionThrowingDelegationTokenProvider.throwInInit.set(true);
new DefaultDelegationTokenManager(new Configuration(), null, null, null);
});
}
@@ -107,8 +107,8 @@ public class DefaultDelegationTokenManagerTest {
assertTrue(delegationTokenManager.isProviderLoaded("test"));
assertTrue(delegationTokenManager.isReceiverLoaded("test"));
- assertTrue(ExceptionThrowingDelegationTokenProvider.constructed);
- assertTrue(ExceptionThrowingDelegationTokenReceiver.constructed);
+ assertTrue(ExceptionThrowingDelegationTokenProvider.constructed.get());
+ assertTrue(ExceptionThrowingDelegationTokenReceiver.constructed.get());
assertFalse(delegationTokenManager.isProviderLoaded("throw"));
assertFalse(delegationTokenManager.isReceiverLoaded("throw"));
}
@@ -169,7 +169,7 @@ public class DefaultDelegationTokenManagerTest {
final ManuallyTriggeredScheduledExecutorService scheduler =
new ManuallyTriggeredScheduledExecutorService();
- ExceptionThrowingDelegationTokenProvider.addToken = true;
+ ExceptionThrowingDelegationTokenProvider.addToken.set(true);
Configuration configuration = new Configuration();
configuration.setBoolean(CONFIG_PREFIX + ".throw.enabled", true);
AtomicInteger startTokensUpdateCallCount = new AtomicInteger(0);
@@ -184,10 +184,10 @@ public class DefaultDelegationTokenManagerTest {
};
delegationTokenManager.startTokensUpdate();
- ExceptionThrowingDelegationTokenProvider.throwInUsage = true;
+ ExceptionThrowingDelegationTokenProvider.throwInUsage.set(true);
scheduledExecutor.triggerScheduledTasks();
scheduler.triggerAll();
- ExceptionThrowingDelegationTokenProvider.throwInUsage = false;
+ ExceptionThrowingDelegationTokenProvider.throwInUsage.set(false);
scheduledExecutor.triggerScheduledTasks();
scheduler.triggerAll();
delegationTokenManager.stopTokensUpdate();
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/DelegationTokenReceiverRepositoryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/DelegationTokenReceiverRepositoryTest.java
index b1605226bd8..4b997fa81e6 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/DelegationTokenReceiverRepositoryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/DelegationTokenReceiverRepositoryTest.java
@@ -53,7 +53,7 @@ class DelegationTokenReceiverRepositoryTest {
assertThrows(
Exception.class,
() -> {
- ExceptionThrowingDelegationTokenReceiver.throwInInit = true;
+ ExceptionThrowingDelegationTokenReceiver.throwInInit.set(true);
new DelegationTokenReceiverRepository(new Configuration(), null);
});
}
@@ -69,7 +69,7 @@ class DelegationTokenReceiverRepositoryTest {
assertTrue(delegationTokenReceiverRepository.isReceiverLoaded("hadoopfs"));
assertTrue(delegationTokenReceiverRepository.isReceiverLoaded("hbase"));
assertTrue(delegationTokenReceiverRepository.isReceiverLoaded("test"));
- assertTrue(ExceptionThrowingDelegationTokenReceiver.constructed);
+ assertTrue(ExceptionThrowingDelegationTokenReceiver.constructed.get());
assertFalse(delegationTokenReceiverRepository.isReceiverLoaded("throw"));
}
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/ExceptionThrowingDelegationTokenProvider.java b/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/ExceptionThrowingDelegationTokenProvider.java
index 93a78420056..3a4300423c0 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/ExceptionThrowingDelegationTokenProvider.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/ExceptionThrowingDelegationTokenProvider.java
@@ -28,20 +28,24 @@ import java.util.Optional;
*/
public class ExceptionThrowingDelegationTokenProvider implements DelegationTokenProvider {
- public static volatile boolean throwInInit = false;
- public static volatile boolean throwInUsage = false;
- public static volatile boolean addToken = false;
- public static volatile boolean constructed = false;
+ public static volatile ThreadLocal<Boolean> throwInInit =
+ ThreadLocal.withInitial(() -> Boolean.FALSE);
+ public static volatile ThreadLocal<Boolean> throwInUsage =
+ ThreadLocal.withInitial(() -> Boolean.FALSE);
+ public static volatile ThreadLocal<Boolean> addToken =
+ ThreadLocal.withInitial(() -> Boolean.FALSE);
+ public static volatile ThreadLocal<Boolean> constructed =
+ ThreadLocal.withInitial(() -> Boolean.FALSE);
public static void reset() {
- throwInInit = false;
- throwInUsage = false;
- addToken = false;
- constructed = false;
+ throwInInit.set(false);
+ throwInUsage.set(false);
+ addToken.set(false);
+ constructed.set(false);
}
public ExceptionThrowingDelegationTokenProvider() {
- constructed = true;
+ constructed.set(true);
}
@Override
@@ -51,25 +55,25 @@ public class ExceptionThrowingDelegationTokenProvider implements DelegationToken
@Override
public void init(Configuration configuration) {
- if (throwInInit) {
+ if (throwInInit.get()) {
throw new IllegalArgumentException();
}
}
@Override
public boolean delegationTokensRequired() {
- if (throwInUsage) {
+ if (throwInUsage.get()) {
throw new IllegalArgumentException();
}
- return addToken;
+ return addToken.get();
}
@Override
public ObtainedDelegationTokens obtainDelegationTokens() {
- if (throwInUsage) {
+ if (throwInUsage.get()) {
throw new IllegalArgumentException();
}
- if (addToken) {
+ if (addToken.get()) {
return new ObtainedDelegationTokens("TEST_TOKEN_VALUE".getBytes(), Optional.empty());
} else {
return null;
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/ExceptionThrowingDelegationTokenReceiver.java b/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/ExceptionThrowingDelegationTokenReceiver.java
index f1d919f34dc..bf0329f9595 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/ExceptionThrowingDelegationTokenReceiver.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/ExceptionThrowingDelegationTokenReceiver.java
@@ -26,18 +26,21 @@ import org.apache.flink.core.security.token.DelegationTokenReceiver;
*/
public class ExceptionThrowingDelegationTokenReceiver implements DelegationTokenReceiver {
- public static volatile boolean throwInInit = false;
- public static volatile boolean throwInUsage = false;
- public static volatile boolean constructed = false;
+ public static volatile ThreadLocal<Boolean> throwInInit =
+ ThreadLocal.withInitial(() -> Boolean.FALSE);
+ public static volatile ThreadLocal<Boolean> throwInUsage =
+ ThreadLocal.withInitial(() -> Boolean.FALSE);
+ public static volatile ThreadLocal<Boolean> constructed =
+ ThreadLocal.withInitial(() -> Boolean.FALSE);
public static void reset() {
- throwInInit = false;
- throwInUsage = false;
- constructed = false;
+ throwInInit.set(false);
+ throwInUsage.set(false);
+ constructed.set(false);
}
public ExceptionThrowingDelegationTokenReceiver() {
- constructed = true;
+ constructed.set(true);
}
@Override
@@ -47,14 +50,14 @@ public class ExceptionThrowingDelegationTokenReceiver implements DelegationToken
@Override
public void init(Configuration configuration) {
- if (throwInInit) {
+ if (throwInInit.get()) {
throw new IllegalArgumentException();
}
}
@Override
public void onNewTokensObtained(byte[] tokens) throws Exception {
- if (throwInUsage) {
+ if (throwInUsage.get()) {
throw new IllegalArgumentException();
}
}