Posted to common-issues@hadoop.apache.org by GitBox <gi...@apache.org> on 2021/08/26 15:24:27 UTC

[GitHub] [hadoop] steveloughran commented on a change in pull request #2549: HADOOP-17428. ABFS: Implementation for getContentSummary

steveloughran commented on a change in pull request #2549:
URL: https://github.com/apache/hadoop/pull/2549#discussion_r696704462



##########
File path: hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java
##########
@@ -433,6 +435,31 @@ public boolean delete(final Path f, final boolean recursive) throws IOException
 
   }
 
+  /**
+   * Returns a ContentSummary instance containing the count of directories,
+   * files and total number of bytes under a given path
+   * @param path The given path
+   * @return ContentSummary
+   * @throws IOException if an error is encountered during listStatus calls
+   * or if there is any issue with the thread pool used while processing
+   */
+  @Override
+  public ContentSummary getContentSummary(Path path) throws IOException {
+    try {
+      TracingContext tracingContext = new TracingContext(clientCorrelationId,
+          fileSystemId, FSOperationType.GET_CONTENT_SUMMARY, true,
+          tracingHeaderFormat, listener);
+      return (new ContentSummaryProcessor(abfsStore)).getContentSummary(path,
+          tracingContext);
+    } catch (InterruptedException e) {
+      LOG.debug("Thread interrupted");
+      throw new IOException(e);

Review comment:
       InterruptedIOException: convert the InterruptedException into an InterruptedIOException, so callers can distinguish interruption from other IO failures.
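   A minimal sketch of what that could look like (keeping the rest of the
   method as in the patch; `InterruptedIOException` is `java.io.InterruptedIOException`):
   ```
   } catch (InterruptedException e) {
     LOG.debug("Thread interrupted");
     // restore the interrupt flag before surfacing the failure
     Thread.currentThread().interrupt();
     InterruptedIOException ioe = new InterruptedIOException(e.getMessage());
     ioe.initCause(e);
     throw ioe;
   }
   ```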

##########
File path: hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ContentSummaryProcessor.java
##########
@@ -0,0 +1,147 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.io.IOException;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.fs.ContentSummary;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+
+public class ContentSummaryProcessor {

Review comment:
       nit: javadocs
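   A possible shape for the class javadoc (a sketch; the wording is not from
   the patch):
   ```
   /**
    * Walks the tree under a given path using a bounded thread pool,
    * accumulating the file count, directory count and total byte count
    * needed to build a {@link org.apache.hadoop.fs.ContentSummary}.
    */
   public class ContentSummaryProcessor {
   ```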

##########
File path: hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java
##########
@@ -433,6 +435,31 @@ public boolean delete(final Path f, final boolean recursive) throws IOException
 
   }
 
+  /**
+   * Returns a ContentSummary instance containing the count of directories,
+   * files and total number of bytes under a given path
+   * @param path The given path
+   * @return ContentSummary
+   * @throws IOException if an error is encountered during listStatus calls
+   * or if there is any issue with the thread pool used while processing
+   */
+  @Override
+  public ContentSummary getContentSummary(Path path) throws IOException {
+    try {
+      TracingContext tracingContext = new TracingContext(clientCorrelationId,
+          fileSystemId, FSOperationType.GET_CONTENT_SUMMARY, true,
+          tracingHeaderFormat, listener);
+      return (new ContentSummaryProcessor(abfsStore)).getContentSummary(path,
+          tracingContext);
+    } catch (InterruptedException e) {
+      LOG.debug("Thread interrupted");
+      throw new IOException(e);
+    } catch(ExecutionException ex) {
+      LOG.debug("GetContentSummary failed with error: {}", ex.getMessage());
+      throw new IOException(ex);

Review comment:
       prefer PathIOException with path included
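   For example (a sketch; `path` is the method argument above):
   ```
   } catch (ExecutionException ex) {
     LOG.debug("GetContentSummary failed with error: {}", ex.getMessage());
     // PathIOException includes the problem path in its error text
     throw new PathIOException(path.toString(), ex);
   }
   ```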

##########
File path: hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ContentSummaryProcessor.java
##########
@@ -0,0 +1,147 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.io.IOException;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.fs.ContentSummary;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+
+public class ContentSummaryProcessor {
+  private static final int CORE_POOL_SIZE = 1;
+  private static final int MAX_THREAD_COUNT = 16;
+  private static final int KEEP_ALIVE_TIME = 5;
+  private static final int POLL_TIMEOUT = 100;
+  private static final Logger LOG = LoggerFactory.getLogger(ContentSummaryProcessor.class);
+  private final AtomicLong fileCount = new AtomicLong(0L);
+  private final AtomicLong directoryCount = new AtomicLong(0L);
+  private final AtomicLong totalBytes = new AtomicLong(0L);
+  private final AtomicInteger numTasks = new AtomicInteger(0);
+  private final ListingSupport abfsStore;
+  private final ExecutorService executorService = new ThreadPoolExecutor(
+      CORE_POOL_SIZE, MAX_THREAD_COUNT, KEEP_ALIVE_TIME, TimeUnit.SECONDS,
+      new SynchronousQueue<>());
+  private final CompletionService<Void> completionService =

Review comment:
       have the AbfsStore create the executor, so that all ops which do async IO (this, block uploads, prefetch, openFile async...) can share a single pool
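   One possible shape, with the store handing in a shared pool (the extra
   constructor argument is illustrative, not part of this patch):
   ```
   // the store owns a single pool; each ContentSummaryProcessor borrows it
   public ContentSummaryProcessor(ListingSupport abfsStore,
       ExecutorService sharedPool) {
     this.abfsStore = abfsStore;
     this.executorService = sharedPool;
     this.completionService = new ExecutorCompletionService<>(sharedPool);
   }
   ```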

##########
File path: hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ContentSummaryProcessor.java
##########
@@ -0,0 +1,147 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.io.IOException;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.fs.ContentSummary;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+
+public class ContentSummaryProcessor {
+  private static final int CORE_POOL_SIZE = 1;
+  private static final int MAX_THREAD_COUNT = 16;
+  private static final int KEEP_ALIVE_TIME = 5;
+  private static final int POLL_TIMEOUT = 100;
+  private static final Logger LOG = LoggerFactory.getLogger(ContentSummaryProcessor.class);
+  private final AtomicLong fileCount = new AtomicLong(0L);
+  private final AtomicLong directoryCount = new AtomicLong(0L);
+  private final AtomicLong totalBytes = new AtomicLong(0L);
+  private final AtomicInteger numTasks = new AtomicInteger(0);
+  private final ListingSupport abfsStore;
+  private final ExecutorService executorService = new ThreadPoolExecutor(
+      CORE_POOL_SIZE, MAX_THREAD_COUNT, KEEP_ALIVE_TIME, TimeUnit.SECONDS,
+      new SynchronousQueue<>());
+  private final CompletionService<Void> completionService =
+      new ExecutorCompletionService<>(executorService);
+  private final LinkedBlockingQueue<FileStatus> queue = new LinkedBlockingQueue<>();
+
+  /**
+   * Processes a given path for count of subdirectories, files and total number
+   * of bytes
+   * @param abfsStore Instance of AzureBlobFileSystemStore, used to make
+   * listStatus calls to server
+   */
+  public ContentSummaryProcessor(ListingSupport abfsStore) {
+    this.abfsStore = abfsStore;
+  }
+
+  public ContentSummary getContentSummary(Path path, TracingContext tracingContext)
+          throws IOException, ExecutionException, InterruptedException {
+    try {
+      processDirectoryTree(path, tracingContext);
+      while (!queue.isEmpty() || numTasks.get() > 0) {
+        try {
+          completionService.take().get();
+        } finally {
+          numTasks.decrementAndGet();
+          LOG.debug("FileStatus queue size = {}, number of submitted unfinished tasks = {}, active thread count = {}",
+              queue.size(), numTasks, ((ThreadPoolExecutor) executorService).getActiveCount());
+        }
+      }
+    } finally {
+      executorService.shutdownNow();
+      LOG.debug("Executor shutdown");
+    }
+    LOG.debug("Processed content summary of subtree under given path");
+    ContentSummary.Builder builder = new ContentSummary.Builder()
+        .directoryCount(directoryCount.get()).fileCount(fileCount.get())
+        .length(totalBytes.get()).spaceConsumed(totalBytes.get());
+    return builder.build();
+  }
+
+  /**
+   * Calls listStatus on the given path and populates the fileStatus queue with
+   * subdirectories. Is called by new tasks to process the complete subtree
+   * under a given path
+   * @param path: Path to a file or directory
+   * @throws IOException: listStatus error
+   * @throws InterruptedException: error while inserting into queue
+   */
+  private void processDirectoryTree(Path path, TracingContext tracingContext)
+      throws IOException, InterruptedException {
+    FileStatus[] fileStatuses = abfsStore.listStatus(path, tracingContext);
+
+    for (FileStatus fileStatus : fileStatuses) {

Review comment:
       
   If you supported paged results, you could start queuing subdir work while still iterating through the list.
   
   In directories with many pages of results including directories, this could speed up processing, as subdirectory scanning could start as soon as the first page of results had been retrieved; see the sketch below.
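   A sketch of that pattern, using the FileSystem-level paged iterator as a
   stand-in (the store-level paged listing API is not shown in this patch,
   so `listStatusIterator` here is illustrative):
   ```
   // queue subdirectory work as each listing entry arrives, instead of
   // waiting for the complete FileStatus[] array to be assembled
   RemoteIterator<FileStatus> entries = fs.listStatusIterator(path);
   while (entries.hasNext()) {
     FileStatus status = entries.next();
     if (status.isDirectory()) {
       directoryCount.incrementAndGet();
       queue.put(status);   // idle worker threads can pick this up now
     } else {
       fileCount.incrementAndGet();
       totalBytes.addAndGet(status.getLen());
     }
   }
   ```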
   

##########
File path: hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestGetContentSummary.java
##########
@@ -0,0 +1,227 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.fs.azurebfs.constants.FSOperationType;
+import org.apache.hadoop.fs.azurebfs.utils.TracingHeaderValidator;
+import org.assertj.core.api.Assertions;
+import org.junit.Test;
+
+import org.apache.hadoop.fs.ContentSummary;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.azurebfs.AbstractAbfsIntegrationTest;
+import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem;
+
+import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_AZURE_LIST_MAX_RESULTS;
+import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+
+public class TestGetContentSummary extends AbstractAbfsIntegrationTest {
+
+  private static final int TEST_BUFFER_SIZE = 20;
+  private static final int FILES_PER_DIRECTORY = 2;
+  private static final int MAX_THREADS = 16;
+  private static final int NUM_FILES_FOR_LIST_MAX_TEST =
+      DEFAULT_AZURE_LIST_MAX_RESULTS + 10;
+  private static final int NUM_CONCURRENT_CALLS = 8;
+
+  private final String[] directories = {"/testFolder",
+      "/testFolderII",
+      "/testFolder/testFolder1",
+      "/testFolder/testFolder2",
+      "/testFolder/testFolder3",
+      "/testFolder/testFolder2/testFolder4",
+      "/testFolder/testFolder2/testFolder5",
+      "/testFolder/testFolder3/testFolder6",
+      "/testFolder/testFolder3/testFolder7"};
+
+  private final byte[] b = new byte[TEST_BUFFER_SIZE];
+
+  public TestGetContentSummary() throws Exception {
+    new Random().nextBytes(b);
+  }
+
+  @Test
+  public void testFilesystemRoot()
+      throws IOException, ExecutionException, InterruptedException {
+    AzureBlobFileSystem fs = getFileSystem();
+    createDirectoryStructure();
+    int fileCount = directories.length * FILES_PER_DIRECTORY;
+    ContentSummary contentSummary = fs.getContentSummary(new Path("/"));
+    verifyContentSummary(contentSummary, directories.length, fileCount,
+        directories.length * TEST_BUFFER_SIZE);
+  }
+
+  @Test
+  public void testFileContentSummary() throws IOException {
+    AzureBlobFileSystem fs = getFileSystem();
+    fs.mkdirs(new Path("/testFolder"));
+    Path filePath = new Path("/testFolder/testFile");
+    fs.create(filePath);
+    FSDataOutputStream out = fs.append(filePath);
+    out.write(b);
+    out.close();
+    ContentSummary contentSummary = fs.getContentSummary(filePath);
+    verifyContentSummary(contentSummary, 0, 1, TEST_BUFFER_SIZE);
+  }
+
+  @Test
+  public void testLeafDir() throws IOException {
+    AzureBlobFileSystem fs = getFileSystem();
+    fs.mkdirs(new Path("/testFolder"));
+    fs.mkdirs(new Path("/testFolder/testFolder1"));
+    fs.mkdirs(new Path("/testFolder/testFolder2"));
+    Path leafDir = new Path("/testFolder/testFolder1/testFolder3");
+    fs.mkdirs(leafDir);
+    ContentSummary contentSummary = fs.getContentSummary(leafDir);
+    verifyContentSummary(contentSummary, 0, 0, 0);
+  }
+
+  @Test
+  public void testIntermediateDirWithFilesOnly()
+      throws IOException, ExecutionException, InterruptedException {
+    AzureBlobFileSystem fs = getFileSystem();
+    fs.mkdirs(new Path("/testFolder"));
+    Path intermediateDir = new Path("/testFolder/testFolder1");
+    fs.mkdirs(intermediateDir);
+    populateDirWithFiles(intermediateDir, FILES_PER_DIRECTORY);
+    ContentSummary contentSummary =
+        fs.getContentSummary(intermediateDir);
+    verifyContentSummary(contentSummary, 0, FILES_PER_DIRECTORY,
+        TEST_BUFFER_SIZE);
+  }
+
+  @Test
+  public void testIntermediateDirWithFilesAndSubdirs()
+      throws IOException, ExecutionException, InterruptedException {
+    AzureBlobFileSystem fs = getFileSystem();
+    fs.mkdirs(new Path("/testFolder"));
+    Path intermediateDir = new Path("/testFolder/testFolder1");
+    fs.mkdirs(intermediateDir);
+    populateDirWithFiles(intermediateDir, FILES_PER_DIRECTORY);
+    fs.mkdirs(new Path("/testFolder/testFolder1/testFolder3"));
+    fs.registerListener(
+        new TracingHeaderValidator(getConfiguration().getClientCorrelationId(),
+            fs.getFileSystemId(), FSOperationType.GET_CONTENT_SUMMARY, true,
+            0));
+    ContentSummary contentSummary =
+        fs.getContentSummary(intermediateDir);
+    verifyContentSummary(contentSummary, 1, FILES_PER_DIRECTORY,
+        TEST_BUFFER_SIZE);
+  }
+
+  @Test
+  public void testDirOverListMaxResultsItems()
+      throws IOException, ExecutionException, InterruptedException {
+    AzureBlobFileSystem fs = getFileSystem();
+    fs.mkdirs(new Path("/testFolder"));
+    Path pathToListMaxDir = new Path("/testFolder/listMaxDir");
+    fs.mkdirs(pathToListMaxDir);
+    fs.mkdirs(new Path(pathToListMaxDir + "/testFolder2"));
+    populateDirWithFiles(pathToListMaxDir, NUM_FILES_FOR_LIST_MAX_TEST);
+    verifyContentSummary(
+        fs.getContentSummary(pathToListMaxDir), 1,
+        NUM_FILES_FOR_LIST_MAX_TEST, TEST_BUFFER_SIZE);
+  }
+
+  @Test
+  public void testInvalidPath() throws Exception {
+    AzureBlobFileSystem fs = getFileSystem();
+    intercept(IOException.class, () -> fs.getContentSummary(new Path(
+        "/nonExistentPath")));
+  }
+
+  @Test
+  public void testConcurrentGetContentSummaryCalls()
+      throws InterruptedException, ExecutionException, IOException {
+    AzureBlobFileSystem fs = getFileSystem();
+    ExecutorService executorService = new ThreadPoolExecutor(1, MAX_THREADS, 5,
+        TimeUnit.SECONDS, new SynchronousQueue<>());
+    CompletionService<ContentSummary> completionService =
+        new ExecutorCompletionService<>(executorService);
+    createDirectoryStructure();
+    for (int i = 0; i < NUM_CONCURRENT_CALLS; i++) {
+      completionService.submit(() -> fs.getContentSummary(new Path(
+          "/testFolder")));
+    }
+    for (int i = 0; i < NUM_CONCURRENT_CALLS; i++) {
+      ContentSummary contentSummary = completionService.take().get();
+      verifyContentSummary(contentSummary, 7, 8 * FILES_PER_DIRECTORY,
+          8 * TEST_BUFFER_SIZE);
+    }
+    executorService.shutdown();
+  }
+
+  private void verifyContentSummary(ContentSummary contentSummary,
+      long expectedDirectoryCount, long expectedFileCount, long expectedByteCount) {
+    Assertions.assertThat(contentSummary.getDirectoryCount())
+        .describedAs("Incorrect directory count").isEqualTo(expectedDirectoryCount);
+    Assertions.assertThat(contentSummary.getFileCount())
+        .describedAs("Incorrect file count").isEqualTo(expectedFileCount);
+    Assertions.assertThat(contentSummary.getLength())
+        .describedAs("Incorrect length").isEqualTo(expectedByteCount);
+    Assertions.assertThat(contentSummary.getSpaceConsumed())
+        .describedAs("Incorrect value of space consumed").isEqualTo(expectedByteCount);
+  }
+
+  private void createDirectoryStructure()
+      throws IOException, ExecutionException, InterruptedException {
+    AzureBlobFileSystem fs = getFileSystem();
+    for (String directory : directories) {
+      Path dirPath = new Path(directory);
+      fs.mkdirs(dirPath);
+      populateDirWithFiles(dirPath, FILES_PER_DIRECTORY);
+    }
+  }
+
+  private void populateDirWithFiles(Path directory, int numFiles)
+      throws ExecutionException, InterruptedException, IOException {
+    final List<Future<Void>> tasks = new ArrayList<>();
+    ExecutorService es = Executors.newFixedThreadPool(10);
+    for (int i = 0; i < numFiles; i++) {
+      final Path fileName = new Path(directory + "/test" + i);

Review comment:
       change to
   ```
   new Path(directory, String.format("test-%04d", i));
   ```
   
   ensures ordering of paths.
   
   

##########
File path: hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestGetContentSummary.java
##########
@@ -0,0 +1,227 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.fs.azurebfs.constants.FSOperationType;
+import org.apache.hadoop.fs.azurebfs.utils.TracingHeaderValidator;
+import org.assertj.core.api.Assertions;
+import org.junit.Test;
+
+import org.apache.hadoop.fs.ContentSummary;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.azurebfs.AbstractAbfsIntegrationTest;
+import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem;
+
+import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_AZURE_LIST_MAX_RESULTS;
+import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+
+public class TestGetContentSummary extends AbstractAbfsIntegrationTest {

Review comment:
       if this runs against a real store, the name needs to begin with ITest
    
   to make this a unit test, look at how ITestAbfsListStatusRemoteIterator returns a mock listing API.
   
   you could simply generate a deep/wide tree of results for listings to find, either by creating a tree of entries to simulate a directory tree, or by generating them programmatically (e.g. declaring that each dir has 10 subdirs and 100 files and returning them accordingly); see the sketch below
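   A sketch of the programmatic option; the constants and the helper name
   are hypothetical, and only the listing shape consumed by
   ContentSummaryProcessor is modelled:
   ```
   // fabricate a uniform tree: SUBDIRS subdirectories plus FILES files of
   // FILE_LEN bytes per directory, down to DEPTH levels, so the expected
   // ContentSummary is computable in closed form
   private FileStatus[] syntheticListing(Path dir, int depth) {
     List<FileStatus> entries = new ArrayList<>();
     if (depth < DEPTH) {
       for (int i = 0; i < SUBDIRS; i++) {
         entries.add(new FileStatus(0, true, 0, 0, 0,
             new Path(dir, String.format("dir-%02d", i))));
       }
     }
     for (int i = 0; i < FILES; i++) {
       entries.add(new FileStatus(FILE_LEN, false, 1, 0, 0,
           new Path(dir, String.format("file-%03d", i))));
     }
     return entries.toArray(new FileStatus[0]);
   }
   ```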
   

##########
File path: hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestGetContentSummary.java
##########
@@ -0,0 +1,227 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.fs.azurebfs.constants.FSOperationType;
+import org.apache.hadoop.fs.azurebfs.utils.TracingHeaderValidator;
+import org.assertj.core.api.Assertions;
+import org.junit.Test;
+
+import org.apache.hadoop.fs.ContentSummary;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.azurebfs.AbstractAbfsIntegrationTest;
+import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem;
+
+import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_AZURE_LIST_MAX_RESULTS;
+import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+
+public class TestGetContentSummary extends AbstractAbfsIntegrationTest {
+
+  private static final int TEST_BUFFER_SIZE = 20;
+  private static final int FILES_PER_DIRECTORY = 2;
+  private static final int MAX_THREADS = 16;
+  private static final int NUM_FILES_FOR_LIST_MAX_TEST =
+      DEFAULT_AZURE_LIST_MAX_RESULTS + 10;
+  private static final int NUM_CONCURRENT_CALLS = 8;
+
+  private final String[] directories = {"/testFolder",
+      "/testFolderII",
+      "/testFolder/testFolder1",
+      "/testFolder/testFolder2",
+      "/testFolder/testFolder3",
+      "/testFolder/testFolder2/testFolder4",
+      "/testFolder/testFolder2/testFolder5",
+      "/testFolder/testFolder3/testFolder6",
+      "/testFolder/testFolder3/testFolder7"};
+
+  private final byte[] b = new byte[TEST_BUFFER_SIZE];
+
+  public TestGetContentSummary() throws Exception {
+    new Random().nextBytes(b);
+  }
+
+  @Test
+  public void testFilesystemRoot()
+      throws IOException, ExecutionException, InterruptedException {
+    AzureBlobFileSystem fs = getFileSystem();
+    createDirectoryStructure();
+    int fileCount = directories.length * FILES_PER_DIRECTORY;
+    ContentSummary contentSummary = fs.getContentSummary(new Path("/"));
+    verifyContentSummary(contentSummary, directories.length, fileCount,
+        directories.length * TEST_BUFFER_SIZE);
+  }
+
+  @Test
+  public void testFileContentSummary() throws IOException {
+    AzureBlobFileSystem fs = getFileSystem();
+    fs.mkdirs(new Path("/testFolder"));
+    Path filePath = new Path("/testFolder/testFile");
+    fs.create(filePath);

Review comment:
       1. ContractTestUtils has helper methods here
   2. `fs.create()` returns a stream you can write to
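   For example (ContractTestUtils is org.apache.hadoop.fs.contract.ContractTestUtils):
   ```
   // write through the stream returned by create(); no separate append call
   try (FSDataOutputStream out = fs.create(filePath)) {
     out.write(b);
   }
   // or in one line:
   ContractTestUtils.createFile(fs, filePath, true, b);
   ```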




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: common-issues-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-issues-help@hadoop.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org