You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by da...@apache.org on 2020/07/21 16:22:49 UTC

[hadoop] branch trunk updated: HADOOP-17132. ABFS: Fix Rename and Delete Idempotency check trigger

This is an automated email from the ASF dual-hosted git repository.

dazhou pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new d23cc9d  HADOOP-17132. ABFS: Fix Rename and Delete Idempotency check trigger
d23cc9d is described below

commit d23cc9d85d887f01d72180bdf1af87dfdee15c5a
Author: Sneha Vijayarajan <sn...@gmail.com>
AuthorDate: Tue Jul 21 21:52:38 2020 +0530

    HADOOP-17132. ABFS: Fix Rename and Delete Idempotency check trigger
    
    - Contributed by Sneha Vijayarajan
---
 .../hadoop/fs/azurebfs/services/AbfsClient.java    | 55 +++++++++++-----
 .../fs/azurebfs/services/AbfsRestOperation.java    |  7 +-
 .../azurebfs/ITestAzureBlobFileSystemDelete.java   | 65 +++++++++++++++++-
 .../azurebfs/ITestAzureBlobFileSystemRename.java   | 68 +++++++++++++++++++
 .../fs/azurebfs/services/TestAbfsClient.java       | 76 ++++++++++++++++++++++
 5 files changed, 253 insertions(+), 18 deletions(-)

diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java
index f747bd0..e1ea75e 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java
@@ -336,10 +336,19 @@ public class AbfsClient implements Closeable {
             url,
             requestHeaders);
     Instant renameRequestStartTime = Instant.now();
