You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by da...@apache.org on 2020/07/21 16:22:49 UTC
[hadoop] branch trunk updated: Hadoop 17132. ABFS: Fix Rename and
Delete Idempotency check trigger
This is an automated email from the ASF dual-hosted git repository.
dazhou pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
new d23cc9d Hadoop 17132. ABFS: Fix Rename and Delete Idempotency check trigger
d23cc9d is described below
commit d23cc9d85d887f01d72180bdf1af87dfdee15c5a
Author: Sneha Vijayarajan <sn...@gmail.com>
AuthorDate: Tue Jul 21 21:52:38 2020 +0530
Hadoop 17132. ABFS: Fix Rename and Delete Idempotency check trigger
- Contributed by Sneha Vijayarajan
---
.../hadoop/fs/azurebfs/services/AbfsClient.java | 55 +++++++++++-----
.../fs/azurebfs/services/AbfsRestOperation.java | 7 +-
.../azurebfs/ITestAzureBlobFileSystemDelete.java | 65 +++++++++++++++++-
.../azurebfs/ITestAzureBlobFileSystemRename.java | 68 +++++++++++++++++++
.../fs/azurebfs/services/TestAbfsClient.java | 76 ++++++++++++++++++++++
5 files changed, 253 insertions(+), 18 deletions(-)
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java
index f747bd0..e1ea75e 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java
@@ -336,10 +336,19 @@ public class AbfsClient implements Closeable {
url,
requestHeaders);
Instant renameRequestStartTime = Instant.now();
- op.execute();
-
- if (op.getResult().getStatusCode() != HttpURLConnection.HTTP_OK) {
- return renameIdempotencyCheckOp(renameRequestStartTime, op, destination);
+ try {
+ op.execute();
+ } catch (AzureBlobFileSystemException e) {
+ final AbfsRestOperation idempotencyOp = renameIdempotencyCheckOp(
+ renameRequestStartTime, op, destination);
+ if (idempotencyOp.getResult().getStatusCode()
+ == op.getResult().getStatusCode()) {
+ // idempotency check did not return a different result;
+ // rethrow the original exception
+ throw e;
+ } else {
+ return idempotencyOp;
+ }
}
return op;
@@ -369,14 +378,21 @@ public class AbfsClient implements Closeable {
// exists. Check on destination status and if it has a recent LMT timestamp.
// If yes, return success, else fall back to original rename request failure response.
- final AbfsRestOperation destStatusOp = getPathStatus(destination, false);
- if (destStatusOp.getResult().getStatusCode() == HttpURLConnection.HTTP_OK) {
- String lmt = destStatusOp.getResult().getResponseHeader(
- HttpHeaderConfigurations.LAST_MODIFIED);
-
- if (DateTimeUtils.isRecentlyModified(lmt, renameRequestStartTime)) {
- return destStatusOp;
+ try {
+ final AbfsRestOperation destStatusOp = getPathStatus(destination,
+ false);
+ if (destStatusOp.getResult().getStatusCode()
+ == HttpURLConnection.HTTP_OK) {
+ String lmt = destStatusOp.getResult().getResponseHeader(
+ HttpHeaderConfigurations.LAST_MODIFIED);
+
+ if (DateTimeUtils.isRecentlyModified(lmt, renameRequestStartTime)) {
+ return destStatusOp;
+ }
}
+ } catch (AzureBlobFileSystemException e) {
+ // GetFileStatus on the destination failed, return original op
+ return op;
}
}
@@ -570,10 +586,18 @@ public class AbfsClient implements Closeable {
HTTP_METHOD_DELETE,
url,
requestHeaders);
+ try {
op.execute();
-
- if (op.getResult().getStatusCode() != HttpURLConnection.HTTP_OK) {
- return deleteIdempotencyCheckOp(op);
+ } catch (AzureBlobFileSystemException e) {
+ final AbfsRestOperation idempotencyOp = deleteIdempotencyCheckOp(op);
+ if (idempotencyOp.getResult().getStatusCode()
+ == op.getResult().getStatusCode()) {
+ // idempotency check did not return a different result;
+ // rethrow the original exception
+ throw e;
+ } else {
+ return idempotencyOp;
+ }
}
return op;
@@ -822,7 +846,8 @@ public class AbfsClient implements Closeable {
return createRequestUrl(EMPTY_STRING, query);
}
- private URL createRequestUrl(final String path, final String query)
+ @VisibleForTesting
+ protected URL createRequestUrl(final String path, final String query)
throws AzureBlobFileSystemException {
final String base = baseUrl.toString();
String encodedPath = path;
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java
index f3986d4..936267a 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java
@@ -24,6 +24,7 @@ import java.net.URL;
import java.net.UnknownHostException;
import java.util.List;
+import com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -170,7 +171,8 @@ public class AbfsRestOperation {
* Executes the REST operation with retry, by issuing one or more
* HTTP operations.
*/
- void execute() throws AzureBlobFileSystemException {
+ @VisibleForTesting
+ public void execute() throws AzureBlobFileSystemException {
// see if we have latency reports from the previous requests
String latencyHeader = this.client.getAbfsPerfTracker().getClientLatency();
if (latencyHeader != null && !latencyHeader.isEmpty()) {
@@ -181,8 +183,9 @@ public class AbfsRestOperation {
retryCount = 0;
LOG.debug("First execution of REST operation - {}", operationType);
- while (!executeHttpOperation(retryCount++)) {
+ while (!executeHttpOperation(retryCount)) {
try {
+ ++retryCount;
LOG.debug("Retrying REST operation {}. RetryCount = {}",
operationType, retryCount);
Thread.sleep(client.getRetryPolicy().getRetryInterval(retryCount));
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemDelete.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemDelete.java
index e297396..2f2a619 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemDelete.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemDelete.java
@@ -30,6 +30,7 @@ import org.assertj.core.api.Assertions;
import org.junit.Assume;
import org.junit.Test;
+import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
import org.apache.hadoop.fs.azurebfs.services.AbfsClient;
import org.apache.hadoop.fs.azurebfs.services.AbfsHttpOperation;
import org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation;
@@ -38,9 +39,12 @@ import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
+import static java.net.HttpURLConnection.HTTP_BAD_REQUEST;
import static java.net.HttpURLConnection.HTTP_NOT_FOUND;
import static java.net.HttpURLConnection.HTTP_OK;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -168,7 +172,8 @@ public class ITestAzureBlobFileSystemDelete extends
// Set retryCount to non-zero
when(op.isARetriedRequest()).thenReturn(true);
- // Mock instance of Http Operation response. This will return HTTP:Not Found
+ // Case 1: Mock instance of Http Operation response. This will return
+ // HTTP:Not Found
AbfsHttpOperation http404Op = mock(AbfsHttpOperation.class);
when(http404Op.getStatusCode()).thenReturn(HTTP_NOT_FOUND);
@@ -181,6 +186,64 @@ public class ITestAzureBlobFileSystemDelete extends
.describedAs(
"Delete is considered idempotent by default and should return success.")
.isEqualTo(HTTP_OK);
+
+ // Case 2: Mock instance of Http Operation response. This will return
+ // HTTP:Bad Request
+ AbfsHttpOperation http400Op = mock(AbfsHttpOperation.class);
+ when(http400Op.getStatusCode()).thenReturn(HTTP_BAD_REQUEST);
+
+ // Mock delete response to 400
+ when(op.getResult()).thenReturn(http400Op);
+
+ Assertions.assertThat(testClient.deleteIdempotencyCheckOp(op)
+ .getResult()
+ .getStatusCode())
+ .describedAs(
+ "Idempotency check to happen only for HTTP 404 response.")
+ .isEqualTo(HTTP_BAD_REQUEST);
+
+ }
+
+ @Test
+ public void testDeleteIdempotencyTriggerHttp404() throws Exception {
+
+ final AzureBlobFileSystem fs = getFileSystem();
+ AbfsClient client = TestAbfsClient.createTestClientFromCurrentContext(
+ fs.getAbfsStore().getClient(),
+ this.getConfiguration());
+
+ // Case 1: A non-retried request should throw the error back
+ intercept(AbfsRestOperationException.class,
+ () -> client.deletePath(
+ "/NonExistingPath",
+ false,
+ null));
+
+ // mock idempotency check to mimic retried case
+ AbfsClient mockClient = TestAbfsClient.getMockAbfsClient(
+ fs.getAbfsStore().getClient(),
+ this.getConfiguration());
+
+ // Case 2: Mimic retried case
+ // Idempotency check on Delete always returns success
+ AbfsRestOperation idempotencyRetOp = mock(AbfsRestOperation.class);
+ AbfsHttpOperation http200Op = mock(AbfsHttpOperation.class);
+ when(http200Op.getStatusCode()).thenReturn(HTTP_OK);
+ when(idempotencyRetOp.getResult()).thenReturn(http200Op);
+
+ doReturn(idempotencyRetOp).when(mockClient).deleteIdempotencyCheckOp(any());
+ when(mockClient.deletePath("/NonExistingPath", false,
+ null)).thenCallRealMethod();
+
+ Assertions.assertThat(mockClient.deletePath(
+ "/NonExistingPath",
+ false,
+ null)
+ .getResult()
+ .getStatusCode())
+ .describedAs("Idempotency check reports successful "
+ + "delete. 200OK should be returned")
+ .isEqualTo(idempotencyRetOp.getResult().getStatusCode());
}
}
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRename.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRename.java
index 7e03ee5..2adf70c 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRename.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRename.java
@@ -30,6 +30,7 @@ import org.assertj.core.api.Assertions;
import org.junit.Test;
import org.junit.Assert;
+import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
import org.apache.hadoop.fs.azurebfs.services.AbfsClient;
import org.apache.hadoop.fs.azurebfs.services.AbfsHttpOperation;
import org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation;
@@ -42,6 +43,8 @@ import static java.net.HttpURLConnection.HTTP_NOT_FOUND;
import static java.net.HttpURLConnection.HTTP_OK;
import static java.util.UUID.randomUUID;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -51,6 +54,8 @@ import static org.apache.hadoop.fs.contract.ContractTestUtils.assertPathDoesNotE
import static org.apache.hadoop.fs.contract.ContractTestUtils.assertRenameOutcome;
import static org.apache.hadoop.fs.contract.ContractTestUtils.assertIsFile;
+import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+
/**
* Test rename operation.
*/
@@ -78,6 +83,16 @@ public class ITestAzureBlobFileSystemRename extends
}
@Test
+ public void testRenameWithPreExistingDestination() throws Exception {
+ final AzureBlobFileSystem fs = getFileSystem();
+ Path src = path("renameSrc");
+ touch(src);
+ Path dest = path("renameDest");
+ touch(dest);
+ assertRenameOutcome(fs, src, dest, false);
+ }
+
+ @Test
public void testRenameFileUnderDir() throws Exception {
final AzureBlobFileSystem fs = getFileSystem();
Path sourceDir = new Path("/testSrc");
@@ -197,6 +212,59 @@ public class ITestAzureBlobFileSystemRename extends
+ "TimespanForIdentifyingRecentOperationThroughLMT.");
}
+ @Test
+ public void testRenameIdempotencyTriggerHttpNotFound() throws Exception {
+ AbfsHttpOperation http404Op = mock(AbfsHttpOperation.class);
+ when(http404Op.getStatusCode()).thenReturn(HTTP_NOT_FOUND);
+
+ AbfsHttpOperation http200Op = mock(AbfsHttpOperation.class);
+ when(http200Op.getStatusCode()).thenReturn(HTTP_OK);
+
+ // Check 1 where idempotency check fails to find dest path
+ // Rename should throw exception
+ testRenameIdempotencyTriggerChecks(http404Op);
+
+ // Check 2 where idempotency check finds the dest path
+ // Rename will be successful
+ testRenameIdempotencyTriggerChecks(http200Op);
+ }
+
+ private void testRenameIdempotencyTriggerChecks(
+ AbfsHttpOperation idempotencyRetHttpOp) throws Exception {
+
+ final AzureBlobFileSystem fs = getFileSystem();
+ AbfsClient client = TestAbfsClient.getMockAbfsClient(
+ fs.getAbfsStore().getClient(),
+ this.getConfiguration());
+
+ AbfsRestOperation idempotencyRetOp = mock(AbfsRestOperation.class);
+ when(idempotencyRetOp.getResult()).thenReturn(idempotencyRetHttpOp);
+ doReturn(idempotencyRetOp).when(client).renameIdempotencyCheckOp(any(),
+ any(), any());
+ when(client.renamePath(any(), any(), any())).thenCallRealMethod();
+
+ // rename on non-existing source file will trigger idempotency check
+ if (idempotencyRetHttpOp.getStatusCode() == HTTP_OK) {
+ // idempotency check found that destination exists and is recently created
+ Assertions.assertThat(client.renamePath(
+ "/NonExistingsourcepath",
+ "/destpath",
+ null)
+ .getResult()
+ .getStatusCode())
+ .describedAs("Idempotency check reports recent successful "
+ + "rename. 200OK should be returned")
+ .isEqualTo(idempotencyRetOp.getResult().getStatusCode());
+ } else {
+ // rename dest not found. Original exception should be rethrown.
+ intercept(AbfsRestOperationException.class,
+ () -> client.renamePath(
+ "/NonExistingsourcepath",
+ "/destpath",
+ ""));
+ }
+ }
+
private void testRenameTimeout(
int renameRequestStatus,
int renameIdempotencyCheckStatus,
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsClient.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsClient.java
index 8197e7e..bab02c0 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsClient.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsClient.java
@@ -19,6 +19,7 @@
package org.apache.hadoop.fs.azurebfs.services;
import java.io.IOException;
+import java.lang.reflect.Field;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.regex.Pattern;
@@ -33,6 +34,9 @@ import org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys;
import org.apache.hadoop.security.ssl.DelegatingSSLSocketFactory;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.APN_VERSION;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.CLIENT_VERSION;
@@ -271,4 +275,76 @@ public final class TestAbfsClient {
return testClient;
}
+
+ public static AbfsClient getMockAbfsClient(AbfsClient baseAbfsClientInstance,
+ AbfsConfiguration abfsConfig)
+ throws IOException, NoSuchFieldException, IllegalAccessException {
+ AuthType currentAuthType = abfsConfig.getAuthType(
+ abfsConfig.getAccountName());
+
+ org.junit.Assume.assumeTrue(
+ (currentAuthType == AuthType.SharedKey)
+ || (currentAuthType == AuthType.OAuth));
+
+ AbfsClient client = mock(AbfsClient.class);
+ AbfsPerfTracker tracker = new AbfsPerfTracker(
+ "test",
+ abfsConfig.getAccountName(),
+ abfsConfig);
+
+ when(client.getAbfsPerfTracker()).thenReturn(tracker);
+ when(client.getAuthType()).thenReturn(currentAuthType);
+ when(client.getRetryPolicy()).thenReturn(
+ new ExponentialRetryPolicy(1));
+
+ when(client.createDefaultUriQueryBuilder()).thenCallRealMethod();
+ when(client.createRequestUrl(any(), any())).thenCallRealMethod();
+ when(client.getAccessToken()).thenCallRealMethod();
+ when(client.getSharedKeyCredentials()).thenCallRealMethod();
+ when(client.createDefaultHeaders()).thenCallRealMethod();
+
+ // override baseurl
+ Field baseUrlField = AbfsClient.class.getDeclaredField("baseUrl");
+ baseUrlField.setAccessible(true);
+ Field modifiersField = Field.class.getDeclaredField("modifiers");
+ modifiersField.setAccessible(true);
+ modifiersField.setInt(baseUrlField, baseUrlField.getModifiers() & ~java.lang.reflect.Modifier.FINAL);
+ baseUrlField.set(client, baseAbfsClientInstance.getBaseUrl());
+
+ // override auth provider
+ if (currentAuthType == AuthType.SharedKey) {
+ Field sharedKeyCredsField = AbfsClient.class.getDeclaredField(
+ "sharedKeyCredentials");
+ sharedKeyCredsField.setAccessible(true);
+ modifiersField.setInt(sharedKeyCredsField,
+ sharedKeyCredsField.getModifiers()
+ & ~java.lang.reflect.Modifier.FINAL);
+ sharedKeyCredsField.set(client, new SharedKeyCredentials(
+ abfsConfig.getAccountName().substring(0,
+ abfsConfig.getAccountName().indexOf(DOT)),
+ abfsConfig.getStorageAccountKey()));
+ } else {
+ Field tokenProviderField = AbfsClient.class.getDeclaredField(
+ "tokenProvider");
+ tokenProviderField.setAccessible(true);
+ modifiersField.setInt(tokenProviderField,
+ tokenProviderField.getModifiers()
+ & ~java.lang.reflect.Modifier.FINAL);
+ tokenProviderField.set(client, abfsConfig.getTokenProvider());
+ }
+
+ // override user agent
+ String userAgent = "APN/1.0 Azure Blob FS/3.4.0-SNAPSHOT (PrivateBuild "
+ + "JavaJRE 1.8.0_252; Linux 5.3.0-59-generic/amd64; openssl-1.0; "
+ + "UNKNOWN/UNKNOWN) MSFT";
+ Field userAgentField = AbfsClient.class.getDeclaredField(
+ "userAgent");
+ userAgentField.setAccessible(true);
+ modifiersField.setInt(userAgentField,
+ userAgentField.getModifiers()
+ & ~java.lang.reflect.Modifier.FINAL);
+ userAgentField.set(client, userAgent);
+
+ return client;
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org