You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by da...@apache.org on 2020/04/07 03:10:32 UTC

[hadoop] branch branch-2 updated: HADOOP-16933. Backport HADOOP-16890 and HADOOP-16825 to branch-2

This is an automated email from the ASF dual-hosted git repository.

dazhou pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/branch-2 by this push:
     new 018c722  HADOOP-16933. Backport HADOOP-16890 and HADOOP-16825 to branch-2
018c722 is described below

commit 018c72280a60b203ed01d0469ada42026fb97a7f
Author: bilaharith <52...@users.noreply.github.com>
AuthorDate: Tue Apr 7 08:40:14 2020 +0530

    HADOOP-16933. Backport HADOOP-16890 and HADOOP-16825 to branch-2
    
    Hadoop 16890. Change in expiry calculation for MSI token provider. Contributed by Bilahari T H
    (cherry picked from commit 0b931f36ec83dc72729a9e84a0d313f471061c64)
    
    HADOOP-16825: ITestAzureBlobFileSystemCheckAccess failing. Contributed by Bilahari T H
    (cherry picked from commit 5944d28130925fe1452f545e96b5e44f064bc69e)
---
 hadoop-project/pom.xml                             |  5 ++
 hadoop-tools/hadoop-azure/pom.xml                  |  6 ++
 .../fs/azurebfs/oauth2/AccessTokenProvider.java    |  2 +-
 .../fs/azurebfs/oauth2/AzureADAuthenticator.java   | 57 +++++++++----
 .../fs/azurebfs/oauth2/MsiTokenProvider.java       | 34 ++++++++
 .../fs/azurebfs/ITestAbfsMsiTokenProvider.java     | 93 ++++++++++++++++++++++
 .../ITestAzureBlobFileSystemCheckAccess.java       | 21 ++++-
 7 files changed, 199 insertions(+), 19 deletions(-)

diff --git a/hadoop-project/pom.xml b/hadoop-project/pom.xml
index 7aaf826..1f6a7dd 100644
--- a/hadoop-project/pom.xml
+++ b/hadoop-project/pom.xml
@@ -1314,6 +1314,11 @@
           </exclusion>
         </exclusions>
       </dependency>
+      <dependency>
+        <groupId>org.hamcrest</groupId>
+        <artifactId>hamcrest-library</artifactId>
+        <version>1.3</version>
+      </dependency>
     </dependencies>
   </dependencyManagement>
 
diff --git a/hadoop-tools/hadoop-azure/pom.xml b/hadoop-tools/hadoop-azure/pom.xml
index 245c883..f4fc358 100644
--- a/hadoop-tools/hadoop-azure/pom.xml
+++ b/hadoop-tools/hadoop-azure/pom.xml
@@ -249,6 +249,12 @@
       <artifactId>mockito-all</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.hamcrest</groupId>
