You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by st...@apache.org on 2022/03/01 11:48:51 UTC

[hadoop] branch trunk updated: HADOOP-18075. ABFS: Fix failure caused by listFiles() in ITestAbfsRestOperationException (#4040)

This is an automated email from the ASF dual-hosted git repository.

stevel pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new b56af00  HADOOP-18075. ABFS: Fix failure caused by listFiles() in ITestAbfsRestOperationException (#4040)
b56af00 is described below

commit b56af00114c5647d647703ac9b0b491933903788
Author: Steve Loughran <st...@cloudera.com>
AuthorDate: Tue Mar 1 11:48:10 2022 +0000

    HADOOP-18075. ABFS: Fix failure caused by listFiles() in ITestAbfsRestOperationException (#4040)
    
    
    Contributed by Sumangala Patki
---
 .../hadoop/fs/azurebfs/AzureBlobFileSystem.java    |  8 ++---
 .../services/AbfsListStatusRemoteIterator.java     | 36 ++++++++++++++--------
 .../ITestAbfsListStatusRemoteIterator.java         | 34 +++++++++-----------
 .../azurebfs/ITestAbfsRestOperationException.java  |  5 +--
 4 files changed, 45 insertions(+), 38 deletions(-)

diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java
index 4786614..ae70b8d 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java
@@ -1193,7 +1193,7 @@ public class AzureBlobFileSystem extends FileSystem
       TracingContext tracingContext = new TracingContext(clientCorrelationId,
           fileSystemId, FSOperationType.LISTSTATUS, true, tracingHeaderFormat, listener);
       AbfsListStatusRemoteIterator abfsLsItr =
-          new AbfsListStatusRemoteIterator(getFileStatus(path, tracingContext), abfsStore,
+          new AbfsListStatusRemoteIterator(path, abfsStore,
               tracingContext);
       return RemoteIterators.typeCastingRemoteIterator(abfsLsItr);
     } else {
@@ -1368,9 +1368,9 @@ public class AzureBlobFileSystem extends FileSystem
    * @throws IOException if the exception error code is not on the allowed list.
    */
   @VisibleForTesting
-  static void checkException(final Path path,
-                              final AzureBlobFileSystemException exception,
-                              final AzureServiceErrorCode... allowedErrorCodesList) throws IOException {
+  public static void checkException(final Path path,
+      final AzureBlobFileSystemException exception,
+      final AzureServiceErrorCode... allowedErrorCodesList) throws IOException {
     if (exception instanceof AbfsRestOperationException) {
       AbfsRestOperationException ere = (AbfsRestOperationException) exception;
 
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsListStatusRemoteIterator.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsListStatusRemoteIterator.java
index ce6207b..3fecb1f 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsListStatusRemoteIterator.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsListStatusRemoteIterator.java
@@ -32,7 +32,10 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem;
+import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
 import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
 
 public class AbfsListStatusRemoteIterator
@@ -45,7 +48,7 @@ public class AbfsListStatusRemoteIterator
   private static final int MAX_QUEUE_SIZE = 10;
   private static final long POLL_WAIT_TIME_IN_MS = 250;
 
-  private final FileStatus fileStatus;
+  private final Path path;
   private final ListingSupport listingSupport;
   private final ArrayBlockingQueue<AbfsListResult> listResultQueue;
   private final TracingContext tracingContext;
@@ -55,13 +58,15 @@ public class AbfsListStatusRemoteIterator
   private String continuation;
   private Iterator<FileStatus> currIterator;
 
-  public AbfsListStatusRemoteIterator(final FileStatus fileStatus,
-      final ListingSupport listingSupport, TracingContext tracingContext) {
-    this.fileStatus = fileStatus;
+  public AbfsListStatusRemoteIterator(final Path path,
+      final ListingSupport listingSupport, TracingContext tracingContext)
+      throws IOException {
+    this.path = path;
     this.listingSupport = listingSupport;
     this.tracingContext = tracingContext;
     listResultQueue = new ArrayBlockingQueue<>(MAX_QUEUE_SIZE);
     currIterator = Collections.emptyIterator();
+    addNextBatchIteratorToQueue();
     fetchBatchesAsync();
   }
 
@@ -130,9 +135,6 @@ public class AbfsListStatusRemoteIterator
         Thread.currentThread().interrupt();
         LOG.error("Thread got interrupted: {}", interruptedException);
       }
-    } catch (InterruptedException e) {
-      Thread.currentThread().interrupt();
-      LOG.error("Thread got interrupted: {}", e);
     } finally {
       synchronized (this) {
         isAsyncInProgress = false;
@@ -141,13 +143,21 @@ public class AbfsListStatusRemoteIterator
   }
 
   private synchronized void addNextBatchIteratorToQueue()
-      throws IOException, InterruptedException {
+      throws IOException {
     List<FileStatus> fileStatuses = new ArrayList<>();
-    continuation = listingSupport
-        .listStatus(fileStatus.getPath(), null, fileStatuses, FETCH_ALL_FALSE,
-            continuation, tracingContext);
-    if (!fileStatuses.isEmpty()) {
-      listResultQueue.put(new AbfsListResult(fileStatuses.iterator()));
+    try {
+      try {
+        continuation = listingSupport.listStatus(path, null, fileStatuses,
+            FETCH_ALL_FALSE, continuation, tracingContext);
+      } catch (AbfsRestOperationException ex) {
+        AzureBlobFileSystem.checkException(path, ex);
+      }
+      if (!fileStatuses.isEmpty()) {
+        listResultQueue.put(new AbfsListResult(fileStatuses.iterator()));
+      }
+    } catch (InterruptedException ie) {
+      Thread.currentThread().interrupt();
+      LOG.error("Thread interrupted", ie);
     }
     if (continuation == null || continuation.isEmpty()) {
       isIterationComplete = true;
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsListStatusRemoteIterator.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsListStatusRemoteIterator.java
index 3f50aec..ea1d0e2 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsListStatusRemoteIterator.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsListStatusRemoteIterator.java
@@ -68,10 +68,9 @@ public class ITestAbfsListStatusRemoteIterator extends AbstractAbfsIntegrationTe
     setPageSize(10);
     final List<String> fileNames = createFilesUnderDirectory(testDir);
 
-    ListingSupport listngSupport = Mockito.spy(getFileSystem().getAbfsStore());
-    RemoteIterator<FileStatus> fsItr = new AbfsListStatusRemoteIterator(
-        getFileSystem().getFileStatus(testDir), listngSupport,
-        getTestTracingContext(getFileSystem(), true));
+    ListingSupport listingSupport = Mockito.spy(getFileSystem().getAbfsStore());
+    RemoteIterator<FileStatus> fsItr = new AbfsListStatusRemoteIterator(testDir,
+        listingSupport, getTestTracingContext(getFileSystem(), true));
     Assertions.assertThat(fsItr)
         .describedAs("RemoteIterator should be instance of "
             + "AbfsListStatusRemoteIterator by default")
@@ -84,7 +83,7 @@ public class ITestAbfsListStatusRemoteIterator extends AbstractAbfsIntegrationTe
     }
     verifyIteratorResultCount(itrCount, fileNames);
     int minNumberOfInvocations = TEST_FILES_NUMBER / 10;
-    verify(listngSupport, Mockito.atLeast(minNumberOfInvocations))
+    verify(listingSupport, Mockito.atLeast(minNumberOfInvocations))
         .listStatus(any(Path.class), nullable(String.class),
             anyList(), anyBoolean(),
             nullable(String.class),
@@ -97,10 +96,9 @@ public class ITestAbfsListStatusRemoteIterator extends AbstractAbfsIntegrationTe
     setPageSize(10);
     final List<String> fileNames = createFilesUnderDirectory(testDir);
 
-    ListingSupport listngSupport = Mockito.spy(getFileSystem().getAbfsStore());
-    RemoteIterator<FileStatus> fsItr = new AbfsListStatusRemoteIterator(
-        getFileSystem().getFileStatus(testDir), listngSupport,
-        getTestTracingContext(getFileSystem(), true));
+    ListingSupport listingSupport = Mockito.spy(getFileSystem().getAbfsStore());
+    RemoteIterator<FileStatus> fsItr = new AbfsListStatusRemoteIterator(testDir,
+        listingSupport, getTestTracingContext(getFileSystem(), true));
     Assertions.assertThat(fsItr)
         .describedAs("RemoteIterator should be instance of "
             + "AbfsListStatusRemoteIterator by default")
@@ -114,7 +112,7 @@ public class ITestAbfsListStatusRemoteIterator extends AbstractAbfsIntegrationTe
     LambdaTestUtils.intercept(NoSuchElementException.class, fsItr::next);
     verifyIteratorResultCount(itrCount, fileNames);
     int minNumberOfInvocations = TEST_FILES_NUMBER / 10;
-    verify(listngSupport, Mockito.atLeast(minNumberOfInvocations))
+    verify(listingSupport, Mockito.atLeast(minNumberOfInvocations))
         .listStatus(any(Path.class), nullable(String.class),
             anyList(), anyBoolean(),
             nullable(String.class),
@@ -169,10 +167,9 @@ public class ITestAbfsListStatusRemoteIterator extends AbstractAbfsIntegrationTe
   public void testNextWhenNoMoreElementsPresent() throws Exception {
     Path testDir = createTestDirectory();
     setPageSize(10);
-    RemoteIterator<FileStatus> fsItr =
-        new AbfsListStatusRemoteIterator(getFileSystem().getFileStatus(testDir),
-            getFileSystem().getAbfsStore(),
-            getTestTracingContext(getFileSystem(), true));
+    RemoteIterator<FileStatus> fsItr = new AbfsListStatusRemoteIterator(testDir,
+        getFileSystem().getAbfsStore(),
+        getTestTracingContext(getFileSystem(), true));
     fsItr = Mockito.spy(fsItr);
     Mockito.doReturn(false).when(fsItr).hasNext();
 
@@ -212,12 +209,11 @@ public class ITestAbfsListStatusRemoteIterator extends AbstractAbfsIntegrationTe
     getFileSystem().mkdirs(testDir);
 
     String exceptionMessage = "test exception";
-    ListingSupport lsSupport =getMockListingSupport(exceptionMessage);
-    RemoteIterator<FileStatus> fsItr =
-        new AbfsListStatusRemoteIterator(getFileSystem().getFileStatus(testDir),
-        lsSupport, getTestTracingContext(getFileSystem(), true));
+    ListingSupport lsSupport = getMockListingSupport(exceptionMessage);
 
-    LambdaTestUtils.intercept(IOException.class, fsItr::next);
+    LambdaTestUtils.intercept(IOException.class,
+        () -> new AbfsListStatusRemoteIterator(testDir, lsSupport,
+            getTestTracingContext(getFileSystem(), true)));
   }
 
   @Test
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsRestOperationException.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsRestOperationException.java
index a71e7bc..ce9415a 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsRestOperationException.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsRestOperationException.java
@@ -74,8 +74,9 @@ public class ITestAbfsRestOperationException extends AbstractAbfsIntegrationTest
       // verify its format
       String errorMessage = ex.getLocalizedMessage();
       String[] errorFields = errorMessage.split(",");
-
-      Assert.assertEquals(6, errorFields.length);
+      Assertions.assertThat(errorFields)
+          .describedAs("fields in exception of %s", ex)
+          .hasSize(6);
       // Check status message, status code, HTTP Request Type and URL.
       Assert.assertEquals("Operation failed: \"The specified path does not exist.\"", errorFields[0].trim());
       Assert.assertEquals("404", errorFields[1].trim());

---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org