-    op.execute();
-
-    if (op.getResult().getStatusCode() != HttpURLConnection.HTTP_OK) {
-      return renameIdempotencyCheckOp(renameRequestStartTime, op, destination);
+    try {
+      op.execute();
+    } catch (AzureBlobFileSystemException e) {
+      if (op.getResult() == null) {
+        throw e;  // no HTTP response: nothing to check
+      }
+      final AbfsRestOperation idempotencyOp = renameIdempotencyCheckOp(
+          renameRequestStartTime, op, destination);
+      if (idempotencyOp.getResult().getStatusCode()
+          == op.getResult().getStatusCode()) {
+        throw e;  // idempotency check agrees with the failure
+      }
+      return idempotencyOp;
     }
 
     return op;
@@ -369,14 +378,21 @@ public class AbfsClient implements Closeable {
       // exists. Check on destination status and if it has a recent LMT timestamp.
       // If yes, return success, else fall back to original rename request failure response.
 
-      final AbfsRestOperation destStatusOp = getPathStatus(destination, false);
-      if (destStatusOp.getResult().getStatusCode() == HttpURLConnection.HTTP_OK) {
-        String lmt = destStatusOp.getResult().getResponseHeader(
-            HttpHeaderConfigurations.LAST_MODIFIED);
-
-        if (DateTimeUtils.isRecentlyModified(lmt, renameRequestStartTime)) {
-          return destStatusOp;
+      try {
+        final AbfsRestOperation destStatusOp = getPathStatus(destination,
+            false);
+        if (destStatusOp.getResult().getStatusCode()
+            == HttpURLConnection.HTTP_OK) {
+          String lmt = destStatusOp.getResult().getResponseHeader(
+              HttpHeaderConfigurations.LAST_MODIFIED);
+
+          if (DateTimeUtils.isRecentlyModified(lmt, renameRequestStartTime)) {
+            return destStatusOp;
+          }
         }
+      } catch (AzureBlobFileSystemException e) {
+        // getPathStatus on the destination failed; return the original op
+        return op;
       }
     }
 
@@ -570,10 +586,18 @@ public class AbfsClient implements Closeable {
             HTTP_METHOD_DELETE,
             url,
             requestHeaders);
+    try {
     op.execute();
-
-    if (op.getResult().getStatusCode() != HttpURLConnection.HTTP_OK) {
-      return deleteIdempotencyCheckOp(op);
+    } catch (AzureBlobFileSystemException e) {
+      if (op.getResult() == null) {
+        throw e;  // no HTTP response: nothing to check
+      }
+      final AbfsRestOperation idempotencyOp = deleteIdempotencyCheckOp(op);
+      if (idempotencyOp.getResult().getStatusCode()
+          == op.getResult().getStatusCode()) {
+        throw e;  // idempotency check agrees with the failure
+      }
+      return idempotencyOp;
     }
 
     return op;
@@ -822,7 +846,8 @@ public class AbfsClient implements Closeable {
     return createRequestUrl(EMPTY_STRING, query);
   }
 
-  private URL createRequestUrl(final String path, final String query)
+  @VisibleForTesting
+  protected URL createRequestUrl(final String path, final String query)
           throws AzureBlobFileSystemException {
     final String base = baseUrl.toString();
     String encodedPath = path;
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java
index f3986d4..936267a 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java
@@ -24,6 +24,7 @@ import java.net.URL;
 import java.net.UnknownHostException;
 import java.util.List;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -170,7 +171,8 @@ public class AbfsRestOperation {
    * Executes the REST operation with retry, by issuing one or more
    * HTTP operations.
    */
-  void execute() throws AzureBlobFileSystemException {
+  @VisibleForTesting
+  public void execute() throws AzureBlobFileSystemException {
     // see if we have latency reports from the previous requests
     String latencyHeader = this.client.getAbfsPerfTracker().getClientLatency();
     if (latencyHeader != null && !latencyHeader.isEmpty()) {
@@ -181,8 +183,9 @@ public class AbfsRestOperation {
 
     retryCount = 0;
     LOG.debug("First execution of REST operation - {}", operationType);
-    while (!executeHttpOperation(retryCount++)) {
+    while (!executeHttpOperation(retryCount)) {
       try {
+        ++retryCount;
         LOG.debug("Retrying REST operation {}. RetryCount = {}",
             operationType, retryCount);
         Thread.sleep(client.getRetryPolicy().getRetryInterval(retryCount));
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemDelete.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemDelete.java
index e297396..2f2a619 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemDelete.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemDelete.java
@@ -30,6 +30,7 @@ import org.assertj.core.api.Assertions;
 import org.junit.Assume;
 import org.junit.Test;
 
+import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
 import org.apache.hadoop.fs.azurebfs.services.AbfsClient;
 import org.apache.hadoop.fs.azurebfs.services.AbfsHttpOperation;
 import org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation;
@@ -38,9 +39,12 @@ import org.apache.hadoop.fs.FileAlreadyExistsException;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
 
+import static java.net.HttpURLConnection.HTTP_BAD_REQUEST;
 import static java.net.HttpURLConnection.HTTP_NOT_FOUND;
 import static java.net.HttpURLConnection.HTTP_OK;
 
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -168,7 +172,8 @@ public class ITestAzureBlobFileSystemDelete extends
     // Set retryCount to non-zero
     when(op.isARetriedRequest()).thenReturn(true);
 
-    // Mock instance of Http Operation response. This will return HTTP:Not Found
+    // Case 1: mocked HTTP operation returning 404 Not Found, which the
+    // idempotency check should report as success
     AbfsHttpOperation http404Op = mock(AbfsHttpOperation.class);
     when(http404Op.getStatusCode()).thenReturn(HTTP_NOT_FOUND);
 
@@ -181,6 +186,64 @@ public class ITestAzureBlobFileSystemDelete extends
         .describedAs(
             "Delete is considered idempotent by default and should return success.")
         .isEqualTo(HTTP_OK);
+
+    // Case 2: mocked HTTP operation returning 400 Bad Request. The
+    // idempotency check applies only to 404, so 400 must pass through.
+    AbfsHttpOperation http400Op = mock(AbfsHttpOperation.class);
+    when(http400Op.getStatusCode()).thenReturn(HTTP_BAD_REQUEST);
+
+    // Mock delete response to 400
+    when(op.getResult()).thenReturn(http400Op);
+
+    Assertions.assertThat(testClient.deleteIdempotencyCheckOp(op)
+        .getResult()
+        .getStatusCode())
+        .describedAs(
+            "Idempotency check to happen only for HTTP 404 response.")
+        .isEqualTo(HTTP_BAD_REQUEST);
+
+  }
+
+  @Test
+  public void testDeleteIdempotencyTriggerHttp404() throws Exception {
+
+    final AzureBlobFileSystem fs = getFileSystem();
+    AbfsClient client = TestAbfsClient.createTestClientFromCurrentContext(
+        fs.getAbfsStore().getClient(),
+        this.getConfiguration());
+
+    // Case 1: a failed request that was not retried must rethrow the error
+    intercept(AbfsRestOperationException.class,
+        () -> client.deletePath(
+            "/NonExistingPath",
+            false,
+            null));
+
+    // mock idempotency check to mimic retried case
+    AbfsClient mockClient = TestAbfsClient.getMockAbfsClient(
+        fs.getAbfsStore().getClient(),
+        this.getConfiguration());
+
+    // Case 2: Mimic retried case
+    // Idempotency check on Delete always returns success
+    AbfsRestOperation idempotencyRetOp = mock(AbfsRestOperation.class);
+    AbfsHttpOperation http200Op = mock(AbfsHttpOperation.class);
+    when(http200Op.getStatusCode()).thenReturn(HTTP_OK);
+    when(idempotencyRetOp.getResult()).thenReturn(http200Op);
+
+    doReturn(idempotencyRetOp).when(mockClient).deleteIdempotencyCheckOp(any());
+    when(mockClient.deletePath("/NonExistingPath", false,
+        null)).thenCallRealMethod();
+
+    Assertions.assertThat(mockClient.deletePath(
+        "/NonExistingPath",
+        false,
+        null)
+        .getResult()
+        .getStatusCode())
+        .describedAs("Idempotency check reports successful "
+            + "delete. 200OK should be returned")
+        .isEqualTo(idempotencyRetOp.getResult().getStatusCode());
   }
 
 }
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRename.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRename.java
index 7e03ee5..2adf70c 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRename.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRename.java
@@ -30,6 +30,7 @@ import org.assertj.core.api.Assertions;
 import org.junit.Test;
 import org.junit.Assert;
 
+import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
 import org.apache.hadoop.fs.azurebfs.services.AbfsClient;
 import org.apache.hadoop.fs.azurebfs.services.AbfsHttpOperation;
 import org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation;
@@ -42,6 +43,8 @@ import static java.net.HttpURLConnection.HTTP_NOT_FOUND;
 import static java.net.HttpURLConnection.HTTP_OK;
 import static java.util.UUID.randomUUID;
 
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -51,6 +54,8 @@ import static org.apache.hadoop.fs.contract.ContractTestUtils.assertPathDoesNotE
 import static org.apache.hadoop.fs.contract.ContractTestUtils.assertRenameOutcome;
 import static org.apache.hadoop.fs.contract.ContractTestUtils.assertIsFile;
 
+import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+
 /**
  * Test rename operation.
  */
@@ -78,6 +83,16 @@ public class ITestAzureBlobFileSystemRename extends
   }
 
   @Test
+  public void testRenameWithPreExistingDestination() throws Exception {
+    final AzureBlobFileSystem fs = getFileSystem();
+    Path src = path("renameSrc");
+    touch(src);
+    Path dest = path("renameDest");
+    touch(dest);
+    assertRenameOutcome(fs, src, dest, false);
+  }
+
+  @Test
   public void testRenameFileUnderDir() throws Exception {
     final AzureBlobFileSystem fs = getFileSystem();
     Path sourceDir = new Path("/testSrc");
@@ -197,6 +212,59 @@ public class ITestAzureBlobFileSystemRename extends
             + "TimespanForIdentifyingRecentOperationThroughLMT.");
   }
 
+  @Test
+  public void testRenameIdempotencyTriggerHttpNotFound() throws Exception {
+    AbfsHttpOperation http404Op = mock(AbfsHttpOperation.class);
+    when(http404Op.getStatusCode()).thenReturn(HTTP_NOT_FOUND);
+
+    AbfsHttpOperation http200Op = mock(AbfsHttpOperation.class);
+    when(http200Op.getStatusCode()).thenReturn(HTTP_OK);
+
+    // Check 1 where idempotency check fails to find dest path
+    // Rename should throw exception
+    testRenameIdempotencyTriggerChecks(http404Op);
+
+    // Check 2 where idempotency check finds the dest path
+    // Rename will be successful
+    testRenameIdempotencyTriggerChecks(http200Op);
+  }
+
+  private void testRenameIdempotencyTriggerChecks(
+      AbfsHttpOperation idempotencyRetHttpOp) throws Exception {
+
+    final AzureBlobFileSystem fs = getFileSystem();
+    AbfsClient client = TestAbfsClient.getMockAbfsClient(
+        fs.getAbfsStore().getClient(),
+        this.getConfiguration());
+
+    AbfsRestOperation idempotencyRetOp = mock(AbfsRestOperation.class);
+    when(idempotencyRetOp.getResult()).thenReturn(idempotencyRetHttpOp);
+    doReturn(idempotencyRetOp).when(client).renameIdempotencyCheckOp(any(),
+        any(), any());
+    when(client.renamePath(any(), any(), any())).thenCallRealMethod();
+
+    // rename on non-existing source file will trigger idempotency check
+    if (idempotencyRetHttpOp.getStatusCode() == HTTP_OK) {
+      // idempotency check found that destination exists and is recently created
+      Assertions.assertThat(client.renamePath(
+          "/NonExistingsourcepath",
+          "/destpath",
+          null)
+          .getResult()
+          .getStatusCode())
+          .describedAs("Idempotency check reports recent successful "
+              + "rename. 200OK should be returned")
+          .isEqualTo(idempotencyRetOp.getResult().getStatusCode());
+    } else {
+      // rename dest not found. Original exception should be rethrown.
+      intercept(AbfsRestOperationException.class,
+          () -> client.renamePath(
+              "/NonExistingsourcepath",
+              "/destpath",
+              null));
+    }
+  }
+
   private void testRenameTimeout(
       int renameRequestStatus,
       int renameIdempotencyCheckStatus,
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsClient.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsClient.java
index 8197e7e..bab02c0 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsClient.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsClient.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.fs.azurebfs.services;
 
 import java.io.IOException;
+import java.lang.reflect.Field;
 import java.net.MalformedURLException;
 import java.net.URL;
 import java.util.regex.Pattern;
@@ -33,6 +34,9 @@ import org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys;
 import org.apache.hadoop.security.ssl.DelegatingSSLSocketFactory;
 
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.APN_VERSION;
 import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.CLIENT_VERSION;
@@ -271,4 +275,76 @@ public final class TestAbfsClient {
 
     return testClient;
   }
+
+  public static AbfsClient getMockAbfsClient(AbfsClient baseAbfsClientInstance,
+      AbfsConfiguration abfsConfig)
+      throws IOException, NoSuchFieldException, IllegalAccessException {
+    AuthType currentAuthType = abfsConfig.getAuthType(
+        abfsConfig.getAccountName());
+
+    org.junit.Assume.assumeTrue(
+        (currentAuthType == AuthType.SharedKey)
+        || (currentAuthType == AuthType.OAuth));
+
+    AbfsClient client = mock(AbfsClient.class);
+    AbfsPerfTracker tracker = new AbfsPerfTracker(
+        "test",
+        abfsConfig.getAccountName(),
+        abfsConfig);
+
+    when(client.getAbfsPerfTracker()).thenReturn(tracker);
+    when(client.getAuthType()).thenReturn(currentAuthType);
+    when(client.getRetryPolicy()).thenReturn(
+        new ExponentialRetryPolicy(1));
+
+    when(client.createDefaultUriQueryBuilder()).thenCallRealMethod();
+    when(client.createRequestUrl(any(), any())).thenCallRealMethod();
+    when(client.getAccessToken()).thenCallRealMethod();
+    when(client.getSharedKeyCredentials()).thenCallRealMethod();
+    when(client.createDefaultHeaders()).thenCallRealMethod();
+
+    // override baseUrl; NOTE(review): "modifiers" hack is JDK-dependent
+    Field baseUrlField = AbfsClient.class.getDeclaredField("baseUrl");
+    baseUrlField.setAccessible(true);
+    Field modifiersField = Field.class.getDeclaredField("modifiers");
+    modifiersField.setAccessible(true);
+    modifiersField.setInt(baseUrlField, baseUrlField.getModifiers() & ~java.lang.reflect.Modifier.FINAL);
+    baseUrlField.set(client, baseAbfsClientInstance.getBaseUrl());
+
+    // override the credential field that matches the current auth type
+    if (currentAuthType == AuthType.SharedKey) {
+      Field sharedKeyCredsField = AbfsClient.class.getDeclaredField(
+          "sharedKeyCredentials");
+      sharedKeyCredsField.setAccessible(true);
+      modifiersField.setInt(sharedKeyCredsField,
+          sharedKeyCredsField.getModifiers()
+              & ~java.lang.reflect.Modifier.FINAL);
+      sharedKeyCredsField.set(client, new SharedKeyCredentials(
+          abfsConfig.getAccountName().substring(0,
+              abfsConfig.getAccountName().indexOf(DOT)),
+          abfsConfig.getStorageAccountKey()));
+    } else {
+      Field tokenProviderField = AbfsClient.class.getDeclaredField(
+          "tokenProvider");
+      tokenProviderField.setAccessible(true);
+      modifiersField.setInt(tokenProviderField,
+          tokenProviderField.getModifiers()
+              & ~java.lang.reflect.Modifier.FINAL);
+      tokenProviderField.set(client, abfsConfig.getTokenProvider());
+    }
+
+    // override user agent with a fixed, hard-coded sample value
+    String userAgent = "APN/1.0 Azure Blob FS/3.4.0-SNAPSHOT (PrivateBuild "
+        + "JavaJRE 1.8.0_252; Linux 5.3.0-59-generic/amd64; openssl-1.0; "
+        + "UNKNOWN/UNKNOWN) MSFT";
+    Field userAgentField = AbfsClient.class.getDeclaredField(
+        "userAgent");
+    userAgentField.setAccessible(true);
+    modifiersField.setInt(userAgentField,
+        userAgentField.getModifiers()
+            & ~java.lang.reflect.Modifier.FINAL);
+    userAgentField.set(client, userAgent);
+
+    return client;
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org