You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by fe...@apache.org on 2024/01/05 15:44:53 UTC

(arrow) branch main updated: GH-38772: [C++] Implement directory semantics even when the storage account doesn't support HNS (#39361)

This is an automated email from the ASF dual-hosted git repository.

felipecrv pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/main by this push:
     new aae6fa40b4 GH-38772: [C++] Implement directory semantics even when the storage account doesn't support HNS (#39361)
aae6fa40b4 is described below

commit aae6fa40b458a90c598df281fdc8fc023e05a262
Author: Felipe Oliveira Carvalho <fe...@gmail.com>
AuthorDate: Fri Jan 5 12:44:45 2024 -0300

    GH-38772: [C++] Implement directory semantics even when the storage account doesn't support HNS (#39361)
    
    ### Rationale for this change
    
    The `FileSystem` implementation based on Azure Blob Storage should implement directory operations according to filesystem semantics. When Hierarchical Namespace (HNS) is enabled, we can rely on Azure Data Lake Storage Gen 2 APIs implementing the filesystem semantics for us, but when all we have is the Blobs API, we should emulate it.
    
    ### What changes are included in this PR?
    
     - Skip fewer tests
     - Re-implement `GetFileInfo` using `ListBlobsByHierarchy` instead of `ListBlobs`
     - Re-implement `CreateDir` with an upfront HNS support check instead of falling back to Blobs API after an error
     - Add comprehensive tests to `CreateDir`
     - Add `HasSubmitBatchBug` to check if a test inside any scenario is affected by a certain Azurite issue
     - Implement `DeleteDir` to work properly on flat namespace storage accounts (non-HNS accounts)
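     - Take the Azure storage clients by const reference in `CheckIfHierarchicalNamespaceIsEnabled`, `GetContainerPropsAsFileInfo`, and `HierarchicalNamespaceSupport`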
    
    ### Are these changes tested?
    
    Yes. By existing and new tests added by this PR itself.
    * Closes: #38772
    
    Authored-by: Felipe Oliveira Carvalho <fe...@gmail.com>
    Signed-off-by: Felipe Oliveira Carvalho <fe...@gmail.com>
---
 cpp/src/arrow/filesystem/azurefs.cc         | 709 ++++++++++++++++++----------
 cpp/src/arrow/filesystem/azurefs_internal.h |   2 +-
 cpp/src/arrow/filesystem/azurefs_test.cc    | 444 ++++++++++-------
 3 files changed, 731 insertions(+), 424 deletions(-)

diff --git a/cpp/src/arrow/filesystem/azurefs.cc b/cpp/src/arrow/filesystem/azurefs.cc
index 029e19bc0e..9569eff2e4 100644
--- a/cpp/src/arrow/filesystem/azurefs.cc
+++ b/cpp/src/arrow/filesystem/azurefs.cc
@@ -828,7 +828,7 @@ bool IsDfsEmulator(const AzureOptions& options) {
 namespace internal {
 
 Result<HNSSupport> CheckIfHierarchicalNamespaceIsEnabled(
-    DataLake::DataLakeFileSystemClient& adlfs_client, const AzureOptions& options) {
+    const DataLake::DataLakeFileSystemClient& adlfs_client, const AzureOptions& options) {
   try {
     auto directory_client = adlfs_client.GetDirectoryClient("");
     // GetAccessControlList will fail on storage accounts
@@ -891,10 +891,12 @@ namespace {
 
 const char kDelimiter[] = {internal::kSep, '\0'};
 
+/// \pre location.container is not empty.
 template <class ContainerClient>
-Result<FileInfo> GetContainerPropsAsFileInfo(const std::string& container_name,
-                                             ContainerClient& container_client) {
-  FileInfo info{container_name};
+Result<FileInfo> GetContainerPropsAsFileInfo(const AzureLocation& location,
+                                             const ContainerClient& container_client) {
+  DCHECK(!location.container.empty());
+  FileInfo info{location.path.empty() ? location.all : location.container};
   try {
     auto properties = container_client.GetProperties();
     info.set_type(FileType::Directory);
@@ -910,6 +912,18 @@ Result<FileInfo> GetContainerPropsAsFileInfo(const std::string& container_name,
   }
 }
 
+template <class ContainerClient>
+Status CreateContainerIfNotExists(const std::string& container_name,
+                                  const ContainerClient& container_client) {
+  try {
+    container_client.CreateIfNotExists();
+    return Status::OK();
+  } catch (const Storage::StorageException& exception) {
+    return ExceptionToStatus(exception, "Failed to create a container: ", container_name,
+                             ": ", container_client.GetUrl());
+  }
+}
+
 FileInfo DirectoryFileInfoFromPath(std::string_view path) {
   return FileInfo{std::string{internal::RemoveTrailingSlash(path)}, FileType::Directory};
 }
@@ -955,12 +969,21 @@ class AzureFileSystem::Impl {
   io::IOContext& io_context() { return io_context_; }
   const AzureOptions& options() const { return options_; }
 
- private:
+  Blobs::BlobContainerClient GetBlobContainerClient(const std::string& container_name) {
+    return blob_service_client_->GetBlobContainerClient(container_name);
+  }
+
+  /// \param container_name Also known as "filesystem" in the ADLS Gen2 API.
+  DataLake::DataLakeFileSystemClient GetFileSystemClient(
+      const std::string& container_name) {
+    return datalake_service_client_->GetFileSystemClient(container_name);
+  }
+
   /// \brief Memoized version of CheckIfHierarchicalNamespaceIsEnabled.
   ///
   /// \return kEnabled/kDisabled/kContainerNotFound (kUnknown is never returned).
   Result<HNSSupport> HierarchicalNamespaceSupport(
-      DataLake::DataLakeFileSystemClient& adlfs_client) {
+      const DataLake::DataLakeFileSystemClient& adlfs_client) {
     switch (cached_hns_support_) {
       case HNSSupport::kEnabled:
       case HNSSupport::kDisabled:
@@ -987,7 +1010,6 @@ class AzureFileSystem::Impl {
     return hns_support;
   }
 
- public:
   /// This is used from unit tests to ensure we perform operations on all the
   /// possible states of cached_hns_support_.
   void ForceCachedHierarchicalNamespaceSupport(int support) {
@@ -1004,33 +1026,20 @@ class AzureFileSystem::Impl {
     DCHECK(false) << "Invalid enum HierarchicalNamespaceSupport value.";
   }
 
-  Result<FileInfo> GetFileInfo(const AzureLocation& location) {
-    if (location.container.empty()) {
-      DCHECK(location.path.empty());
-      // Root directory of the storage account.
-      return FileInfo{"", FileType::Directory};
-    }
-    if (location.path.empty()) {
-      // We have a container, but no path within the container.
-      // The container itself represents a directory.
-      auto container_client =
-          blob_service_client_->GetBlobContainerClient(location.container);
-      return GetContainerPropsAsFileInfo(location.container, container_client);
-    }
-    // There is a path to search within the container.
-    FileInfo info{location.all};
-    auto adlfs_client = datalake_service_client_->GetFileSystemClient(location.container);
+  /// \pre location.path is not empty.
+  Result<FileInfo> GetFileInfo(const DataLake::DataLakeFileSystemClient& adlfs_client,
+                               const AzureLocation& location) {
     auto file_client = adlfs_client.GetFileClient(location.path);
     try {
+      FileInfo info{location.all};
       auto properties = file_client.GetProperties();
       if (properties.Value.IsDirectory) {
         info.set_type(FileType::Directory);
       } else if (internal::HasTrailingSlash(location.path)) {
-        // For a path with a trailing slash a hierarchical namespace may return a blob
-        // with that trailing slash removed. For consistency with flat namespace and
-        // other filesystems we chose to return NotFound.
-        //
-        // NOTE(felipecrv): could this be an empty directory marker?
+        // For a path with a trailing slash, a Hierarchical Namespace storage account
+        // may recognize a file (path with trailing slash removed). For consistency
+        // with other arrow::FileSystem implementations we chose to return NotFound
+        // because the trailing slash means the user was looking for a directory.
         info.set_type(FileType::NotFound);
         return info;
       } else {
@@ -1042,47 +1051,88 @@ class AzureFileSystem::Impl {
       return info;
     } catch (const Storage::StorageException& exception) {
       if (exception.StatusCode == Http::HttpStatusCode::NotFound) {
-        ARROW_ASSIGN_OR_RAISE(auto hns_support,
-                              HierarchicalNamespaceSupport(adlfs_client));
-        if (hns_support == HNSSupport::kContainerNotFound ||
-            hns_support == HNSSupport::kEnabled) {
-          // If the hierarchical namespace is enabled, then the storage account will
-          // have explicit directories. Neither a file nor a directory was found.
-          info.set_type(FileType::NotFound);
+        return FileInfo{location.all, FileType::NotFound};
+      }
+      return ExceptionToStatus(
+          exception, "GetProperties for '", file_client.GetUrl(),
+          "' failed. GetFileInfo is unable to determine whether the path exists.");
+    }
+  }
+
+  /// On flat namespace accounts there are no real directories. Directories are
+  /// implied by empty directory marker blobs with names ending in "/" or there
+  /// being blobs with names starting with the directory path.
+  ///
+  /// \pre location.path is not empty.
+  Result<FileInfo> GetFileInfo(const Blobs::BlobContainerClient& container_client,
+                               const AzureLocation& location) {
+    DCHECK(!location.path.empty());
+    Blobs::ListBlobsOptions options;
+    options.Prefix = internal::RemoveTrailingSlash(location.path);
+    options.PageSizeHint = 1;
+
+    try {
+      FileInfo info{location.all};
+      auto list_response = container_client.ListBlobsByHierarchy(kDelimiter, options);
+      // Since PageSizeHint=1, we expect at most one entry in either Blobs or
+      // BlobPrefixes. A BlobPrefix always ends with kDelimiter ("/"), so we can
+      // distinguish between a directory and a file by checking if we received a
+      // prefix or a blob.
+      if (!list_response.BlobPrefixes.empty()) {
+        // Ensure the returned BlobPrefixes[0] string doesn't contain more characters than
+        // the requested Prefix. For instance, if we request with Prefix="dir/abra" and
+        // the container contains "dir/abracadabra/" but not "dir/abra/", we will get back
+        // "dir/abracadabra/" in the BlobPrefixes list. If "dir/abra/" existed,
+        // it would be returned instead because it comes before "dir/abracadabra/" in the
+        // lexicographic order guaranteed by ListBlobsByHierarchy.
+        const auto& blob_prefix = list_response.BlobPrefixes[0];
+        if (blob_prefix == internal::EnsureTrailingSlash(location.path)) {
+          info.set_type(FileType::Directory);
           return info;
         }
-        // On flat namespace accounts there are no real directories. Directories are only
-        // implied by using `/` in the blob name.
-        Blobs::ListBlobsOptions list_blob_options;
-        // If listing the prefix `path.path_to_file` with trailing slash returns at least
-        // one result then `path` refers to an implied directory.
-        list_blob_options.Prefix = internal::EnsureTrailingSlash(location.path);
-        // We only need to know if there is at least one result, so minimise page size
-        // for efficiency.
-        list_blob_options.PageSizeHint = 1;
-
-        try {
-          auto paged_list_result =
-              blob_service_client_->GetBlobContainerClient(location.container)
-                  .ListBlobs(list_blob_options);
-          auto file_type = paged_list_result.Blobs.size() > 0 ? FileType::Directory
-                                                              : FileType::NotFound;
-          info.set_type(file_type);
+      }
+      if (!list_response.Blobs.empty()) {
+        const auto& blob = list_response.Blobs[0];
+        if (blob.Name == location.path) {
+          info.set_type(FileType::File);
+          info.set_size(blob.BlobSize);
+          info.set_mtime(
+              std::chrono::system_clock::time_point{blob.Details.LastModified});
           return info;
-        } catch (const Storage::StorageException& exception) {
-          return ExceptionToStatus(
-              exception, "ListBlobs failed for prefix='", *list_blob_options.Prefix,
-              "' failed. GetFileInfo is unable to determine whether the path should "
-              "be considered an implied directory.");
         }
       }
+      info.set_type(FileType::NotFound);
+      return info;
+    } catch (const Storage::StorageException& exception) {
+      if (IsContainerNotFound(exception)) {
+        return FileInfo{location.all, FileType::NotFound};
+      }
       return ExceptionToStatus(
-          exception, "GetProperties failed for '", file_client.GetUrl(),
-          "' GetFileInfo is unable to determine whether the path exists.");
+          exception, "ListBlobsByHierarchy failed for prefix='", *options.Prefix,
+          "'. GetFileInfo is unable to determine whether the path exists.");
     }
   }
 
  private:
+  /// \pre location.container is not empty.
+  template <typename ContainerClient>
+  Status CheckDirExists(const ContainerClient& container_client,
+                        const AzureLocation& location) {
+    DCHECK(!location.container.empty());
+    FileInfo info;
+    if (location.path.empty()) {
+      ARROW_ASSIGN_OR_RAISE(info,
+                            GetContainerPropsAsFileInfo(location, container_client));
+    } else {
+      ARROW_ASSIGN_OR_RAISE(info, GetFileInfo(container_client, location));
+    }
+    if (info.type() == FileType::NotFound) {
+      return PathNotFound(location);
+    }
+    DCHECK_EQ(info.type(), FileType::Directory);
+    return Status::OK();
+  }
+
   template <typename OnContainer>
   Status VisitContainers(const Core::Context& context, OnContainer&& on_container) const {
     Blobs::ListBlobContainersOptions options;
@@ -1297,97 +1347,79 @@ class AzureFileSystem::Impl {
     return ptr;
   }
 
-  Status CreateDir(const AzureLocation& location) {
-    if (location.container.empty()) {
-      return Status::Invalid("CreateDir requires a non-empty path.");
-    }
-
-    auto container_client =
-        blob_service_client_->GetBlobContainerClient(location.container);
-    if (location.path.empty()) {
-      try {
-        auto response = container_client.Create();
-        return response.Value.Created
-                   ? Status::OK()
-                   : Status::AlreadyExists("Directory already exists: " + location.all);
-      } catch (const Storage::StorageException& exception) {
-        return ExceptionToStatus(exception,
-                                 "Failed to create a container: ", location.container,
-                                 ": ", container_client.GetUrl());
-      }
-    }
-
-    auto adlfs_client = datalake_service_client_->GetFileSystemClient(location.container);
-    ARROW_ASSIGN_OR_RAISE(auto hns_support, HierarchicalNamespaceSupport(adlfs_client));
-    if (hns_support == HNSSupport::kContainerNotFound) {
-      return PathNotFound(location);
-    }
-    if (hns_support == HNSSupport::kDisabled) {
-      ARROW_ASSIGN_OR_RAISE(
-          auto container_info,
-          GetContainerPropsAsFileInfo(location.container, container_client));
-      if (container_info.type() == FileType::NotFound) {
-        return PathNotFound(location);
-      }
-      // Without hierarchical namespace enabled Azure blob storage has no directories.
-      // Therefore we can't, and don't need to create one. Simply creating a blob with `/`
-      // in the name implies directories.
-      return Status::OK();
-    }
-
-    auto directory_client = adlfs_client.GetDirectoryClient(location.path);
-    try {
-      auto response = directory_client.Create();
-      if (response.Value.Created) {
-        return Status::OK();
-      } else {
-        return StatusFromErrorResponse(directory_client.GetUrl(), *response.RawResponse,
-                                       "Failed to create a directory: " + location.path);
+ private:
+  /// This function cannot assume the filesystem/container already exists.
+  ///
+  /// \pre location.container is not empty.
+  /// \pre location.path is not empty.
+  template <class ContainerClient, class CreateDirIfNotExists>
+  Status CreateDirTemplate(const ContainerClient& container_client,
+                           CreateDirIfNotExists&& create_if_not_exists,
+                           const AzureLocation& location, bool recursive) {
+    DCHECK(!location.container.empty());
+    DCHECK(!location.path.empty());
+    // Non-recursive CreateDir calls require the parent directory to exist.
+    if (!recursive) {
+      auto parent = location.parent();
+      if (!parent.path.empty()) {
+        RETURN_NOT_OK(CheckDirExists(container_client, parent));
       }
-    } catch (const Storage::StorageException& exception) {
-      return ExceptionToStatus(exception, "Failed to create a directory: ", location.path,
-                               ": ", directory_client.GetUrl());
+      // If the parent location is just the container, we don't need to check if it
+      // exists because the operation we perform below will fail if the container
+      // doesn't exist and we can handle that error according to the recursive flag.
     }
-  }
-
-  Status CreateDirRecursive(const AzureLocation& location) {
-    if (location.container.empty()) {
-      return Status::Invalid("CreateDir requires a non-empty path.");
-    }
-
-    auto container_client =
-        blob_service_client_->GetBlobContainerClient(location.container);
     try {
-      container_client.CreateIfNotExists();
-    } catch (const Storage::StorageException& exception) {
-      return ExceptionToStatus(exception,
-                               "Failed to create a container: ", location.container, " (",
-                               container_client.GetUrl(), ")");
-    }
-
-    auto adlfs_client = datalake_service_client_->GetFileSystemClient(location.container);
-    ARROW_ASSIGN_OR_RAISE(auto hns_support, HierarchicalNamespaceSupport(adlfs_client));
-    if (hns_support == HNSSupport::kDisabled) {
-      // Without hierarchical namespace enabled Azure blob storage has no directories.
-      // Therefore we can't, and don't need to create one. Simply creating a blob with `/`
-      // in the name implies directories.
+      create_if_not_exists(container_client, location);
       return Status::OK();
-    }
-    // Don't handle HNSSupport::kContainerNotFound, just assume it still exists (because
-    // it was created above) and try to create the directory.
-
-    if (!location.path.empty()) {
-      auto directory_client = adlfs_client.GetDirectoryClient(location.path);
-      try {
-        directory_client.CreateIfNotExists();
-      } catch (const Storage::StorageException& exception) {
-        return ExceptionToStatus(exception,
-                                 "Failed to create a directory: ", location.path, " (",
-                                 directory_client.GetUrl(), ")");
+    } catch (const Storage::StorageException& exception) {
+      if (IsContainerNotFound(exception)) {
+        try {
+          if (recursive) {
+            container_client.CreateIfNotExists();
+            create_if_not_exists(container_client, location);
+            return Status::OK();
+          } else {
+            auto parent = location.parent();
+            return PathNotFound(parent);
+          }
+        } catch (const Storage::StorageException& second_exception) {
+          return ExceptionToStatus(second_exception, "Failed to create directory '",
+                                   location.all, "': ", container_client.GetUrl());
+        }
       }
+      return ExceptionToStatus(exception, "Failed to create directory '", location.all,
+                               "': ", container_client.GetUrl());
     }
+  }
 
-    return Status::OK();
+ public:
+  /// This function cannot assume the filesystem already exists.
+  ///
+  /// \pre location.container is not empty.
+  /// \pre location.path is not empty.
+  Status CreateDirOnFileSystem(const DataLake::DataLakeFileSystemClient& adlfs_client,
+                               const AzureLocation& location, bool recursive) {
+    return CreateDirTemplate(
+        adlfs_client,
+        [](const auto& adlfs_client, const auto& location) {
+          auto directory_client = adlfs_client.GetDirectoryClient(location.path);
+          directory_client.CreateIfNotExists();
+        },
+        location, recursive);
+  }
+
+  /// This function cannot assume the container already exists.
+  ///
+  /// \pre location.container is not empty.
+  /// \pre location.path is not empty.
+  Status CreateDirOnContainer(const Blobs::BlobContainerClient& container_client,
+                              const AzureLocation& location, bool recursive) {
+    return CreateDirTemplate(
+        container_client,
+        [this](const auto& container_client, const auto& location) {
+          EnsureEmptyDirExistsImplThatThrows(container_client, location.path);
+        },
+        location, recursive);
   }
 
   Result<std::shared_ptr<ObjectAppendStream>> OpenAppendStream(
@@ -1414,10 +1446,92 @@ class AzureFileSystem::Impl {
   }
 
  private:
-  Status DeleteDirContentsWithoutHierarchicalNamespace(const AzureLocation& location,
-                                                       bool missing_dir_ok) {
-    auto container_client =
-        blob_service_client_->GetBlobContainerClient(location.container);
+  void EnsureEmptyDirExistsImplThatThrows(
+      const Blobs::BlobContainerClient& container_client,
+      const std::string& path_within_container) {
+    auto dir_marker_blob_path = internal::EnsureTrailingSlash(path_within_container);
+    auto block_blob_client =
+        container_client.GetBlobClient(dir_marker_blob_path).AsBlockBlobClient();
+    // Attach metadata that other filesystem implementations expect to be present
+    // on directory marker blobs.
+    // https://github.com/fsspec/adlfs/blob/32132c4094350fca2680155a5c236f2e9f991ba5/adlfs/spec.py#L855-L870
+    Blobs::UploadBlockBlobFromOptions blob_options;
+    blob_options.Metadata.emplace("is_directory", "true");
+    block_blob_client.UploadFrom(nullptr, 0, blob_options);
+  }
+
+ public:
+  /// This function assumes the container already exists. So it can only be
+  /// called after that has been verified.
+  ///
+  /// \pre location.container is not empty.
+  /// \pre The location.container container already exists.
+  Status EnsureEmptyDirExists(const Blobs::BlobContainerClient& container_client,
+                              const AzureLocation& location, const char* operation_name) {
+    DCHECK(!location.container.empty());
+    if (location.path.empty()) {
+      // Nothing to do. The container already exists per the preconditions.
+      return Status::OK();
+    }
+    try {
+      EnsureEmptyDirExistsImplThatThrows(container_client, location.path);
+      return Status::OK();
+    } catch (const Storage::StorageException& exception) {
+      return ExceptionToStatus(
+          exception, operation_name, " failed to ensure empty directory marker '",
+          location.path, "' exists in container: ", container_client.GetUrl());
+    }
+  }
+
+  /// \pre location.container is not empty.
+  /// \pre location.path is empty.
+  Status DeleteContainer(const Blobs::BlobContainerClient& container_client,
+                         const AzureLocation& location) {
+    DCHECK(!location.container.empty());
+    DCHECK(location.path.empty());
+    try {
+      auto response = container_client.Delete();
+      if (response.Value.Deleted) {
+        return Status::OK();
+      } else {
+        return StatusFromErrorResponse(
+            container_client.GetUrl(), *response.RawResponse,
+            "Failed to delete a container: " + location.container);
+      }
+    } catch (const Storage::StorageException& exception) {
+      if (IsContainerNotFound(exception)) {
+        return PathNotFound(location);
+      }
+      return ExceptionToStatus(exception,
+                               "Failed to delete a container: ", location.container, ": ",
+                               container_client.GetUrl());
+    }
+  }
+
+  /// Deletes contents of a directory and possibly the directory itself
+  /// depending on the value of preserve_dir_marker_blob.
+  ///
+  /// \pre location.container is not empty.
+  /// \pre preserve_dir_marker_blob=false implies location.path is not empty
+  /// because we can't *not preserve* the root directory of a container.
+  ///
+  /// \param require_dir_to_exist Require the directory to exist *before* this
+  /// operation, otherwise return PathNotFound.
+  /// \param preserve_dir_marker_blob Ensure the empty directory marker blob
+  /// is preserved (not deleted) or created (before the contents are deleted) if it
+  /// doesn't exist explicitly but is implied by the existence of blobs with names
+  /// starting with the directory path.
+  /// \param operation_name Used in error messages to accurately describe the operation
+  Status DeleteDirContentsOnContainer(const Blobs::BlobContainerClient& container_client,
+                                      const AzureLocation& location,
+                                      bool require_dir_to_exist,
+                                      bool preserve_dir_marker_blob,
+                                      const char* operation_name) {
+    using DeleteBlobResponse = Storage::DeferredResponse<Blobs::Models::DeleteBlobResult>;
+    DCHECK(!location.container.empty());
+    DCHECK(preserve_dir_marker_blob || !location.path.empty())
+        << "Must pass preserve_dir_marker_blob=true when location.path is empty "
+           "(i.e. deleting the contents of a container).";
     Blobs::ListBlobsOptions options;
     if (!location.path.empty()) {
       options.Prefix = internal::EnsureTrailingSlash(location.path);
@@ -1428,9 +1542,11 @@ class AzureFileSystem::Impl {
     // size of the body for a batch request can't exceed 4 MB.
     const int32_t kNumMaxRequestsInBatch = 256;
     options.PageSizeHint = kNumMaxRequestsInBatch;
+    // Only updated (and thus only meaningful) when preserve_dir_marker_blob is true.
+    bool found_dir_marker_blob = false;
     try {
       auto list_response = container_client.ListBlobs(options);
-      if (!missing_dir_ok && list_response.Blobs.empty()) {
+      if (require_dir_to_exist && list_response.Blobs.empty()) {
         return PathNotFound(location);
       }
       for (; list_response.HasPage(); list_response.MoveToNextPage()) {
@@ -1438,20 +1554,44 @@ class AzureFileSystem::Impl {
           continue;
         }
         auto batch = container_client.CreateBatch();
-        std::vector<Storage::DeferredResponse<Blobs::Models::DeleteBlobResult>>
-            deferred_responses;
+        std::vector<std::pair<std::string_view, DeleteBlobResponse>> deferred_responses;
         for (const auto& blob_item : list_response.Blobs) {
-          deferred_responses.push_back(batch.DeleteBlob(blob_item.Name));
+          if (preserve_dir_marker_blob && !found_dir_marker_blob) {
+            const bool is_dir_marker_blob =
+                options.Prefix.HasValue() && blob_item.Name == *options.Prefix;
+            if (is_dir_marker_blob) {
+              // Skip deletion of the existing directory marker blob,
+              // but take note that it exists.
+              found_dir_marker_blob = true;
+              continue;
+            }
+          }
+          deferred_responses.emplace_back(blob_item.Name,
+                                          batch.DeleteBlob(blob_item.Name));
         }
         try {
-          container_client.SubmitBatch(batch);
+          // Before submitting the batch that deletes the directory contents, ensure
+          // the empty directory marker blob exists. Doing this first means that the
+          // directory doesn't "stop existing" for the duration of the batch delete
+          // operation.
+          if (preserve_dir_marker_blob && !found_dir_marker_blob) {
+            // Only create an empty directory marker blob if the directory's
+            // existence is implied by the existence of blobs with names
+            // starting with the directory path.
+            if (!deferred_responses.empty()) {
+              RETURN_NOT_OK(
+                  EnsureEmptyDirExists(container_client, location, operation_name));
+            }
+          }
+          if (!deferred_responses.empty()) {
+            container_client.SubmitBatch(batch);
+          }
         } catch (const Storage::StorageException& exception) {
           return ExceptionToStatus(exception, "Failed to delete blobs in a directory: ",
                                    location.path, ": ", container_client.GetUrl());
         }
         std::vector<std::string> failed_blob_names;
-        for (size_t i = 0; i < deferred_responses.size(); ++i) {
-          const auto& deferred_response = deferred_responses[i];
+        for (auto& [blob_name_view, deferred_response] : deferred_responses) {
           bool success = true;
           try {
             auto delete_result = deferred_response.GetResponse();
@@ -1460,8 +1600,7 @@ class AzureFileSystem::Impl {
             success = false;
           }
           if (!success) {
-            const auto& blob_item = list_response.Blobs[i];
-            failed_blob_names.push_back(blob_item.Name);
+            failed_blob_names.emplace_back(blob_name_view);
           }
         }
         if (!failed_blob_names.empty()) {
@@ -1475,117 +1614,74 @@ class AzureFileSystem::Impl {
           }
         }
       }
+      return Status::OK();
     } catch (const Storage::StorageException& exception) {
       return ExceptionToStatus(exception,
                                "Failed to list blobs in a directory: ", location.path,
                                ": ", container_client.GetUrl());
     }
-    return Status::OK();
   }
 
- public:
-  Status DeleteDir(const AzureLocation& location) {
-    if (location.container.empty()) {
-      return Status::Invalid("DeleteDir requires a non-empty path.");
-    }
-
-    auto adlfs_client = datalake_service_client_->GetFileSystemClient(location.container);
-    ARROW_ASSIGN_OR_RAISE(auto hns_support, HierarchicalNamespaceSupport(adlfs_client));
-    if (hns_support == HNSSupport::kContainerNotFound) {
-      return PathNotFound(location);
-    }
-
-    if (location.path.empty()) {
-      auto container_client =
-          blob_service_client_->GetBlobContainerClient(location.container);
-      try {
-        auto response = container_client.Delete();
-        if (response.Value.Deleted) {
-          return Status::OK();
-        } else {
-          return StatusFromErrorResponse(
-              container_client.GetUrl(), *response.RawResponse,
-              "Failed to delete a container: " + location.container);
-        }
-      } catch (const Storage::StorageException& exception) {
-        return ExceptionToStatus(exception,
-                                 "Failed to delete a container: ", location.container,
-                                 ": ", container_client.GetUrl());
-      }
-    }
-
-    if (hns_support == HNSSupport::kEnabled) {
-      auto directory_client = adlfs_client.GetDirectoryClient(location.path);
-      try {
-        auto response = directory_client.DeleteRecursive();
-        if (response.Value.Deleted) {
-          return Status::OK();
-        } else {
-          return StatusFromErrorResponse(
-              directory_client.GetUrl(), *response.RawResponse,
-              "Failed to delete a directory: " + location.path);
-        }
-      } catch (const Storage::StorageException& exception) {
-        return ExceptionToStatus(exception,
-                                 "Failed to delete a directory: ", location.path, ": ",
-                                 directory_client.GetUrl());
+  /// Recursively delete the directory location.path (used when HNS is enabled).
+  /// \pre location.container and location.path are not empty.
+  Status DeleteDirOnFileSystem(const DataLake::DataLakeFileSystemClient& adlfs_client,
+                               const AzureLocation& location) {
+    DCHECK(!location.container.empty());
+    DCHECK(!location.path.empty());
+    auto directory_client = adlfs_client.GetDirectoryClient(location.path);
+    // TODO: decide whether a missing directory should be reported as an error.
+    try {
+      auto response = directory_client.DeleteRecursive();
+      if (response.Value.Deleted) {
+        return Status::OK();
+      } else {
+        return StatusFromErrorResponse(directory_client.GetUrl(), *response.RawResponse,
+                                       "Failed to delete a directory: " + location.path);
       }
-    } else {
-      return DeleteDirContentsWithoutHierarchicalNamespace(location,
-                                                           /*missing_dir_ok=*/true);
+    } catch (const Storage::StorageException& exception) {
+      return ExceptionToStatus(exception, "Failed to delete a directory: ", location.path,
+                               ": ", directory_client.GetUrl());
     }
   }
 
-  Status DeleteDirContents(const AzureLocation& location, bool missing_dir_ok) {
-    if (location.container.empty()) {
-      return internal::InvalidDeleteDirContents(location.all);
-    }
-
-    auto adlfs_client = datalake_service_client_->GetFileSystemClient(location.container);
-    ARROW_ASSIGN_OR_RAISE(auto hns_support, HierarchicalNamespaceSupport(adlfs_client));
-    if (hns_support == HNSSupport::kContainerNotFound) {
-      return missing_dir_ok ? Status::OK() : PathNotFound(location);
-    }
-
-    if (hns_support == HNSSupport::kEnabled) {
-      auto directory_client = adlfs_client.GetDirectoryClient(location.path);
-      try {
-        auto list_response = directory_client.ListPaths(false);
-        for (; list_response.HasPage(); list_response.MoveToNextPage()) {
-          for (const auto& path : list_response.Paths) {
-            if (path.IsDirectory) {
-              auto sub_directory_client = adlfs_client.GetDirectoryClient(path.Name);
-              try {
-                sub_directory_client.DeleteRecursive();
-              } catch (const Storage::StorageException& exception) {
-                return ExceptionToStatus(
-                    exception, "Failed to delete a sub directory: ", location.container,
-                    kDelimiter, path.Name, ": ", sub_directory_client.GetUrl());
-              }
-            } else {
-              auto sub_file_client = adlfs_client.GetFileClient(path.Name);
-              try {
-                sub_file_client.Delete();
-              } catch (const Storage::StorageException& exception) {
-                return ExceptionToStatus(
-                    exception, "Failed to delete a sub file: ", location.container,
-                    kDelimiter, path.Name, ": ", sub_file_client.GetUrl());
-              }
+  /// Delete everything below location.path. \pre location.container is not empty.
+  Status DeleteDirContentsOnFileSystem(
+      const DataLake::DataLakeFileSystemClient& adlfs_client,
+      const AzureLocation& location, bool missing_dir_ok) {
+    auto directory_client = adlfs_client.GetDirectoryClient(location.path);
+    try {
+      auto list_response = directory_client.ListPaths(/*recursive=*/false);
+      for (; list_response.HasPage(); list_response.MoveToNextPage()) {
+        for (const auto& path : list_response.Paths) {
+          if (path.IsDirectory) {
+            auto sub_directory_client = adlfs_client.GetDirectoryClient(path.Name);
+            try {
+              sub_directory_client.DeleteRecursive();
+            } catch (const Storage::StorageException& exception) {
+              return ExceptionToStatus(
+                  exception, "Failed to delete a sub directory: ", location.container,
+                  kDelimiter, path.Name, ": ", sub_directory_client.GetUrl());
+            }
+          } else {
+            auto sub_file_client = adlfs_client.GetFileClient(path.Name);
+            try {
+              sub_file_client.Delete();
+            } catch (const Storage::StorageException& exception) {
+              return ExceptionToStatus(
+                  exception, "Failed to delete a sub file: ", location.container,
+                  kDelimiter, path.Name, ": ", sub_file_client.GetUrl());
             }
           }
         }
-      } catch (const Storage::StorageException& exception) {
-        if (missing_dir_ok && exception.StatusCode == Http::HttpStatusCode::NotFound) {
-          return Status::OK();
-        } else {
-          return ExceptionToStatus(exception,
-                                   "Failed to delete directory contents: ", location.path,
-                                   ": ", directory_client.GetUrl());
-        }
       }
       return Status::OK();
-    } else {
-      return DeleteDirContentsWithoutHierarchicalNamespace(location, missing_dir_ok);
+    } catch (const Storage::StorageException& exception) {
+      if (missing_dir_ok && exception.StatusCode == Http::HttpStatusCode::NotFound) {
+        return Status::OK();
+      }
+      return ExceptionToStatus(exception,
+                               "Failed to delete directory contents: ", location.path,
+                               ": ", directory_client.GetUrl());
     }
   }
 
@@ -1640,7 +1736,30 @@ bool AzureFileSystem::Equals(const FileSystem& other) const {
 
 Result<FileInfo> AzureFileSystem::GetFileInfo(const std::string& path) {
   ARROW_ASSIGN_OR_RAISE(auto location, AzureLocation::FromString(path));
-  return impl_->GetFileInfo(location);
+  if (location.container.empty()) {
+    DCHECK(location.path.empty());
+    // Root directory of the storage account.
+    return FileInfo{"", FileType::Directory};
+  }
+  if (location.path.empty()) {
+    // We have a container, but no path within the container.
+    // The container itself represents a directory.
+    auto container_client = impl_->GetBlobContainerClient(location.container);
+    return GetContainerPropsAsFileInfo(location, container_client);
+  }
+  // There is a path to search within the container. Check HNS support to proceed.
+  auto adlfs_client = impl_->GetFileSystemClient(location.container);
+  ARROW_ASSIGN_OR_RAISE(auto hns_support,
+                        impl_->HierarchicalNamespaceSupport(adlfs_client));
+  if (hns_support == HNSSupport::kContainerNotFound) {
+    return FileInfo{location.all, FileType::NotFound};
+  }
+  if (hns_support == HNSSupport::kEnabled) {
+    return impl_->GetFileInfo(adlfs_client, location);
+  }
+  DCHECK_EQ(hns_support, HNSSupport::kDisabled);
+  auto container_client = impl_->GetBlobContainerClient(location.container);
+  return impl_->GetFileInfo(container_client, location);
 }
 
 Result<FileInfoVector> AzureFileSystem::GetFileInfo(const FileSelector& select) {
@@ -1654,21 +1773,95 @@ Result<FileInfoVector> AzureFileSystem::GetFileInfo(const FileSelector& select)
 
 Status AzureFileSystem::CreateDir(const std::string& path, bool recursive) {
   ARROW_ASSIGN_OR_RAISE(auto location, AzureLocation::FromString(path));
-  if (recursive) {
-    return impl_->CreateDirRecursive(location);
-  } else {
-    return impl_->CreateDir(location);
+  if (location.container.empty()) {
+    return Status::Invalid("CreateDir requires a non-empty path.");
   }
+
+  auto container_client = impl_->GetBlobContainerClient(location.container);
+  if (location.path.empty()) {
+    // If the path is just the container, the parent (root) trivially exists,
+    // and the CreateDir operation comes down to just creating the container.
+    return CreateContainerIfNotExists(location.container, container_client);
+  }
+
+  auto adlfs_client = impl_->GetFileSystemClient(location.container);
+  ARROW_ASSIGN_OR_RAISE(auto hns_support,
+                        impl_->HierarchicalNamespaceSupport(adlfs_client));
+  if (hns_support == HNSSupport::kContainerNotFound) {
+    if (!recursive) {
+      auto parent = location.parent();
+      return PathNotFound(parent);
+    }
+    RETURN_NOT_OK(CreateContainerIfNotExists(location.container, container_client));
+    // Perform a second check for HNS support after creating the container.
+    ARROW_ASSIGN_OR_RAISE(hns_support, impl_->HierarchicalNamespaceSupport(adlfs_client));
+    if (hns_support == HNSSupport::kContainerNotFound) {
+      // We only get kContainerNotFound here if we cannot read the properties of the
+      // container we just created. This is very unlikely, but possible in a concurrent
+      // system (e.g. the container is deleted right away), so report it as an error.
+      return Status::IOError("Unable to read properties of a newly created container: ",
+                             location.container, ": ", container_client.GetUrl());
+    }
+  }
+  // When HNS support is already cached, HierarchicalNamespaceSupport() is not
+  // expected to check that the container exists, so the container may still be
+  // missing here. CreateDirOnFileSystem and CreateDirOnContainer handle that case
+  // themselves (the container is only created above when support wasn't cached yet).
+  if (hns_support == HNSSupport::kEnabled) {
+    return impl_->CreateDirOnFileSystem(adlfs_client, location, recursive);
+  }
+  DCHECK_EQ(hns_support, HNSSupport::kDisabled);
+  return impl_->CreateDirOnContainer(container_client, location, recursive);
 }
 
 Status AzureFileSystem::DeleteDir(const std::string& path) {
   ARROW_ASSIGN_OR_RAISE(auto location, AzureLocation::FromString(path));
-  return impl_->DeleteDir(location);
+  if (location.container.empty()) {
+    return Status::Invalid("DeleteDir requires a non-empty path.");
+  }
+  if (location.path.empty()) {
+    auto container_client = impl_->GetBlobContainerClient(location.container);
+    return impl_->DeleteContainer(container_client, location);
+  }
+
+  auto adlfs_client = impl_->GetFileSystemClient(location.container);
+  ARROW_ASSIGN_OR_RAISE(auto hns_support,
+                        impl_->HierarchicalNamespaceSupport(adlfs_client));
+  if (hns_support == HNSSupport::kContainerNotFound) {
+    return PathNotFound(location);
+  }
+  if (hns_support == HNSSupport::kEnabled) {
+    return impl_->DeleteDirOnFileSystem(adlfs_client, location);
+  }
+  DCHECK_EQ(hns_support, HNSSupport::kDisabled);
+  auto container_client = impl_->GetBlobContainerClient(location.container);
+  return impl_->DeleteDirContentsOnContainer(container_client, location,
+                                             /*require_dir_to_exist=*/true,
+                                             /*preserve_dir_marker_blob=*/false,
+                                             "DeleteDir");
 }
 
 Status AzureFileSystem::DeleteDirContents(const std::string& path, bool missing_dir_ok) {
   ARROW_ASSIGN_OR_RAISE(auto location, AzureLocation::FromString(path));
-  return impl_->DeleteDirContents(location, missing_dir_ok);
+  if (location.container.empty()) {
+    return internal::InvalidDeleteDirContents(location.all);
+  }
+
+  auto adlfs_client = impl_->GetFileSystemClient(location.container);
+  ARROW_ASSIGN_OR_RAISE(auto hns_support,
+                        impl_->HierarchicalNamespaceSupport(adlfs_client));
+  if (hns_support == HNSSupport::kContainerNotFound) {
+    return missing_dir_ok ? Status::OK() : PathNotFound(location);
+  }
+  if (hns_support == HNSSupport::kEnabled) {
+    return impl_->DeleteDirContentsOnFileSystem(adlfs_client, location, missing_dir_ok);
+  }
+  DCHECK_EQ(hns_support, HNSSupport::kDisabled);
+  auto container_client = impl_->GetBlobContainerClient(location.container);
+  return impl_->DeleteDirContentsOnContainer(container_client, location,
+                                             /*require_dir_to_exist=*/!missing_dir_ok,
+                                             /*preserve_dir_marker_blob=*/true,
+                                             "DeleteDirContents");
 }
 
 Status AzureFileSystem::DeleteRootDirContents() {
diff --git a/cpp/src/arrow/filesystem/azurefs_internal.h b/cpp/src/arrow/filesystem/azurefs_internal.h
index 13d84c9b54..5642e16bcf 100644
--- a/cpp/src/arrow/filesystem/azurefs_internal.h
+++ b/cpp/src/arrow/filesystem/azurefs_internal.h
@@ -71,7 +71,7 @@ enum class HierarchicalNamespaceSupport {
 /// \return kEnabled/kDisabled/kContainerNotFound (kUnknown is never
 /// returned).
 Result<HierarchicalNamespaceSupport> CheckIfHierarchicalNamespaceIsEnabled(
-    Azure::Storage::Files::DataLake::DataLakeFileSystemClient& adlfs_client,
+    const Azure::Storage::Files::DataLake::DataLakeFileSystemClient& adlfs_client,
     const arrow::fs::AzureOptions& options);
 
 }  // namespace internal
diff --git a/cpp/src/arrow/filesystem/azurefs_test.cc b/cpp/src/arrow/filesystem/azurefs_test.cc
index f6af9f722d..ff94578b04 100644
--- a/cpp/src/arrow/filesystem/azurefs_test.cc
+++ b/cpp/src/arrow/filesystem/azurefs_test.cc
@@ -473,6 +473,14 @@ class TestAzureFileSystem : public ::testing::Test {
     return blob_client;
   }
 
+  Blobs::Models::BlobProperties GetBlobProperties(const std::string& container_name,
+                                                  const std::string& blob_name) {
+    return blob_service_client_->GetBlobContainerClient(container_name)
+        .GetBlobClient(blob_name)
+        .GetProperties()
+        .Value;
+  }
+
   void UploadLines(const std::vector<std::string>& lines, const std::string& path,
                    int total_size) {
     ASSERT_OK_AND_ASSIGN(auto output, fs()->OpenOutputStream(path, {}));
@@ -566,86 +574,259 @@ class TestAzureFileSystem : public ::testing::Test {
     return env->WithHierarchicalNamespace();
   }
 
+  constexpr static const char* const kSubmitBatchBugMessage =
+      "This test is affected by an Azurite issue: "
+      "https://github.com/Azure/Azurite/pull/2302";
+
+  /// Azurite has a bug that causes BlobContainerClient::SubmitBatch to fail on macOS.
+  /// SubmitBatch is used by:
+  ///  - AzureFileSystem::DeleteDir
+  ///  - AzureFileSystem::DeleteDirContents
+  bool HasSubmitBatchBug() const {
+#ifdef __APPLE__
+    EXPECT_OK_AND_ASSIGN(auto env, GetAzureEnv());
+    return env->backend() == AzureBackend::kAzurite;
+#else
+    return false;
+#endif
+  }
+
   // Tests that are called from more than one implementation of TestAzureFileSystem
 
   void TestDetectHierarchicalNamespace(bool trip_up_azurite);
   void TestDetectHierarchicalNamespaceOnMissingContainer();
-  void TestGetFileInfoObject();
+
+  void TestGetFileInfoOfRoot() {
+    AssertFileInfo(fs(), "", FileType::Directory);
+
+    // URI
+    ASSERT_RAISES(Invalid, fs()->GetFileInfo("abfs://"));
+  }
+
+  void TestGetFileInfoOnExistingContainer() {
+    auto data = SetUpPreexistingData();
+    AssertFileInfo(fs(), data.container_name, FileType::Directory);
+    AssertFileInfo(fs(), data.container_name + "/", FileType::Directory);
+    auto props = GetBlobProperties(data.container_name, data.kObjectName);
+    AssertFileInfo(fs(), data.ObjectPath(), FileType::File,
+                   std::chrono::system_clock::time_point{props.LastModified},
+                   static_cast<int64_t>(props.BlobSize));
+    AssertFileInfo(fs(), data.NotFoundObjectPath(), FileType::NotFound);
+    AssertFileInfo(fs(), data.ObjectPath() + "/", FileType::NotFound);
+    AssertFileInfo(fs(), data.NotFoundObjectPath() + "/", FileType::NotFound);
+
+    // URIs
+    ASSERT_RAISES(Invalid, fs()->GetFileInfo("abfs://" + data.container_name));
+    ASSERT_RAISES(Invalid, fs()->GetFileInfo("abfs://" + std::string{data.kObjectName}));
+    ASSERT_RAISES(Invalid, fs()->GetFileInfo("abfs://" + data.ObjectPath()));
+  }
+
+  void TestGetFileInfoOnMissingContainer() {
+    auto data = SetUpPreexistingData();
+    AssertFileInfo(fs(), "nonexistent", FileType::NotFound);
+    AssertFileInfo(fs(), "nonexistent/object", FileType::NotFound);
+    AssertFileInfo(fs(), "nonexistent/object/", FileType::NotFound);
+  }
+
   void TestGetFileInfoObjectWithNestedStructure();
 
+  void TestCreateDirOnRoot() {
+    auto dir1 = PreexistingData::RandomContainerName(rng_);
+    auto dir2 = PreexistingData::RandomContainerName(rng_);
+    // At the root of the storage account, CreateDir creates containers.
+    AssertFileInfo(fs(), dir1, FileType::NotFound);
+    ASSERT_OK(fs()->CreateDir(dir1, /*recursive=*/false));
+    AssertFileInfo(fs(), dir1, FileType::Directory);
+
+    AssertFileInfo(fs(), dir2, FileType::NotFound);
+    ASSERT_OK(fs()->CreateDir(dir2, /*recursive=*/true));
+    AssertFileInfo(fs(), dir2, FileType::Directory);
+
+    // Should not fail if the directory already exists.
+    ASSERT_OK(fs()->CreateDir(dir1, /*recursive=*/false));
+    ASSERT_OK(fs()->CreateDir(dir1, /*recursive=*/true));
+    AssertFileInfo(fs(), dir1, FileType::Directory);
+  }
+
+  void TestCreateDirOnExistingContainer() {
+    auto data = SetUpPreexistingData();
+    auto dir1 = data.RandomDirectoryPath(rng_);
+    auto dir2 = data.RandomDirectoryPath(rng_);
+
+    AssertFileInfo(fs(), dir1, FileType::NotFound);
+    ASSERT_OK(fs()->CreateDir(dir1, /*recursive=*/false));
+    AssertFileInfo(fs(), dir1, FileType::Directory);
+
+    AssertFileInfo(fs(), dir2, FileType::NotFound);
+    ASSERT_OK(fs()->CreateDir(dir2, /*recursive=*/true));
+    AssertFileInfo(fs(), dir2, FileType::Directory);
+
+    auto subdir1 = ConcatAbstractPath(dir1, "subdir");
+    auto subdir2 = ConcatAbstractPath(dir2, "subdir");
+    AssertFileInfo(fs(), subdir1, FileType::NotFound);
+    ASSERT_OK(fs()->CreateDir(subdir1, /*recursive=*/false));
+    AssertFileInfo(fs(), subdir1, FileType::Directory);
+    AssertFileInfo(fs(), subdir2, FileType::NotFound);
+    ASSERT_OK(fs()->CreateDir(subdir2, /*recursive=*/true));
+    AssertFileInfo(fs(), subdir2, FileType::Directory);
+
+    auto dir3 = data.RandomDirectoryPath(rng_);
+    AssertFileInfo(fs(), dir3, FileType::NotFound);
+    auto subdir3 = ConcatAbstractPath(dir3, "subdir");
+    AssertFileInfo(fs(), subdir3, FileType::NotFound);
+    // Creating subdir3 with recursive=false should fail.
+    EXPECT_RAISES_WITH_MESSAGE_THAT(
+        IOError, ::testing::HasSubstr("Path does not exist '" + dir3 + "'"),
+        fs()->CreateDir(subdir3, /*recursive=*/false));
+    AssertFileInfo(fs(), dir3, FileType::NotFound);
+    AssertFileInfo(fs(), subdir3, FileType::NotFound);
+    // Creating subdir3 with recursive=true should work.
+    ASSERT_OK(fs()->CreateDir(subdir3, /*recursive=*/true));
+    AssertFileInfo(fs(), dir3, FileType::Directory);
+    AssertFileInfo(fs(), subdir3, FileType::Directory);
+
+    auto dir4 = data.RandomDirectoryPath(rng_);
+    auto subdir4 = ConcatAbstractPath(dir4, "subdir4");
+    auto subdir5 = ConcatAbstractPath(dir4, "subdir4/subdir5");
+    // Creating subdir4 with recursive=false should fail.
+    EXPECT_RAISES_WITH_MESSAGE_THAT(
+        IOError, ::testing::HasSubstr("Path does not exist '" + dir4 + "'"),
+        fs()->CreateDir(subdir4, /*recursive=*/false));
+    AssertFileInfo(fs(), dir4, FileType::NotFound);
+    AssertFileInfo(fs(), subdir4, FileType::NotFound);
+    // Creating subdir5 with recursive=false should fail.
+    EXPECT_RAISES_WITH_MESSAGE_THAT(
+        IOError, ::testing::HasSubstr("Path does not exist '" + subdir4 + "'"),
+        fs()->CreateDir(subdir5, /*recursive=*/false));
+    AssertFileInfo(fs(), dir4, FileType::NotFound);
+    AssertFileInfo(fs(), subdir4, FileType::NotFound);
+    AssertFileInfo(fs(), subdir5, FileType::NotFound);
+    // Creating subdir5 with recursive=true should work.
+    ASSERT_OK(fs()->CreateDir(subdir5, /*recursive=*/true));
+    AssertFileInfo(fs(), dir4, FileType::Directory);
+    AssertFileInfo(fs(), subdir4, FileType::Directory);
+    AssertFileInfo(fs(), subdir5, FileType::Directory);
+  }
+
+  void TestCreateDirOnMissingContainer() {
+    auto container1 = PreexistingData::RandomContainerName(rng_);
+    auto container2 = PreexistingData::RandomContainerName(rng_);
+    AssertFileInfo(fs(), container1, FileType::NotFound);
+    AssertFileInfo(fs(), container2, FileType::NotFound);
+
+    auto dir1 = ConcatAbstractPath(container1, "dir");
+    AssertFileInfo(fs(), dir1, FileType::NotFound);
+    // Creating dir1 with recursive=false should fail.
+    EXPECT_RAISES_WITH_MESSAGE_THAT(
+        IOError, ::testing::HasSubstr("Path does not exist '" + container1 + "'"),
+        fs()->CreateDir(dir1, /*recursive=*/false));
+    AssertFileInfo(fs(), container1, FileType::NotFound);
+    AssertFileInfo(fs(), dir1, FileType::NotFound);
+    // Creating dir1 with recursive=true should work.
+    ASSERT_OK(fs()->CreateDir(dir1, /*recursive=*/true));
+    AssertFileInfo(fs(), container1, FileType::Directory);
+    AssertFileInfo(fs(), dir1, FileType::Directory);
+
+    auto dir2 = ConcatAbstractPath(container2, "dir");
+    auto subdir2 = ConcatAbstractPath(dir2, "subdir2");
+    auto subdir3 = ConcatAbstractPath(dir2, "subdir2/subdir3");
+    // Creating dir2 with recursive=false should fail.
+    EXPECT_RAISES_WITH_MESSAGE_THAT(
+        IOError, ::testing::HasSubstr("Path does not exist '" + container2 + "'"),
+        fs()->CreateDir(dir2, /*recursive=*/false));
+    AssertFileInfo(fs(), container2, FileType::NotFound);
+    AssertFileInfo(fs(), dir2, FileType::NotFound);
+    // Creating subdir2 with recursive=false should fail.
+    EXPECT_RAISES_WITH_MESSAGE_THAT(
+        IOError, ::testing::HasSubstr("Path does not exist '" + dir2 + "'"),
+        fs()->CreateDir(subdir2, /*recursive=*/false));
+    AssertFileInfo(fs(), container2, FileType::NotFound);
+    AssertFileInfo(fs(), dir2, FileType::NotFound);
+    AssertFileInfo(fs(), subdir2, FileType::NotFound);
+    // Creating subdir3 with recursive=false should fail.
+    EXPECT_RAISES_WITH_MESSAGE_THAT(
+        IOError, ::testing::HasSubstr("Path does not exist '" + subdir2 + "'"),
+        fs()->CreateDir(subdir3, /*recursive=*/false));
+    AssertFileInfo(fs(), container2, FileType::NotFound);
+    AssertFileInfo(fs(), dir2, FileType::NotFound);
+    AssertFileInfo(fs(), subdir2, FileType::NotFound);
+    AssertFileInfo(fs(), subdir3, FileType::NotFound);
+    // Creating subdir3 with recursive=true should work.
+    ASSERT_OK(fs()->CreateDir(subdir3, /*recursive=*/true));
+    AssertFileInfo(fs(), container2, FileType::Directory);
+    AssertFileInfo(fs(), dir2, FileType::Directory);
+    AssertFileInfo(fs(), subdir2, FileType::Directory);
+    AssertFileInfo(fs(), subdir3, FileType::Directory);
+  }
+
   void TestDeleteDirSuccessEmpty() {
+    if (HasSubmitBatchBug()) {
+      GTEST_SKIP() << kSubmitBatchBugMessage;
+    }
     auto data = SetUpPreexistingData();
     const auto directory_path = data.RandomDirectoryPath(rng_);
 
-    if (WithHierarchicalNamespace()) {
-      ASSERT_OK(fs()->CreateDir(directory_path, true));
-      AssertFileInfo(fs(), directory_path, FileType::Directory);
-      ASSERT_OK(fs()->DeleteDir(directory_path));
-      AssertFileInfo(fs(), directory_path, FileType::NotFound);
-    } else {
-      // There is only virtual directory without hierarchical namespace
-      // support. So the CreateDir() and DeleteDir() do nothing.
-      ASSERT_OK(fs()->CreateDir(directory_path));
-      AssertFileInfo(fs(), directory_path, FileType::NotFound);
-      ASSERT_OK(fs()->DeleteDir(directory_path));
-      AssertFileInfo(fs(), directory_path, FileType::NotFound);
-    }
+    AssertFileInfo(fs(), directory_path, FileType::NotFound);
+    ASSERT_OK(fs()->CreateDir(directory_path, true));
+    AssertFileInfo(fs(), directory_path, FileType::Directory);
+    ASSERT_OK(fs()->DeleteDir(directory_path));
+    AssertFileInfo(fs(), directory_path, FileType::NotFound);
   }
 
-  void TestCreateDirSuccessContainerAndDirectory() {
+  void TestDeleteDirFailureNonexistent() {
     auto data = SetUpPreexistingData();
     const auto path = data.RandomDirectoryPath(rng_);
-    ASSERT_OK(fs()->CreateDir(path, false));
-    if (WithHierarchicalNamespace()) {
-      AssertFileInfo(fs(), path, FileType::Directory);
-    } else {
-      // There is only virtual directory without hierarchical namespace
-      // support. So the CreateDir() does nothing.
-      AssertFileInfo(fs(), path, FileType::NotFound);
-    }
+    ASSERT_RAISES(IOError, fs()->DeleteDir(path));
   }
 
-  void TestCreateDirRecursiveSuccessContainerOnly() {
-    auto container_name = PreexistingData::RandomContainerName(rng_);
-    ASSERT_OK(fs()->CreateDir(container_name, true));
-    AssertFileInfo(fs(), container_name, FileType::Directory);
+  void TestDeleteDirSuccessHaveBlob() {
+    if (HasSubmitBatchBug()) {
+      GTEST_SKIP() << kSubmitBatchBugMessage;
+    }
+    auto data = SetUpPreexistingData();
+    const auto directory_path = data.RandomDirectoryPath(rng_);
+    const auto blob_path = ConcatAbstractPath(directory_path, "hello.txt");
+    ASSERT_OK_AND_ASSIGN(auto output, fs()->OpenOutputStream(blob_path));
+    ASSERT_OK(output->Write("hello"));
+    ASSERT_OK(output->Close());
+    AssertFileInfo(fs(), blob_path, FileType::File);
+    ASSERT_OK(fs()->DeleteDir(directory_path));
+    AssertFileInfo(fs(), blob_path, FileType::NotFound);
   }
 
-  void TestCreateDirRecursiveSuccessDirectoryOnly() {
+  void TestDeleteDirSuccessHaveDirectory() {
+    if (HasSubmitBatchBug()) {
+      GTEST_SKIP() << kSubmitBatchBugMessage;
+    }
     auto data = SetUpPreexistingData();
     const auto parent = data.RandomDirectoryPath(rng_);
     const auto path = ConcatAbstractPath(parent, "new-sub");
     ASSERT_OK(fs()->CreateDir(path, true));
-    if (WithHierarchicalNamespace()) {
-      AssertFileInfo(fs(), path, FileType::Directory);
-      AssertFileInfo(fs(), parent, FileType::Directory);
-    } else {
-      // There is only virtual directory without hierarchical namespace
-      // support. So the CreateDir() does nothing.
-      AssertFileInfo(fs(), path, FileType::NotFound);
-      AssertFileInfo(fs(), parent, FileType::NotFound);
-    }
+    AssertFileInfo(fs(), path, FileType::Directory);
+    AssertFileInfo(fs(), parent, FileType::Directory);
+    ASSERT_OK(fs()->DeleteDir(parent));
+    AssertFileInfo(fs(), path, FileType::NotFound);
+    AssertFileInfo(fs(), parent, FileType::NotFound);
   }
 
-  void TestCreateDirRecursiveSuccessContainerAndDirectory() {
-    auto data = SetUpPreexistingData();
-    const auto parent = data.RandomDirectoryPath(rng_);
-    const auto path = ConcatAbstractPath(parent, "new-sub");
-    ASSERT_OK(fs()->CreateDir(path, true));
-    if (WithHierarchicalNamespace()) {
-      AssertFileInfo(fs(), path, FileType::Directory);
-      AssertFileInfo(fs(), parent, FileType::Directory);
-      AssertFileInfo(fs(), data.container_name, FileType::Directory);
-    } else {
-      // There is only virtual directory without hierarchical namespace
-      // support. So the CreateDir() does nothing.
-      AssertFileInfo(fs(), path, FileType::NotFound);
-      AssertFileInfo(fs(), parent, FileType::NotFound);
-      AssertFileInfo(fs(), data.container_name, FileType::Directory);
+  void TestDeleteDirContentsSuccessExist() {
+    if (HasSubmitBatchBug()) {
+      GTEST_SKIP() << kSubmitBatchBugMessage;
+    }
+    auto preexisting_data = SetUpPreexistingData();
+    HierarchicalPaths paths;
+    CreateHierarchicalData(&paths);
+    ASSERT_OK(fs()->DeleteDirContents(paths.directory));
+    AssertFileInfo(fs(), paths.directory, FileType::Directory);
+    for (const auto& sub_path : paths.sub_paths) {
+      AssertFileInfo(fs(), sub_path, FileType::NotFound);
     }
   }
 
   void TestDeleteDirContentsSuccessNonexistent() {
+    if (HasSubmitBatchBug()) {
+      GTEST_SKIP() << kSubmitBatchBugMessage;
+    }
     auto data = SetUpPreexistingData();
     const auto directory_path = data.RandomDirectoryPath(rng_);
     ASSERT_OK(fs()->DeleteDirContents(directory_path, true));
@@ -662,7 +843,7 @@ class TestAzureFileSystem : public ::testing::Test {
 void TestAzureFileSystem::TestDetectHierarchicalNamespace(bool trip_up_azurite) {
   EXPECT_OK_AND_ASSIGN(auto env, GetAzureEnv());
   if (trip_up_azurite && env->backend() != AzureBackend::kAzurite) {
-    GTEST_SKIP() << "trip_up_azurite=true is only for Azurite.";
+    return;
   }
 
   auto data = SetUpPreexistingData();
@@ -704,22 +885,6 @@ void TestAzureFileSystem::TestDetectHierarchicalNamespaceOnMissingContainer() {
   }
 }
 
-void TestAzureFileSystem::TestGetFileInfoObject() {
-  auto data = SetUpPreexistingData();
-  auto object_properties =
-      blob_service_client_->GetBlobContainerClient(data.container_name)
-          .GetBlobClient(data.kObjectName)
-          .GetProperties()
-          .Value;
-
-  AssertFileInfo(fs(), data.ObjectPath(), FileType::File,
-                 std::chrono::system_clock::time_point{object_properties.LastModified},
-                 static_cast<int64_t>(object_properties.BlobSize));
-
-  // URI
-  ASSERT_RAISES(Invalid, fs()->GetFileInfo("abfs://" + std::string{data.kObjectName}));
-}
-
 void TestAzureFileSystem::TestGetFileInfoObjectWithNestedStructure() {
   auto data = SetUpPreexistingData();
   // Adds detailed tests to handle cases of different edge cases
@@ -855,6 +1020,16 @@ TYPED_TEST(TestAzureFileSystemOnAllEnvs, DetectHierarchicalNamespaceOnMissingCon
   this->TestDetectHierarchicalNamespaceOnMissingContainer();
 }
 
+TYPED_TEST(TestAzureFileSystemOnAllEnvs, GetFileInfoOfRoot) {
+  this->TestGetFileInfoOfRoot();
+}
+
+TYPED_TEST(TestAzureFileSystemOnAllEnvs, CreateDirWithEmptyPath) {
+  ASSERT_RAISES(Invalid, this->fs()->CreateDir("", false));
+}
+
+TYPED_TEST(TestAzureFileSystemOnAllEnvs, CreateDirOnRoot) { this->TestCreateDirOnRoot(); }
+
 // Tests using all the 3 environments (Azurite, Azure w/o HNS (flat), Azure w/ HNS)
 // combined with the two scenarios for AzureFileSystem::cached_hns_support_ -- unknown and
 // known according to the environment.
@@ -869,105 +1044,56 @@ using AllScenarios = ::testing::Types<
 
 TYPED_TEST_SUITE(TestAzureFileSystemOnAllScenarios, AllScenarios);
 
-TYPED_TEST(TestAzureFileSystemOnAllScenarios, GetFileInfoObject) {
-  this->TestGetFileInfoObject();
+TYPED_TEST(TestAzureFileSystemOnAllScenarios, GetFileInfoOnExistingContainer) {
+  this->TestGetFileInfoOnExistingContainer();
 }
 
-TYPED_TEST(TestAzureFileSystemOnAllScenarios, DeleteDirSuccessEmpty) {
-  this->TestDeleteDirSuccessEmpty();
+TYPED_TEST(TestAzureFileSystemOnAllScenarios, GetFileInfoOnMissingContainer) {
+  this->TestGetFileInfoOnMissingContainer();
 }
 
 TYPED_TEST(TestAzureFileSystemOnAllScenarios, GetFileInfoObjectWithNestedStructure) {
   this->TestGetFileInfoObjectWithNestedStructure();
 }
 
-TYPED_TEST(TestAzureFileSystemOnAllScenarios, CreateDirSuccessContainerAndDirectory) {
-  this->TestCreateDirSuccessContainerAndDirectory();
+TYPED_TEST(TestAzureFileSystemOnAllScenarios, CreateDirOnExistingContainer) {
+  this->TestCreateDirOnExistingContainer();
 }
 
-TYPED_TEST(TestAzureFileSystemOnAllScenarios, CreateDirRecursiveSuccessContainerOnly) {
-  this->TestCreateDirRecursiveSuccessContainerOnly();
+TYPED_TEST(TestAzureFileSystemOnAllScenarios, CreateDirOnMissingContainer) {
+  this->TestCreateDirOnMissingContainer();
 }
 
-TYPED_TEST(TestAzureFileSystemOnAllScenarios, CreateDirRecursiveSuccessDirectoryOnly) {
-  this->TestCreateDirRecursiveSuccessDirectoryOnly();
+TYPED_TEST(TestAzureFileSystemOnAllScenarios, DeleteDirSuccessEmpty) {
+  this->TestDeleteDirSuccessEmpty();
 }
 
-TYPED_TEST(TestAzureFileSystemOnAllScenarios,
-           CreateDirRecursiveSuccessContainerAndDirectory) {
-  this->TestCreateDirRecursiveSuccessContainerAndDirectory();
+TYPED_TEST(TestAzureFileSystemOnAllScenarios, DeleteDirFailureNonexistent) {
+  this->TestDeleteDirFailureNonexistent();  // Also runs on flat (non-HNS) accounts.
 }
 
-// Tests using a real storage account *with Hierarchical Namespace enabled*
-
-TEST_F(TestAzureHierarchicalNSFileSystem, DeleteDirFailureNonexistent) {
-  auto data = SetUpPreexistingData();
-  const auto path = data.RandomDirectoryPath(rng_);
-  ASSERT_RAISES(IOError, fs()->DeleteDir(path));
+TYPED_TEST(TestAzureFileSystemOnAllScenarios, DeleteDirSuccessHaveBlob) {
+  this->TestDeleteDirSuccessHaveBlob();
 }
 
-TEST_F(TestAzureHierarchicalNSFileSystem, DeleteDirSuccessHaveBlob) {
-  auto data = SetUpPreexistingData();
-  const auto directory_path = data.RandomDirectoryPath(rng_);
-  const auto blob_path = ConcatAbstractPath(directory_path, "hello.txt");
-  ASSERT_OK_AND_ASSIGN(auto output, fs()->OpenOutputStream(blob_path));
-  ASSERT_OK(output->Write(std::string_view("hello")));
-  ASSERT_OK(output->Close());
-  AssertFileInfo(fs(), blob_path, FileType::File);
-  ASSERT_OK(fs()->DeleteDir(directory_path));
-  AssertFileInfo(fs(), blob_path, FileType::NotFound);
+TYPED_TEST(TestAzureFileSystemOnAllScenarios, DeleteDirSuccessHaveDirectory) {
+  this->TestDeleteDirSuccessHaveDirectory();
 }
 
-TEST_F(TestAzureHierarchicalNSFileSystem, DeleteDirSuccessHaveDirectory) {
-  auto data = SetUpPreexistingData();
-  const auto parent = data.RandomDirectoryPath(rng_);
-  const auto path = ConcatAbstractPath(parent, "new-sub");
-  ASSERT_OK(fs()->CreateDir(path, true));
-  AssertFileInfo(fs(), path, FileType::Directory);
-  AssertFileInfo(fs(), parent, FileType::Directory);
-  ASSERT_OK(fs()->DeleteDir(parent));
-  AssertFileInfo(fs(), path, FileType::NotFound);
-  AssertFileInfo(fs(), parent, FileType::NotFound);
-}
-
-TEST_F(TestAzureHierarchicalNSFileSystem, DeleteDirContentsSuccessExist) {
-  auto preexisting_data = SetUpPreexistingData();
-  HierarchicalPaths paths;
-  CreateHierarchicalData(&paths);
-  ASSERT_OK(fs()->DeleteDirContents(paths.directory));
-  AssertFileInfo(fs(), paths.directory, FileType::Directory);
-  for (const auto& sub_path : paths.sub_paths) {
-    AssertFileInfo(fs(), sub_path, FileType::NotFound);
-  }
+TYPED_TEST(TestAzureFileSystemOnAllScenarios, DeleteDirContentsSuccessExist) {
+  this->TestDeleteDirContentsSuccessExist();
 }
 
-TEST_F(TestAzureHierarchicalNSFileSystem, DeleteDirContentsSuccessNonexistent) {
+TYPED_TEST(TestAzureFileSystemOnAllScenarios, DeleteDirContentsSuccessNonexistent) {
   this->TestDeleteDirContentsSuccessNonexistent();
 }
 
-TEST_F(TestAzureHierarchicalNSFileSystem, DeleteDirContentsFailureNonexistent) {
+TYPED_TEST(TestAzureFileSystemOnAllScenarios, DeleteDirContentsFailureNonexistent) {
   this->TestDeleteDirContentsFailureNonexistent();
 }
 
 // Tests using Azurite (the local Azure emulator)
 
-TEST_F(TestAzuriteFileSystem, GetFileInfoAccount) {
-  AssertFileInfo(fs(), "", FileType::Directory);
-
-  // URI
-  ASSERT_RAISES(Invalid, fs()->GetFileInfo("abfs://"));
-}
-
-TEST_F(TestAzuriteFileSystem, GetFileInfoContainer) {
-  auto data = SetUpPreexistingData();
-  AssertFileInfo(fs(), data.container_name, FileType::Directory);
-
-  AssertFileInfo(fs(), "nonexistent-container", FileType::NotFound);
-
-  // URI
-  ASSERT_RAISES(Invalid, fs()->GetFileInfo("abfs://" + data.container_name));
-}
-
 TEST_F(TestAzuriteFileSystem, GetFileInfoSelector) {
   SetUpSmallFileSystemTree();
 
@@ -1141,16 +1267,6 @@ TEST_F(TestAzuriteFileSystem, GetFileInfoSelectorExplicitImplicitDirDedup) {
   AssertFileInfo(infos[0], "container/mydir/nonemptydir2/somefile", FileType::File);
 }
 
-TEST_F(TestAzuriteFileSystem, CreateDirFailureNoContainer) {
-  ASSERT_RAISES(Invalid, fs()->CreateDir("", false));
-}
-
-TEST_F(TestAzuriteFileSystem, CreateDirSuccessContainerOnly) {
-  auto container_name = PreexistingData::RandomContainerName(rng_);
-  ASSERT_OK(fs()->CreateDir(container_name, false));
-  AssertFileInfo(fs(), container_name, FileType::Directory);
-}
-
 TEST_F(TestAzuriteFileSystem, CreateDirFailureDirectoryWithMissingContainer) {
   const auto path = std::string("not-a-container/new-directory");
   ASSERT_RAISES(IOError, fs()->CreateDir(path, false));
@@ -1175,19 +1291,20 @@ TEST_F(TestAzuriteFileSystem, DeleteDirSuccessContainer) {
 }
 
-TEST_F(TestAzuriteFileSystem, DeleteDirSuccessNonexistent) {
+TEST_F(TestAzuriteFileSystem, DeleteDirFailureNonexistent) {
+  if (HasSubmitBatchBug()) {
+    GTEST_SKIP() << kSubmitBatchBugMessage;
+  }
   auto data = SetUpPreexistingData();
   const auto directory_path = data.RandomDirectoryPath(rng_);
-  // There is only virtual directory without hierarchical namespace
-  // support. So the DeleteDir() for nonexistent directory does nothing.
-  ASSERT_OK(fs()->DeleteDir(directory_path));
+  // DeleteDir() fails if the directory doesn't exist.
+  ASSERT_RAISES(IOError, fs()->DeleteDir(directory_path));
   AssertFileInfo(fs(), directory_path, FileType::NotFound);
 }
 
 TEST_F(TestAzuriteFileSystem, DeleteDirSuccessHaveBlobs) {
-#ifdef __APPLE__
-  GTEST_SKIP() << "This test fails by an Azurite problem: "
-                  "https://github.com/Azure/Azurite/pull/2302";
-#endif
+  if (HasSubmitBatchBug()) {
+    GTEST_SKIP() << kSubmitBatchBugMessage;
+  }
   auto data = SetUpPreexistingData();
   const auto directory_path = data.RandomDirectoryPath(rng_);
   // We must use 257 or more blobs here to test pagination of ListBlobs().
@@ -1213,10 +1330,9 @@ TEST_F(TestAzuriteFileSystem, DeleteDirUri) {
 }
 
 TEST_F(TestAzuriteFileSystem, DeleteDirContentsSuccessContainer) {
-#ifdef __APPLE__
-  GTEST_SKIP() << "This test fails by an Azurite problem: "
-                  "https://github.com/Azure/Azurite/pull/2302";
-#endif
+  if (HasSubmitBatchBug()) {
+    GTEST_SKIP() << kSubmitBatchBugMessage;
+  }
   auto data = SetUpPreexistingData();
   HierarchicalPaths paths;
   CreateHierarchicalData(&paths);
@@ -1229,16 +1345,14 @@ TEST_F(TestAzuriteFileSystem, DeleteDirContentsSuccessContainer) {
 }
 
 TEST_F(TestAzuriteFileSystem, DeleteDirContentsSuccessDirectory) {
-#ifdef __APPLE__
-  GTEST_SKIP() << "This test fails by an Azurite problem: "
-                  "https://github.com/Azure/Azurite/pull/2302";
-#endif
+  if (HasSubmitBatchBug()) {
+    GTEST_SKIP() << kSubmitBatchBugMessage;
+  }
   auto data = SetUpPreexistingData();
   HierarchicalPaths paths;
   CreateHierarchicalData(&paths);
   ASSERT_OK(fs()->DeleteDirContents(paths.directory));
-  // GH-38772: We may change this to FileType::Directory.
-  AssertFileInfo(fs(), paths.directory, FileType::NotFound);
+  AssertFileInfo(fs(), paths.directory, FileType::Directory);  // Directory is kept.
   for (const auto& sub_path : paths.sub_paths) {
     AssertFileInfo(fs(), sub_path, FileType::NotFound);
   }