+      <artifactId>hamcrest-library</artifactId>
+      <scope>test</scope>
+    </dependency>
+
   </dependencies>
 
   <profiles>
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/AccessTokenProvider.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/AccessTokenProvider.java
index 72f37a1..a20e6df 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/AccessTokenProvider.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/AccessTokenProvider.java
@@ -72,7 +72,7 @@ public abstract class AccessTokenProvider {
    *
    * @return true if the token is expiring in next 5 minutes
    */
-  private boolean isTokenAboutToExpire() {
+  protected boolean isTokenAboutToExpire() {
     if (token == null) {
       LOG.debug("AADToken: no token. Returning expiring=true");
       return true;   // no token should have same response as expired token
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/AzureADAuthenticator.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/AzureADAuthenticator.java
index 0eb379c..03be1c0 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/AzureADAuthenticator.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/AzureADAuthenticator.java
@@ -136,7 +136,7 @@ public final class AzureADAuthenticator {
     headers.put("Metadata", "true");
 
     LOG.debug("AADToken: starting to fetch token using MSI");
-    return getTokenCall(authEndpoint, qp.serialize(), headers, "GET");
+    return getTokenCall(authEndpoint, qp.serialize(), headers, "GET", true);
   }
 
   /**
@@ -197,8 +197,13 @@ public final class AzureADAuthenticator {
   }
 
   private static AzureADToken getTokenCall(String authEndpoint, String body,
-                                           Hashtable<String, String> headers, String httpMethod)
-          throws IOException {
+      Hashtable<String, String> headers, String httpMethod) throws IOException {
+    return getTokenCall(authEndpoint, body, headers, httpMethod, false);
+  }
+
+  private static AzureADToken getTokenCall(String authEndpoint, String body,
+      Hashtable<String, String> headers, String httpMethod, boolean isMsi)
+      throws IOException {
     AzureADToken token = null;
     ExponentialRetryPolicy retryPolicy
             = new ExponentialRetryPolicy(3, 0, 1000, 2);
@@ -211,7 +216,7 @@ public final class AzureADAuthenticator {
       httperror = 0;
       ex = null;
       try {
-        token = getTokenSingleCall(authEndpoint, body, headers, httpMethod);
+        token = getTokenSingleCall(authEndpoint, body, headers, httpMethod, isMsi);
       } catch (HttpException e) {
         httperror = e.httpErrorCode;
         ex = e;
@@ -227,8 +232,9 @@ public final class AzureADAuthenticator {
     return token;
   }
 
-  private static AzureADToken getTokenSingleCall(
-          String authEndpoint, String payload, Hashtable<String, String> headers, String httpMethod)
+  private static AzureADToken getTokenSingleCall(String authEndpoint,
+      String payload, Hashtable<String, String> headers, String httpMethod,
+      boolean isMsi)
           throws IOException {
 
     AzureADToken token = null;
@@ -268,7 +274,7 @@ public final class AzureADAuthenticator {
       if (httpResponseCode == HttpURLConnection.HTTP_OK
               && responseContentType.startsWith("application/json") && responseContentLength > 0) {
         InputStream httpResponseStream = conn.getInputStream();
-        token = parseTokenFromStream(httpResponseStream);
+        token = parseTokenFromStream(httpResponseStream, isMsi);
       } else {
         String responseBody = consumeInputStream(conn.getErrorStream(), 1024);
         String proxies = "none";
@@ -296,10 +302,12 @@ public final class AzureADAuthenticator {
     return token;
   }
 
-  private static AzureADToken parseTokenFromStream(InputStream httpResponseStream) throws IOException {
+  private static AzureADToken parseTokenFromStream(
+      InputStream httpResponseStream, boolean isMsi) throws IOException {
     AzureADToken token = new AzureADToken();
     try {
-      int expiryPeriod = 0;
+      int expiryPeriodInSecs = 0;
+      long expiresOnInSecs = -1;
 
       JsonFactory jf = new JsonFactory();
       JsonParser jp = jf.createJsonParser(httpResponseStream);
@@ -314,17 +322,38 @@ public final class AzureADAuthenticator {
           if (fieldName.equals("access_token")) {
             token.setAccessToken(fieldValue);
           }
+
           if (fieldName.equals("expires_in")) {
-            expiryPeriod = Integer.parseInt(fieldValue);
+            expiryPeriodInSecs = Integer.parseInt(fieldValue);
+          }
+
+          if (fieldName.equals("expires_on")) {
+            expiresOnInSecs = Long.parseLong(fieldValue);
           }
+
         }
         jp.nextToken();
       }
       jp.close();
-      long expiry = System.currentTimeMillis();
-      expiry = expiry + expiryPeriod * 1000L; // convert expiryPeriod to milliseconds and add
-      token.setExpiry(new Date(expiry));
-      LOG.debug("AADToken: fetched token with expiry " + token.getExpiry().toString());
+      if (expiresOnInSecs > 0) {
+        LOG.debug("Expiry based on expires_on: {}", expiresOnInSecs);
+        token.setExpiry(new Date(expiresOnInSecs * 1000));
+      } else {
+        if (isMsi) {
+          // Currently there is a known issue that MSI does not update expires_in
+          // for refresh and will have the value from first AAD token fetch request.
+          // Due to this known limitation, expires_in is not supported for MSI token fetch flow.
+          throw new UnsupportedOperationException("MSI Responded with invalid expires_on");
+        }
+
+        LOG.debug("Expiry based on expires_in: {}", expiryPeriodInSecs);
+        long expiry = System.currentTimeMillis();
+        expiry = expiry + expiryPeriodInSecs * 1000L; // convert expiryPeriod to milliseconds and add
+        token.setExpiry(new Date(expiry));
+      }
+
+      LOG.debug("AADToken: fetched token with expiry {}, expiresOn passed: {}",
+          token.getExpiry().toString(), expiresOnInSecs);
     } catch (Exception ex) {
       LOG.debug("AADToken: got exception when parsing json token " + ex.toString());
       throw ex;
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/MsiTokenProvider.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/MsiTokenProvider.java
index 38f3045..784365b 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/MsiTokenProvider.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/MsiTokenProvider.java
@@ -36,6 +36,10 @@ public class MsiTokenProvider extends AccessTokenProvider {
 
   private final String clientId;
 
+  private long tokenFetchTime = -1;
+
+  private static final long ONE_HOUR = 3600 * 1000;
+
   private static final Logger LOG = LoggerFactory.getLogger(AccessTokenProvider.class);
 
   public MsiTokenProvider(final String authEndpoint, final String tenantGuid,
@@ -51,6 +55,36 @@ public class MsiTokenProvider extends AccessTokenProvider {
     LOG.debug("AADToken: refreshing token from MSI");
     AzureADToken token = AzureADAuthenticator
         .getTokenFromMsi(authEndpoint, tenantGuid, clientId, authority, false);
+    tokenFetchTime = System.currentTimeMillis();
     return token;
   }
+
+  /**
+   * Checks if the token is about to expire as per base expiry logic.
+   * Otherwise try to expire every 1 hour
+   *
+   * @return true if the token is expiring in next 1 hour or if a token has
+   * never been fetched
+   */
+  @Override
+  protected boolean isTokenAboutToExpire() {
+    if (tokenFetchTime == -1 || super.isTokenAboutToExpire()) {
+      return true;
+    }
+
+    boolean expiring = false;
+    long elapsedTimeSinceLastTokenRefreshInMillis =
+        System.currentTimeMillis() - tokenFetchTime;
+    expiring = elapsedTimeSinceLastTokenRefreshInMillis >= ONE_HOUR
+        || elapsedTimeSinceLastTokenRefreshInMillis < 0;
+    // In case of, Token is not refreshed for 1 hr or any clock skew issues,
+    // refresh token.
+    if (expiring) {
+      LOG.debug("MSIToken: token renewing. Time elapsed since last token fetch:"
+          + " {} milli seconds", elapsedTimeSinceLastTokenRefreshInMillis);
+    }
+
+    return expiring;
+  }
+
 }
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsMsiTokenProvider.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsMsiTokenProvider.java
new file mode 100644
index 0000000..d871bef
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsMsiTokenProvider.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs;
+
+import java.io.IOException;
+import java.util.Date;
+
+import org.junit.Test;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.fs.azurebfs.oauth2.AccessTokenProvider;
+import org.apache.hadoop.fs.azurebfs.oauth2.AzureADToken;
+import org.apache.hadoop.fs.azurebfs.oauth2.MsiTokenProvider;
+
+import static org.junit.Assume.assumeThat;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.not;
+import static org.hamcrest.Matchers.isEmptyOrNullString;
+import static org.hamcrest.Matchers.isEmptyString;
+
+import static org.apache.hadoop.fs.azurebfs.constants.AuthConfigurations.DEFAULT_FS_AZURE_ACCOUNT_OAUTH_MSI_AUTHORITY;
+import static org.apache.hadoop.fs.azurebfs.constants.AuthConfigurations.DEFAULT_FS_AZURE_ACCOUNT_OAUTH_MSI_ENDPOINT;
+import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ACCOUNT_OAUTH_CLIENT_ID;
+import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ACCOUNT_OAUTH_MSI_AUTHORITY;
+import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ACCOUNT_OAUTH_MSI_ENDPOINT;
+import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ACCOUNT_OAUTH_MSI_TENANT;
+
+/**
+ * Test MsiTokenProvider.
+ */
+public final class ITestAbfsMsiTokenProvider
+    extends AbstractAbfsIntegrationTest {
+
+  public ITestAbfsMsiTokenProvider() throws Exception {
+    super();
+  }
+
+  @Test
+  public void test() throws IOException {
+    AbfsConfiguration conf = getConfiguration();
+    assumeThat(conf.get(FS_AZURE_ACCOUNT_OAUTH_MSI_ENDPOINT),
+        not(isEmptyOrNullString()));
+    assumeThat(conf.get(FS_AZURE_ACCOUNT_OAUTH_MSI_TENANT),
+        not(isEmptyOrNullString()));
+    assumeThat(conf.get(FS_AZURE_ACCOUNT_OAUTH_CLIENT_ID),
+        not(isEmptyOrNullString()));
+    assumeThat(conf.get(FS_AZURE_ACCOUNT_OAUTH_MSI_AUTHORITY),
+        not(isEmptyOrNullString()));
+
+    String tenantGuid = conf
+        .getPasswordString(FS_AZURE_ACCOUNT_OAUTH_MSI_TENANT);
+    String clientId = conf.getPasswordString(FS_AZURE_ACCOUNT_OAUTH_CLIENT_ID);
+    String authEndpoint = getTrimmedPasswordString(conf,
+        FS_AZURE_ACCOUNT_OAUTH_MSI_ENDPOINT,
+        DEFAULT_FS_AZURE_ACCOUNT_OAUTH_MSI_ENDPOINT);
+    String authority = getTrimmedPasswordString(conf,
+        FS_AZURE_ACCOUNT_OAUTH_MSI_AUTHORITY,
+        DEFAULT_FS_AZURE_ACCOUNT_OAUTH_MSI_AUTHORITY);
+    AccessTokenProvider tokenProvider = new MsiTokenProvider(authEndpoint,
+        tenantGuid, clientId, authority);
+
+    AzureADToken token = null;
+    token = tokenProvider.getToken();
+    assertThat(token.getAccessToken(), not(isEmptyString()));
+    assertThat(token.getExpiry().after(new Date()), is(true));
+  }
+
+  private String getTrimmedPasswordString(AbfsConfiguration conf, String key,
+      String defaultValue) throws IOException {
+    String value = conf.getPasswordString(key);
+    if (StringUtils.isBlank(value)) {
+      value = defaultValue;
+    }
+    return value.trim();
+  }
+
+}
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemCheckAccess.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemCheckAccess.java
index d1f9ec7..393eff4 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemCheckAccess.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemCheckAccess.java
@@ -54,7 +54,7 @@ public class ITestAzureBlobFileSystemCheckAccess
 
   private static final String TEST_FOLDER_PATH = "CheckAccessTestFolder";
   private final FileSystem superUserFs;
-  private final FileSystem testUserFs;
+  private FileSystem testUserFs;
   private final String testUserGuid;
   private final boolean isCheckAccessEnabled;
   private final boolean isHNSEnabled;
@@ -64,13 +64,15 @@ public class ITestAzureBlobFileSystemCheckAccess
     this.superUserFs = getFileSystem();
     testUserGuid = getConfiguration()
         .get(FS_AZURE_BLOB_FS_CHECKACCESS_TEST_USER_GUID);
-    this.testUserFs = getTestUserFs();
     this.isCheckAccessEnabled = getConfiguration().isCheckAccessEnabled();
     this.isHNSEnabled = getConfiguration()
         .getBoolean(FS_AZURE_TEST_NAMESPACE_ENABLED_ACCOUNT, false);
   }
 
-  private FileSystem getTestUserFs() throws Exception {
+  private void setTestUserFs() throws Exception {
+    if (this.testUserFs != null) {
+      return;
+    }
     String orgClientId = getConfiguration().get(FS_AZURE_BLOB_FS_CLIENT_ID);
     String orgClientSecret = getConfiguration()
         .get(FS_AZURE_BLOB_FS_CLIENT_SECRET);
@@ -89,7 +91,7 @@ public class ITestAzureBlobFileSystemCheckAccess
     getRawConfiguration()
         .setBoolean(AZURE_CREATE_REMOTE_FILESYSTEM_DURING_INITIALIZATION,
             orgCreateFileSystemDurungInit);
-    return fs;
+    this.testUserFs = fs;
   }
 
   @Test(expected = IllegalArgumentException.class)
@@ -112,6 +114,7 @@ public class ITestAzureBlobFileSystemCheckAccess
   @Test(expected = FileNotFoundException.class)
   public void testCheckAccessForNonExistentFile() throws Exception {
     assumeHNSAndCheckAccessEnabled();
+    setTestUserFs();
     Path nonExistentFile = setupTestDirectoryAndUserAccess(
         "/nonExistentFile1.txt", FsAction.ALL);
     superUserFs.delete(nonExistentFile, true);
@@ -155,12 +158,16 @@ public class ITestAzureBlobFileSystemCheckAccess
     Assume.assumeFalse(FS_AZURE_TEST_NAMESPACE_ENABLED_ACCOUNT + " is true",
         getConfiguration()
             .getBoolean(FS_AZURE_TEST_NAMESPACE_ENABLED_ACCOUNT, true));
+    Assume.assumeTrue(FS_AZURE_ENABLE_CHECK_ACCESS + " is false",
+            isCheckAccessEnabled);
+    setTestUserFs();
     testUserFs.access(new Path("/"), FsAction.READ);
   }
 
   @Test
   public void testFsActionNONE() throws Exception {
     assumeHNSAndCheckAccessEnabled();
+    setTestUserFs();
     Path testFilePath = setupTestDirectoryAndUserAccess("/test2.txt",
         FsAction.NONE);
     assertInaccessible(testFilePath, FsAction.EXECUTE);
@@ -175,6 +182,7 @@ public class ITestAzureBlobFileSystemCheckAccess
   @Test
   public void testFsActionEXECUTE() throws Exception {
     assumeHNSAndCheckAccessEnabled();
+    setTestUserFs();
     Path testFilePath = setupTestDirectoryAndUserAccess("/test3.txt",
         FsAction.EXECUTE);
     assertAccessible(testFilePath, FsAction.EXECUTE);
@@ -190,6 +198,7 @@ public class ITestAzureBlobFileSystemCheckAccess
   @Test
   public void testFsActionREAD() throws Exception {
     assumeHNSAndCheckAccessEnabled();
+    setTestUserFs();
     Path testFilePath = setupTestDirectoryAndUserAccess("/test4.txt",
         FsAction.READ);
     assertAccessible(testFilePath, FsAction.READ);
@@ -205,6 +214,7 @@ public class ITestAzureBlobFileSystemCheckAccess
   @Test
   public void testFsActionWRITE() throws Exception {
     assumeHNSAndCheckAccessEnabled();
+    setTestUserFs();
     Path testFilePath = setupTestDirectoryAndUserAccess("/test5.txt",
         FsAction.WRITE);
     assertAccessible(testFilePath, FsAction.WRITE);
@@ -220,6 +230,7 @@ public class ITestAzureBlobFileSystemCheckAccess
   @Test
   public void testFsActionREADEXECUTE() throws Exception {
     assumeHNSAndCheckAccessEnabled();
+    setTestUserFs();
     Path testFilePath = setupTestDirectoryAndUserAccess("/test6.txt",
         FsAction.READ_EXECUTE);
     assertAccessible(testFilePath, FsAction.EXECUTE);
@@ -235,6 +246,7 @@ public class ITestAzureBlobFileSystemCheckAccess
   @Test
   public void testFsActionWRITEEXECUTE() throws Exception {
     assumeHNSAndCheckAccessEnabled();
+    setTestUserFs();
     Path testFilePath = setupTestDirectoryAndUserAccess("/test7.txt",
         FsAction.WRITE_EXECUTE);
     assertAccessible(testFilePath, FsAction.EXECUTE);
@@ -250,6 +262,7 @@ public class ITestAzureBlobFileSystemCheckAccess
   @Test
   public void testFsActionALL() throws Exception {
     assumeHNSAndCheckAccessEnabled();
+    setTestUserFs();
     Path testFilePath = setupTestDirectoryAndUserAccess("/test8.txt",
         FsAction.ALL);
     assertAccessible(testFilePath, FsAction.EXECUTE);


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org