You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by st...@apache.org on 2022/10/31 18:15:21 UTC
[hadoop] branch branch-3.3 updated: HADOOP-18233. Possible race condition with TemporaryAWSCredentialsProvider (#5024)
This is an automated email from the ASF dual-hosted git repository.
stevel pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/branch-3.3 by this push:
new 1fdc6c5322d HADOOP-18233. Possible race condition with TemporaryAWSCredentialsProvider (#5024)
1fdc6c5322d is described below
commit 1fdc6c5322db5ab817325c470f6bcef240f78693
Author: sabertiger <sa...@users.noreply.github.com>
AuthorDate: Mon Oct 31 05:43:30 2022 -0700
HADOOP-18233. Possible race condition with TemporaryAWSCredentialsProvider (#5024)
This fixes a race condition with the TemporaryAWSCredentialProvider
one which has existed for a long time but which only surfaced
(usually in Spark) when the bucket existence probe was disabled
by setting fs.s3a.bucket.probe to 0 -a performance speedup
which was made the default in HADOOP-17454.
Contributed by Jimmy Wong.
---
.../auth/AbstractSessionCredentialsProvider.java | 16 +-
.../fs/s3a/TestS3AAWSCredentialsProvider.java | 165 ++++++++++++++++++++-
2 files changed, 171 insertions(+), 10 deletions(-)
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/auth/AbstractSessionCredentialsProvider.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/auth/AbstractSessionCredentialsProvider.java
index cab3aa99052..2cdf0880aff 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/auth/AbstractSessionCredentialsProvider.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/auth/AbstractSessionCredentialsProvider.java
@@ -45,7 +45,7 @@ public abstract class AbstractSessionCredentialsProvider
extends AbstractAWSCredentialProvider {
/** Credentials, created in {@link #init()}. */
- private AWSCredentials awsCredentials;
+ private volatile AWSCredentials awsCredentials;
/** Atomic flag for on-demand initialization. */
private final AtomicBoolean initialized = new AtomicBoolean(false);
@@ -54,7 +54,7 @@ public abstract class AbstractSessionCredentialsProvider
* The (possibly translated) initialization exception.
* Used for testing.
*/
- private IOException initializationException;
+ private volatile IOException initializationException;
/**
* Constructor.
@@ -73,9 +73,9 @@ public abstract class AbstractSessionCredentialsProvider
* @throws IOException on any failure.
*/
@Retries.OnceTranslated
- protected void init() throws IOException {
+ protected synchronized void init() throws IOException {
// stop re-entrant attempts
- if (initialized.getAndSet(true)) {
+ if (isInitialized()) {
return;
}
try {
@@ -84,6 +84,8 @@ public abstract class AbstractSessionCredentialsProvider
} catch (IOException e) {
initializationException = e;
throw e;
+ } finally {
+ initialized.set(true);
}
}
@@ -132,13 +134,15 @@ public abstract class AbstractSessionCredentialsProvider
}
if (awsCredentials == null) {
throw new CredentialInitializationException(
- "Provider " + this + " has no credentials");
+ "Provider " + this + " has no credentials: " +
+ (initializationException != null ? initializationException.toString() : ""),
+ initializationException);
}
return awsCredentials;
}
public final boolean hasCredentials() {
- return awsCredentials == null;
+ return awsCredentials != null;
}
@Override
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AAWSCredentialsProvider.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AAWSCredentialsProvider.java
index 2c52bdc7605..730bae0aeb1 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AAWSCredentialsProvider.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AAWSCredentialsProvider.java
@@ -22,9 +22,15 @@ import java.io.IOException;
import java.io.InterruptedIOException;
import java.net.URI;
import java.nio.file.AccessDeniedException;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import javax.annotation.Nullable;
import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.AWSCredentialsProvider;
@@ -37,6 +43,7 @@ import org.junit.rules.ExpectedException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3a.auth.AbstractSessionCredentialsProvider;
import org.apache.hadoop.fs.s3a.auth.AssumedRoleCredentialProvider;
import org.apache.hadoop.fs.s3a.auth.NoAuthWithAWSException;
import org.apache.hadoop.io.retry.RetryPolicy;
@@ -46,6 +53,7 @@ import static org.apache.hadoop.fs.s3a.S3ATestConstants.*;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.*;
import static org.apache.hadoop.fs.s3a.S3AUtils.*;
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+import static org.apache.hadoop.test.LambdaTestUtils.interceptFuture;
import static org.junit.Assert.*;
/**
@@ -198,7 +206,7 @@ public class TestS3AAWSCredentialsProvider {
/**
* A credential provider whose constructor signature doesn't match.
*/
- static class ConstructorSignatureErrorProvider
+ protected static class ConstructorSignatureErrorProvider
implements AWSCredentialsProvider {
@SuppressWarnings("unused")
@@ -218,7 +226,7 @@ public class TestS3AAWSCredentialsProvider {
/**
* A credential provider whose constructor raises an NPE.
*/
- static class ConstructorFailureProvider
+ protected static class ConstructorFailureProvider
implements AWSCredentialsProvider {
@SuppressWarnings("unused")
@@ -246,7 +254,7 @@ public class TestS3AAWSCredentialsProvider {
}
}
- static class AWSExceptionRaisingFactory implements AWSCredentialsProvider {
+ protected static class AWSExceptionRaisingFactory implements AWSCredentialsProvider {
public static final String NO_AUTH = "No auth";
@@ -462,7 +470,7 @@ public class TestS3AAWSCredentialsProvider {
/**
* Credential provider which raises an IOE when constructed.
*/
- private static class IOERaisingProvider implements AWSCredentialsProvider {
+ protected static class IOERaisingProvider implements AWSCredentialsProvider {
public IOERaisingProvider(URI uri, Configuration conf)
throws IOException {
@@ -480,4 +488,153 @@ public class TestS3AAWSCredentialsProvider {
}
}
+ private static final AWSCredentials EXPECTED_CREDENTIALS = new AWSCredentials() {
+ @Override
+ public String getAWSAccessKeyId() {
+ return "expectedAccessKey";
+ }
+
+ @Override
+ public String getAWSSecretKey() {
+ return "expectedSecret";
+ }
+ };
+
+ /**
+ * Credential provider that takes a long time.
+ */
+ protected static class SlowProvider extends AbstractSessionCredentialsProvider {
+
+ public SlowProvider(@Nullable URI uri, Configuration conf) {
+ super(uri, conf);
+ }
+
+ @Override
+ protected AWSCredentials createCredentials(Configuration config) throws IOException {
+ // yield to other callers to induce race condition
+ Thread.yield();
+ return EXPECTED_CREDENTIALS;
+ }
+ }
+
+ private static final int CONCURRENT_THREADS = 10;
+
+ @Test
+ public void testConcurrentAuthentication() throws Throwable {
+ Configuration conf = createProviderConfiguration(SlowProvider.class.getName());
+ Path testFile = getCSVTestPath(conf);
+
+ AWSCredentialProviderList list = createAWSCredentialProviderSet(testFile.toUri(), conf);
+
+ SlowProvider provider = (SlowProvider) list.getProviders().get(0);
+
+ ExecutorService pool = Executors.newFixedThreadPool(CONCURRENT_THREADS);
+
+ List<Future<AWSCredentials>> results = new ArrayList<>();
+
+ try {
+ assertFalse(
+ "Provider not initialized. isInitialized should be false",
+ provider.isInitialized());
+ assertFalse(
+ "Provider not initialized. hasCredentials should be false",
+ provider.hasCredentials());
+ if (provider.getInitializationException() != null) {
+ throw new AssertionError(
+ "Provider not initialized. getInitializationException should return null",
+ provider.getInitializationException());
+ }
+
+ for (int i = 0; i < CONCURRENT_THREADS; i++) {
+ results.add(pool.submit(() -> list.getCredentials()));
+ }
+
+ for (Future<AWSCredentials> result : results) {
+ AWSCredentials credentials = result.get();
+ assertEquals("Access key from credential provider",
+ "expectedAccessKey", credentials.getAWSAccessKeyId());
+ assertEquals("Secret key from credential provider",
+ "expectedSecret", credentials.getAWSSecretKey());
+ }
+ } finally {
+ pool.awaitTermination(10, TimeUnit.SECONDS);
+ pool.shutdown();
+ }
+
+ assertTrue(
+ "Provider initialized without errors. isInitialized should be true",
+ provider.isInitialized());
+ assertTrue(
+ "Provider initialized without errors. hasCredentials should be true",
+ provider.hasCredentials());
+ if (provider.getInitializationException() != null) {
+ throw new AssertionError(
+ "Provider initialized without errors. getInitializationException should return null",
+ provider.getInitializationException());
+ }
+ }
+
+ /**
+ * Credential provider with error.
+ */
+ protected static class ErrorProvider extends AbstractSessionCredentialsProvider {
+
+ public ErrorProvider(@Nullable URI uri, Configuration conf) {
+ super(uri, conf);
+ }
+
+ @Override
+ protected AWSCredentials createCredentials(Configuration config) throws IOException {
+ throw new IOException("expected error");
+ }
+ }
+
+ @Test
+ public void testConcurrentAuthenticationError() throws Throwable {
+ Configuration conf = createProviderConfiguration(ErrorProvider.class.getName());
+ Path testFile = getCSVTestPath(conf);
+
+ AWSCredentialProviderList list = createAWSCredentialProviderSet(testFile.toUri(), conf);
+ ErrorProvider provider = (ErrorProvider) list.getProviders().get(0);
+
+ ExecutorService pool = Executors.newFixedThreadPool(CONCURRENT_THREADS);
+
+ List<Future<AWSCredentials>> results = new ArrayList<>();
+
+ try {
+ assertFalse("Provider not initialized. isInitialized should be false",
+ provider.isInitialized());
+ assertFalse("Provider not initialized. hasCredentials should be false",
+ provider.hasCredentials());
+ if (provider.getInitializationException() != null) {
+ throw new AssertionError(
+ "Provider not initialized. getInitializationException should return null",
+ provider.getInitializationException());
+ }
+
+ for (int i = 0; i < CONCURRENT_THREADS; i++) {
+ results.add(pool.submit(() -> list.getCredentials()));
+ }
+
+ for (Future<AWSCredentials> result : results) {
+ interceptFuture(CredentialInitializationException.class,
+ "expected error",
+ result
+ );
+ }
+ } finally {
+ pool.awaitTermination(10, TimeUnit.SECONDS);
+ pool.shutdown();
+ }
+
+ assertTrue(
+ "Provider initialization failed. isInitialized should be true",
+ provider.isInitialized());
+ assertFalse(
+ "Provider initialization failed. hasCredentials should be false",
+ provider.hasCredentials());
+ assertTrue(
+ "Provider initialization failed. getInitializationException should contain the error",
+ provider.getInitializationException().getMessage().contains("expected error"));
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org