You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by da...@apache.org on 2020/04/07 03:10:32 UTC
[hadoop] branch branch-2 updated: HADOOP-16933. Backport
HADOOP-16890 and HADOOP-16825 to branch-2
This is an automated email from the ASF dual-hosted git repository.
dazhou pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/branch-2 by this push:
new 018c722 HADOOP-16933. Backport HADOOP-16890 and HADOOP-16825 to branch-2
018c722 is described below
commit 018c72280a60b203ed01d0469ada42026fb97a7f
Author: bilaharith <52...@users.noreply.github.com>
AuthorDate: Tue Apr 7 08:40:14 2020 +0530
HADOOP-16933. Backport HADOOP-16890 and HADOOP-16825 to branch-2
Hadoop 16890. Change in expiry calculation for MSI token provider. Contributed by Bilahari T H
(cherry picked from commit 0b931f36ec83dc72729a9e84a0d313f471061c64)
HADOOP-16825: ITestAzureBlobFileSystemCheckAccess failing. Contributed by Bilahari T H
(cherry picked from commit 5944d28130925fe1452f545e96b5e44f064bc69e)
---
hadoop-project/pom.xml | 5 ++
hadoop-tools/hadoop-azure/pom.xml | 6 ++
.../fs/azurebfs/oauth2/AccessTokenProvider.java | 2 +-
.../fs/azurebfs/oauth2/AzureADAuthenticator.java | 57 +++++++++----
.../fs/azurebfs/oauth2/MsiTokenProvider.java | 34 ++++++++
.../fs/azurebfs/ITestAbfsMsiTokenProvider.java | 93 ++++++++++++++++++++++
.../ITestAzureBlobFileSystemCheckAccess.java | 21 ++++-
7 files changed, 199 insertions(+), 19 deletions(-)
diff --git a/hadoop-project/pom.xml b/hadoop-project/pom.xml
index 7aaf826..1f6a7dd 100644
--- a/hadoop-project/pom.xml
+++ b/hadoop-project/pom.xml
@@ -1314,6 +1314,11 @@
</exclusion>
</exclusions>
</dependency>
+ <dependency>
+ <groupId>org.hamcrest</groupId>
+ <artifactId>hamcrest-library</artifactId>
+ <version>1.3</version>
+ </dependency>
</dependencies>
</dependencyManagement>
diff --git a/hadoop-tools/hadoop-azure/pom.xml b/hadoop-tools/hadoop-azure/pom.xml
index 245c883..f4fc358 100644
--- a/hadoop-tools/hadoop-azure/pom.xml
+++ b/hadoop-tools/hadoop-azure/pom.xml
@@ -249,6 +249,12 @@
<artifactId>mockito-all</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.hamcrest</groupId>
+ <artifactId>hamcrest-library</artifactId>
+ <scope>test</scope>
+ </dependency>
+
</dependencies>
<profiles>
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/AccessTokenProvider.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/AccessTokenProvider.java
index 72f37a1..a20e6df 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/AccessTokenProvider.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/AccessTokenProvider.java
@@ -72,7 +72,7 @@ public abstract class AccessTokenProvider {
*
* @return true if the token is expiring in next 5 minutes
*/
- private boolean isTokenAboutToExpire() {
+ protected boolean isTokenAboutToExpire() {
if (token == null) {
LOG.debug("AADToken: no token. Returning expiring=true");
return true; // no token should have same response as expired token
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/AzureADAuthenticator.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/AzureADAuthenticator.java
index 0eb379c..03be1c0 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/AzureADAuthenticator.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/AzureADAuthenticator.java
@@ -136,7 +136,7 @@ public final class AzureADAuthenticator {
headers.put("Metadata", "true");
LOG.debug("AADToken: starting to fetch token using MSI");
- return getTokenCall(authEndpoint, qp.serialize(), headers, "GET");
+ return getTokenCall(authEndpoint, qp.serialize(), headers, "GET", true);
}
/**
@@ -197,8 +197,13 @@ public final class AzureADAuthenticator {
}
private static AzureADToken getTokenCall(String authEndpoint, String body,
- Hashtable<String, String> headers, String httpMethod)
- throws IOException {
+ Hashtable<String, String> headers, String httpMethod) throws IOException {
+ return getTokenCall(authEndpoint, body, headers, httpMethod, false);
+ }
+
+ private static AzureADToken getTokenCall(String authEndpoint, String body,
+ Hashtable<String, String> headers, String httpMethod, boolean isMsi)
+ throws IOException {
AzureADToken token = null;
ExponentialRetryPolicy retryPolicy
= new ExponentialRetryPolicy(3, 0, 1000, 2);
@@ -211,7 +216,7 @@ public final class AzureADAuthenticator {
httperror = 0;
ex = null;
try {
- token = getTokenSingleCall(authEndpoint, body, headers, httpMethod);
+ token = getTokenSingleCall(authEndpoint, body, headers, httpMethod, isMsi);
} catch (HttpException e) {
httperror = e.httpErrorCode;
ex = e;
@@ -227,8 +232,9 @@ public final class AzureADAuthenticator {
return token;
}
- private static AzureADToken getTokenSingleCall(
- String authEndpoint, String payload, Hashtable<String, String> headers, String httpMethod)
+ private static AzureADToken getTokenSingleCall(String authEndpoint,
+ String payload, Hashtable<String, String> headers, String httpMethod,
+ boolean isMsi)
throws IOException {
AzureADToken token = null;
@@ -268,7 +274,7 @@ public final class AzureADAuthenticator {
if (httpResponseCode == HttpURLConnection.HTTP_OK
&& responseContentType.startsWith("application/json") && responseContentLength > 0) {
InputStream httpResponseStream = conn.getInputStream();
- token = parseTokenFromStream(httpResponseStream);
+ token = parseTokenFromStream(httpResponseStream, isMsi);
} else {
String responseBody = consumeInputStream(conn.getErrorStream(), 1024);
String proxies = "none";
@@ -296,10 +302,12 @@ public final class AzureADAuthenticator {
return token;
}
- private static AzureADToken parseTokenFromStream(InputStream httpResponseStream) throws IOException {
+ private static AzureADToken parseTokenFromStream(
+ InputStream httpResponseStream, boolean isMsi) throws IOException {
AzureADToken token = new AzureADToken();
try {
- int expiryPeriod = 0;
+ int expiryPeriodInSecs = 0;
+ long expiresOnInSecs = -1;
JsonFactory jf = new JsonFactory();
JsonParser jp = jf.createJsonParser(httpResponseStream);
@@ -314,17 +322,38 @@ public final class AzureADAuthenticator {
if (fieldName.equals("access_token")) {
token.setAccessToken(fieldValue);
}
+
if (fieldName.equals("expires_in")) {
- expiryPeriod = Integer.parseInt(fieldValue);
+ expiryPeriodInSecs = Integer.parseInt(fieldValue);
+ }
+
+ if (fieldName.equals("expires_on")) {
+ expiresOnInSecs = Long.parseLong(fieldValue);
}
+
}
jp.nextToken();
}
jp.close();
- long expiry = System.currentTimeMillis();
- expiry = expiry + expiryPeriod * 1000L; // convert expiryPeriod to milliseconds and add
- token.setExpiry(new Date(expiry));
- LOG.debug("AADToken: fetched token with expiry " + token.getExpiry().toString());
+ if (expiresOnInSecs > 0) {
+ LOG.debug("Expiry based on expires_on: {}", expiresOnInSecs);
+ token.setExpiry(new Date(expiresOnInSecs * 1000));
+ } else {
+ if (isMsi) {
+ // Currently there is a known issue that MSI does not update expires_in
+ // for refresh and will have the value from first AAD token fetch request.
+ // Due to this known limitation, expires_in is not supported for MSI token fetch flow.
+ throw new UnsupportedOperationException("MSI Responded with invalid expires_on");
+ }
+
+ LOG.debug("Expiry based on expires_in: {}", expiryPeriodInSecs);
+ long expiry = System.currentTimeMillis();
+ expiry = expiry + expiryPeriodInSecs * 1000L; // convert expiryPeriod to milliseconds and add
+ token.setExpiry(new Date(expiry));
+ }
+
+ LOG.debug("AADToken: fetched token with expiry {}, expiresOn passed: {}",
+ token.getExpiry().toString(), expiresOnInSecs);
} catch (Exception ex) {
LOG.debug("AADToken: got exception when parsing json token " + ex.toString());
throw ex;
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/MsiTokenProvider.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/MsiTokenProvider.java
index 38f3045..784365b 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/MsiTokenProvider.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/MsiTokenProvider.java
@@ -36,6 +36,10 @@ public class MsiTokenProvider extends AccessTokenProvider {
private final String clientId;
+ private long tokenFetchTime = -1;
+
+ private static final long ONE_HOUR = 3600 * 1000;
+
private static final Logger LOG = LoggerFactory.getLogger(AccessTokenProvider.class);
public MsiTokenProvider(final String authEndpoint, final String tenantGuid,
@@ -51,6 +55,36 @@ public class MsiTokenProvider extends AccessTokenProvider {
LOG.debug("AADToken: refreshing token from MSI");
AzureADToken token = AzureADAuthenticator
.getTokenFromMsi(authEndpoint, tenantGuid, clientId, authority, false);
+ tokenFetchTime = System.currentTimeMillis();
return token;
}
+
+ /**
+ * Checks if the token is about to expire as per base expiry logic.
+ * Otherwise try to expire every 1 hour
+ *
+ * @return true if the token is expiring in next 1 hour or if a token has
+ * never been fetched
+ */
+ @Override
+ protected boolean isTokenAboutToExpire() {
+ if (tokenFetchTime == -1 || super.isTokenAboutToExpire()) {
+ return true;
+ }
+
+ boolean expiring = false;
+ long elapsedTimeSinceLastTokenRefreshInMillis =
+ System.currentTimeMillis() - tokenFetchTime;
+ expiring = elapsedTimeSinceLastTokenRefreshInMillis >= ONE_HOUR
+ || elapsedTimeSinceLastTokenRefreshInMillis < 0;
+ // In case of, Token is not refreshed for 1 hr or any clock skew issues,
+ // refresh token.
+ if (expiring) {
+ LOG.debug("MSIToken: token renewing. Time elapsed since last token fetch:"
+ + " {} milli seconds", elapsedTimeSinceLastTokenRefreshInMillis);
+ }
+
+ return expiring;
+ }
+
}
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsMsiTokenProvider.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsMsiTokenProvider.java
new file mode 100644
index 0000000..d871bef
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsMsiTokenProvider.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs;
+
+import java.io.IOException;
+import java.util.Date;
+
+import org.junit.Test;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.fs.azurebfs.oauth2.AccessTokenProvider;
+import org.apache.hadoop.fs.azurebfs.oauth2.AzureADToken;
+import org.apache.hadoop.fs.azurebfs.oauth2.MsiTokenProvider;
+
+import static org.junit.Assume.assumeThat;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.not;
+import static org.hamcrest.Matchers.isEmptyOrNullString;
+import static org.hamcrest.Matchers.isEmptyString;
+
+import static org.apache.hadoop.fs.azurebfs.constants.AuthConfigurations.DEFAULT_FS_AZURE_ACCOUNT_OAUTH_MSI_AUTHORITY;
+import static org.apache.hadoop.fs.azurebfs.constants.AuthConfigurations.DEFAULT_FS_AZURE_ACCOUNT_OAUTH_MSI_ENDPOINT;
+import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ACCOUNT_OAUTH_CLIENT_ID;
+import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ACCOUNT_OAUTH_MSI_AUTHORITY;
+import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ACCOUNT_OAUTH_MSI_ENDPOINT;
+import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ACCOUNT_OAUTH_MSI_TENANT;
+
+/**
+ * Test MsiTokenProvider.
+ */
+public final class ITestAbfsMsiTokenProvider
+ extends AbstractAbfsIntegrationTest {
+
+ public ITestAbfsMsiTokenProvider() throws Exception {
+ super();
+ }
+
+ @Test
+ public void test() throws IOException {
+ AbfsConfiguration conf = getConfiguration();
+ assumeThat(conf.get(FS_AZURE_ACCOUNT_OAUTH_MSI_ENDPOINT),
+ not(isEmptyOrNullString()));
+ assumeThat(conf.get(FS_AZURE_ACCOUNT_OAUTH_MSI_TENANT),
+ not(isEmptyOrNullString()));
+ assumeThat(conf.get(FS_AZURE_ACCOUNT_OAUTH_CLIENT_ID),
+ not(isEmptyOrNullString()));
+ assumeThat(conf.get(FS_AZURE_ACCOUNT_OAUTH_MSI_AUTHORITY),
+ not(isEmptyOrNullString()));
+
+ String tenantGuid = conf
+ .getPasswordString(FS_AZURE_ACCOUNT_OAUTH_MSI_TENANT);
+ String clientId = conf.getPasswordString(FS_AZURE_ACCOUNT_OAUTH_CLIENT_ID);
+ String authEndpoint = getTrimmedPasswordString(conf,
+ FS_AZURE_ACCOUNT_OAUTH_MSI_ENDPOINT,
+ DEFAULT_FS_AZURE_ACCOUNT_OAUTH_MSI_ENDPOINT);
+ String authority = getTrimmedPasswordString(conf,
+ FS_AZURE_ACCOUNT_OAUTH_MSI_AUTHORITY,
+ DEFAULT_FS_AZURE_ACCOUNT_OAUTH_MSI_AUTHORITY);
+ AccessTokenProvider tokenProvider = new MsiTokenProvider(authEndpoint,
+ tenantGuid, clientId, authority);
+
+ AzureADToken token = null;
+ token = tokenProvider.getToken();
+ assertThat(token.getAccessToken(), not(isEmptyString()));
+ assertThat(token.getExpiry().after(new Date()), is(true));
+ }
+
+ private String getTrimmedPasswordString(AbfsConfiguration conf, String key,
+ String defaultValue) throws IOException {
+ String value = conf.getPasswordString(key);
+ if (StringUtils.isBlank(value)) {
+ value = defaultValue;
+ }
+ return value.trim();
+ }
+
+}
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemCheckAccess.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemCheckAccess.java
index d1f9ec7..393eff4 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemCheckAccess.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemCheckAccess.java
@@ -54,7 +54,7 @@ public class ITestAzureBlobFileSystemCheckAccess
private static final String TEST_FOLDER_PATH = "CheckAccessTestFolder";
private final FileSystem superUserFs;
- private final FileSystem testUserFs;
+ private FileSystem testUserFs;
private final String testUserGuid;
private final boolean isCheckAccessEnabled;
private final boolean isHNSEnabled;
@@ -64,13 +64,15 @@ public class ITestAzureBlobFileSystemCheckAccess
this.superUserFs = getFileSystem();
testUserGuid = getConfiguration()
.get(FS_AZURE_BLOB_FS_CHECKACCESS_TEST_USER_GUID);
- this.testUserFs = getTestUserFs();
this.isCheckAccessEnabled = getConfiguration().isCheckAccessEnabled();
this.isHNSEnabled = getConfiguration()
.getBoolean(FS_AZURE_TEST_NAMESPACE_ENABLED_ACCOUNT, false);
}
- private FileSystem getTestUserFs() throws Exception {
+ private void setTestUserFs() throws Exception {
+ if (this.testUserFs != null) {
+ return;
+ }
String orgClientId = getConfiguration().get(FS_AZURE_BLOB_FS_CLIENT_ID);
String orgClientSecret = getConfiguration()
.get(FS_AZURE_BLOB_FS_CLIENT_SECRET);
@@ -89,7 +91,7 @@ public class ITestAzureBlobFileSystemCheckAccess
getRawConfiguration()
.setBoolean(AZURE_CREATE_REMOTE_FILESYSTEM_DURING_INITIALIZATION,
orgCreateFileSystemDurungInit);
- return fs;
+ this.testUserFs = fs;
}
@Test(expected = IllegalArgumentException.class)
@@ -112,6 +114,7 @@ public class ITestAzureBlobFileSystemCheckAccess
@Test(expected = FileNotFoundException.class)
public void testCheckAccessForNonExistentFile() throws Exception {
assumeHNSAndCheckAccessEnabled();
+ setTestUserFs();
Path nonExistentFile = setupTestDirectoryAndUserAccess(
"/nonExistentFile1.txt", FsAction.ALL);
superUserFs.delete(nonExistentFile, true);
@@ -155,12 +158,16 @@ public class ITestAzureBlobFileSystemCheckAccess
Assume.assumeFalse(FS_AZURE_TEST_NAMESPACE_ENABLED_ACCOUNT + " is true",
getConfiguration()
.getBoolean(FS_AZURE_TEST_NAMESPACE_ENABLED_ACCOUNT, true));
+ Assume.assumeTrue(FS_AZURE_ENABLE_CHECK_ACCESS + " is false",
+ isCheckAccessEnabled);
+ setTestUserFs();
testUserFs.access(new Path("/"), FsAction.READ);
}
@Test
public void testFsActionNONE() throws Exception {
assumeHNSAndCheckAccessEnabled();
+ setTestUserFs();
Path testFilePath = setupTestDirectoryAndUserAccess("/test2.txt",
FsAction.NONE);
assertInaccessible(testFilePath, FsAction.EXECUTE);
@@ -175,6 +182,7 @@ public class ITestAzureBlobFileSystemCheckAccess
@Test
public void testFsActionEXECUTE() throws Exception {
assumeHNSAndCheckAccessEnabled();
+ setTestUserFs();
Path testFilePath = setupTestDirectoryAndUserAccess("/test3.txt",
FsAction.EXECUTE);
assertAccessible(testFilePath, FsAction.EXECUTE);
@@ -190,6 +198,7 @@ public class ITestAzureBlobFileSystemCheckAccess
@Test
public void testFsActionREAD() throws Exception {
assumeHNSAndCheckAccessEnabled();
+ setTestUserFs();
Path testFilePath = setupTestDirectoryAndUserAccess("/test4.txt",
FsAction.READ);
assertAccessible(testFilePath, FsAction.READ);
@@ -205,6 +214,7 @@ public class ITestAzureBlobFileSystemCheckAccess
@Test
public void testFsActionWRITE() throws Exception {
assumeHNSAndCheckAccessEnabled();
+ setTestUserFs();
Path testFilePath = setupTestDirectoryAndUserAccess("/test5.txt",
FsAction.WRITE);
assertAccessible(testFilePath, FsAction.WRITE);
@@ -220,6 +230,7 @@ public class ITestAzureBlobFileSystemCheckAccess
@Test
public void testFsActionREADEXECUTE() throws Exception {
assumeHNSAndCheckAccessEnabled();
+ setTestUserFs();
Path testFilePath = setupTestDirectoryAndUserAccess("/test6.txt",
FsAction.READ_EXECUTE);
assertAccessible(testFilePath, FsAction.EXECUTE);
@@ -235,6 +246,7 @@ public class ITestAzureBlobFileSystemCheckAccess
@Test
public void testFsActionWRITEEXECUTE() throws Exception {
assumeHNSAndCheckAccessEnabled();
+ setTestUserFs();
Path testFilePath = setupTestDirectoryAndUserAccess("/test7.txt",
FsAction.WRITE_EXECUTE);
assertAccessible(testFilePath, FsAction.EXECUTE);
@@ -250,6 +262,7 @@ public class ITestAzureBlobFileSystemCheckAccess
@Test
public void testFsActionALL() throws Exception {
assumeHNSAndCheckAccessEnabled();
+ setTestUserFs();
Path testFilePath = setupTestDirectoryAndUserAccess("/test8.txt",
FsAction.ALL);
assertAccessible(testFilePath, FsAction.EXECUTE);
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org