You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by st...@apache.org on 2022/10/31 18:20:59 UTC

[hadoop] branch branch-3.3.5 updated: HADOOP-18233. Possible race condition with TemporaryAWSCredentialsProvider (#5024)

This is an automated email from the ASF dual-hosted git repository.

stevel pushed a commit to branch branch-3.3.5
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/branch-3.3.5 by this push:
     new 88375fc3544 HADOOP-18233. Possible race condition with TemporaryAWSCredentialsProvider (#5024)
88375fc3544 is described below

commit 88375fc35447e1b12603117036ffc8908b115ae4
Author: sabertiger <sa...@users.noreply.github.com>
AuthorDate: Mon Oct 31 05:43:30 2022 -0700

    HADOOP-18233. Possible race condition with TemporaryAWSCredentialsProvider (#5024)
    
    This fixes a race condition with the TemporaryAWSCredentialProvider
    one which has existed for a long time but which only surfaced
    (usually in Spark) when the bucket existence probe was disabled
    by setting fs.s3a.bucket.probe to 0 -a performance speedup
    which was made the default in HADOOP-17454.
    
    Contributed by Jimmy Wong.
---
 .../auth/AbstractSessionCredentialsProvider.java   |  16 +-
 .../fs/s3a/TestS3AAWSCredentialsProvider.java      | 165 ++++++++++++++++++++-
 2 files changed, 171 insertions(+), 10 deletions(-)

diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/auth/AbstractSessionCredentialsProvider.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/auth/AbstractSessionCredentialsProvider.java
index cab3aa99052..2cdf0880aff 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/auth/AbstractSessionCredentialsProvider.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/auth/AbstractSessionCredentialsProvider.java
@@ -45,7 +45,7 @@ public abstract class AbstractSessionCredentialsProvider
     extends AbstractAWSCredentialProvider {
 
   /** Credentials, created in {@link #init()}. */
-  private AWSCredentials awsCredentials;
+  private volatile AWSCredentials awsCredentials;
 
   /** Atomic flag for on-demand initialization. */
   private final AtomicBoolean initialized = new AtomicBoolean(false);
@@ -54,7 +54,7 @@ public abstract class AbstractSessionCredentialsProvider
    * The (possibly translated) initialization exception.
    * Used for testing.
    */
-  private IOException initializationException;
+  private volatile IOException initializationException;
 
   /**
    * Constructor.
@@ -73,9 +73,9 @@ public abstract class AbstractSessionCredentialsProvider
    * @throws IOException on any failure.
    */
   @Retries.OnceTranslated
-  protected void init() throws IOException {
+  protected synchronized void init() throws IOException {
     // stop re-entrant attempts
-    if (initialized.getAndSet(true)) {
+    if (isInitialized()) {
       return;
     }
     try {
@@ -84,6 +84,8 @@ public abstract class AbstractSessionCredentialsProvider
     } catch (IOException e) {
       initializationException = e;
       throw e;
+    } finally {
+      initialized.set(true);
     }
   }
 
@@ -132,13 +134,15 @@ public abstract class AbstractSessionCredentialsProvider
     }
     if (awsCredentials == null) {
       throw new CredentialInitializationException(
-          "Provider " + this + " has no credentials");
+          "Provider " + this + " has no credentials: " +
+             (initializationException != null ? initializationException.toString() : ""),
+          initializationException);
     }
     return awsCredentials;
   }
 
   public final boolean hasCredentials() {
-    return awsCredentials == null;
+    return awsCredentials != null;
   }
 
   @Override
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AAWSCredentialsProvider.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AAWSCredentialsProvider.java
index 2c52bdc7605..730bae0aeb1 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AAWSCredentialsProvider.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AAWSCredentialsProvider.java
@@ -22,9 +22,15 @@ import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.net.URI;
 import java.nio.file.AccessDeniedException;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import javax.annotation.Nullable;
 
 import com.amazonaws.auth.AWSCredentials;
 import com.amazonaws.auth.AWSCredentialsProvider;
@@ -37,6 +43,7 @@ import org.junit.rules.ExpectedException;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3a.auth.AbstractSessionCredentialsProvider;
 import org.apache.hadoop.fs.s3a.auth.AssumedRoleCredentialProvider;
 import org.apache.hadoop.fs.s3a.auth.NoAuthWithAWSException;
 import org.apache.hadoop.io.retry.RetryPolicy;
@@ -46,6 +53,7 @@ import static org.apache.hadoop.fs.s3a.S3ATestConstants.*;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.*;
 import static org.apache.hadoop.fs.s3a.S3AUtils.*;
 import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+import static org.apache.hadoop.test.LambdaTestUtils.interceptFuture;
 import static org.junit.Assert.*;
 
 /**
@@ -198,7 +206,7 @@ public class TestS3AAWSCredentialsProvider {
   /**
    * A credential provider whose constructor signature doesn't match.
    */
-  static class ConstructorSignatureErrorProvider
+  protected static class ConstructorSignatureErrorProvider
       implements AWSCredentialsProvider {
 
     @SuppressWarnings("unused")
@@ -218,7 +226,7 @@ public class TestS3AAWSCredentialsProvider {
   /**
    * A credential provider whose constructor raises an NPE.
    */
-  static class ConstructorFailureProvider
+  protected static class ConstructorFailureProvider
       implements AWSCredentialsProvider {
 
     @SuppressWarnings("unused")
@@ -246,7 +254,7 @@ public class TestS3AAWSCredentialsProvider {
     }
   }
 
-  static class AWSExceptionRaisingFactory implements AWSCredentialsProvider {
+  protected static class AWSExceptionRaisingFactory implements AWSCredentialsProvider {
 
     public static final String NO_AUTH = "No auth";
 
@@ -462,7 +470,7 @@ public class TestS3AAWSCredentialsProvider {
   /**
    * Credential provider which raises an IOE when constructed.
    */
-  private static class IOERaisingProvider implements AWSCredentialsProvider {
+  protected static class IOERaisingProvider implements AWSCredentialsProvider {
 
     public IOERaisingProvider(URI uri, Configuration conf)
         throws IOException {
@@ -480,4 +488,153 @@ public class TestS3AAWSCredentialsProvider {
     }
   }
 
+  private static final AWSCredentials EXPECTED_CREDENTIALS = new AWSCredentials() {
+    @Override
+    public String getAWSAccessKeyId() {
+      return "expectedAccessKey";
+    }
+
+    @Override
+    public String getAWSSecretKey() {
+      return "expectedSecret";
+    }
+  };
+
+  /**
+   * Credential provider that takes a long time.
+   */
+  protected static class SlowProvider extends AbstractSessionCredentialsProvider {
+
+    public SlowProvider(@Nullable URI uri, Configuration conf) {
+      super(uri, conf);
+    }
+
+    @Override
+    protected AWSCredentials createCredentials(Configuration config) throws IOException {
+      // yield to other callers to induce race condition
+      Thread.yield();
+      return EXPECTED_CREDENTIALS;
+    }
+  }
+
+  private static final int CONCURRENT_THREADS = 10;
+
+  @Test
+  public void testConcurrentAuthentication() throws Throwable {
+    Configuration conf = createProviderConfiguration(SlowProvider.class.getName());
+    Path testFile = getCSVTestPath(conf);
+
+    AWSCredentialProviderList list = createAWSCredentialProviderSet(testFile.toUri(), conf);
+
+    SlowProvider provider = (SlowProvider) list.getProviders().get(0);
+
+    ExecutorService pool = Executors.newFixedThreadPool(CONCURRENT_THREADS);
+
+    List<Future<AWSCredentials>> results = new ArrayList<>();
+
+    try {
+      assertFalse(
+          "Provider not initialized. isInitialized should be false",
+          provider.isInitialized());
+      assertFalse(
+          "Provider not initialized. hasCredentials should be false",
+          provider.hasCredentials());
+      if (provider.getInitializationException() != null) {
+        throw new AssertionError(
+            "Provider not initialized. getInitializationException should return null",
+            provider.getInitializationException());
+      }
+
+      for (int i = 0; i < CONCURRENT_THREADS; i++) {
+        results.add(pool.submit(() -> list.getCredentials()));
+      }
+
+      for (Future<AWSCredentials> result : results) {
+        AWSCredentials credentials = result.get();
+        assertEquals("Access key from credential provider",
+                "expectedAccessKey", credentials.getAWSAccessKeyId());
+        assertEquals("Secret key from credential provider",
+                "expectedSecret", credentials.getAWSSecretKey());
+      }
+    } finally {
+      pool.awaitTermination(10, TimeUnit.SECONDS);
+      pool.shutdown();
+    }
+
+    assertTrue(
+        "Provider initialized without errors. isInitialized should be true",
+         provider.isInitialized());
+    assertTrue(
+        "Provider initialized without errors. hasCredentials should be true",
+        provider.hasCredentials());
+    if (provider.getInitializationException() != null) {
+      throw new AssertionError(
+          "Provider initialized without errors. getInitializationException should return null",
+          provider.getInitializationException());
+    }
+  }
+
+  /**
+   * Credential provider with error.
+   */
+  protected static class ErrorProvider extends AbstractSessionCredentialsProvider {
+
+    public ErrorProvider(@Nullable URI uri, Configuration conf) {
+      super(uri, conf);
+    }
+
+    @Override
+    protected AWSCredentials createCredentials(Configuration config) throws IOException {
+      throw new IOException("expected error");
+    }
+  }
+
+  @Test
+  public void testConcurrentAuthenticationError() throws Throwable {
+    Configuration conf = createProviderConfiguration(ErrorProvider.class.getName());
+    Path testFile = getCSVTestPath(conf);
+
+    AWSCredentialProviderList list = createAWSCredentialProviderSet(testFile.toUri(), conf);
+    ErrorProvider provider = (ErrorProvider) list.getProviders().get(0);
+
+    ExecutorService pool = Executors.newFixedThreadPool(CONCURRENT_THREADS);
+
+    List<Future<AWSCredentials>> results = new ArrayList<>();
+
+    try {
+      assertFalse("Provider not initialized. isInitialized should be false",
+          provider.isInitialized());
+      assertFalse("Provider not initialized. hasCredentials should be false",
+          provider.hasCredentials());
+      if (provider.getInitializationException() != null) {
+        throw new AssertionError(
+            "Provider not initialized. getInitializationException should return null",
+            provider.getInitializationException());
+      }
+
+      for (int i = 0; i < CONCURRENT_THREADS; i++) {
+        results.add(pool.submit(() -> list.getCredentials()));
+      }
+
+      for (Future<AWSCredentials> result : results) {
+        interceptFuture(CredentialInitializationException.class,
+            "expected error",
+            result
+        );
+      }
+    } finally {
+      pool.awaitTermination(10, TimeUnit.SECONDS);
+      pool.shutdown();
+    }
+
+    assertTrue(
+        "Provider initialization failed. isInitialized should be true",
+        provider.isInitialized());
+    assertFalse(
+        "Provider initialization failed. hasCredentials should be false",
+        provider.hasCredentials());
+    assertTrue(
+        "Provider initialization failed. getInitializationException should contain the error",
+        provider.getInitializationException().getMessage().contains("expected error"));
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org