Posted to github@arrow.apache.org by "pitrou (via GitHub)" <gi...@apache.org> on 2023/06/26 10:07:41 UTC

[GitHub] [arrow] pitrou commented on a diff in pull request #35440: GH-34213: [C++] Use recursive calls without a delimiter if the user is doing a recursive GetFileInfo

pitrou commented on code in PR #35440:
URL: https://github.com/apache/arrow/pull/35440#discussion_r1241889516


##########
cpp/src/arrow/filesystem/path_util.cc:
##########
@@ -66,6 +67,28 @@ std::vector<std::string> SplitAbstractPath(const std::string& path, char sep) {
   return parts;
 }
 
+std::string SliceAbstractPath(const std::string& s, int offset, int length, char sep) {
+  if (offset < 0 || length < 0) {
+    return "";
+  }
+  std::vector<std::string> components = SplitAbstractPath(s, sep);
+  std::stringstream combined;
+  if (offset >= static_cast<int>(components.size())) {
+    return "";
+  }
+  int end = length;

Review Comment:
   Why not use `length` directly?
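
   For illustration, a hedged sketch of one shape this could take: clamp once against the component count so `length` is used directly, with no separate mutable copy (untested, would need `<algorithm>` for `std::min`):
   ```c++
   std::string SliceAbstractPath(const std::string& s, int offset, int length, char sep) {
     if (offset < 0 || length < 0) {
       return "";
     }
     std::vector<std::string> components = SplitAbstractPath(s, sep);
     if (offset >= static_cast<int>(components.size())) {
       return "";
     }
     // Compute the end index directly from `length`, clamped to the count.
     const int end = std::min(offset + length, static_cast<int>(components.size()));
     std::stringstream combined;
     for (int i = offset; i < end; ++i) {
       combined << components[i];
       if (i < end - 1) combined << sep;
     }
     return combined.str();
   }
   ```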



##########
cpp/src/arrow/filesystem/path_util.h:
##########
@@ -38,9 +38,17 @@ constexpr char kSep = '/';
 ARROW_EXPORT
 std::vector<std::string> SplitAbstractPath(const std::string& path, char sep = kSep);
 
-// Return the extension of the file
+// Slice the individual components of an abstract path and combine them
+//
+// If offset or length are negative then an empty string is returned
+// If offset is >= the number of components then an empty string is returned
+// If offset + length is >= the number of components then length is truncated
 ARROW_EXPORT
-std::string GetAbstractPathExtension(const std::string& s);
+std::string SliceAbstractPath(const std::string& path, int offset, int length,

Review Comment:
   Can you add basic tests for this in `filesystem_test.cc`?
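
   A few illustrative cases matching the contract documented above (hypothetical test; names and expectations are a reading of the doc comment, not the PR's actual tests):
   ```c++
   TEST(AbstractPath, SliceAbstractPath) {
     // Slice of middle components
     ASSERT_EQ(SliceAbstractPath("a/b/c/d", 1, 2), "b/c");
     // length past the end is truncated
     ASSERT_EQ(SliceAbstractPath("a/b/c", 1, 10), "b/c");
     // offset at or past the number of components yields ""
     ASSERT_EQ(SliceAbstractPath("a/b/c", 3, 1), "");
     // negative offset or length yields ""
     ASSERT_EQ(SliceAbstractPath("a/b/c", -1, 2), "");
     ASSERT_EQ(SliceAbstractPath("a/b/c", 0, -1), "");
   }
   ```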



##########
cpp/src/arrow/util/async_util_test.cc:
##########
@@ -204,6 +204,29 @@ TEST(AsyncTaskScheduler, InitialTaskFails) {
   ASSERT_FINISHES_AND_RAISES(Invalid, finished);
 }
 
+TEST(AsyncTaskScheduler, TaskDestroyedBeforeSchedulerEnds) {
+  bool my_task_destroyed = false;

Review Comment:
   Should it be an atomic?
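
   For reference, the pattern if the flag could be written from another thread while this thread reads it; a minimal self-contained sketch (assumes the write may actually race with a read):
   ```c++
   #include <atomic>
   #include <thread>

   int main() {
     std::atomic<bool> destroyed{false};
     std::thread worker([&] { destroyed.store(true); });
     // With a plain bool, a read here concurrent with the store would be a
     // data race; std::atomic makes it well-defined.
     bool seen = destroyed.load();
     worker.join();
     return seen ? 0 : 1;
   }
   ```
   If the flag is only set and read on the same thread (or synchronized elsewhere, e.g. by a join), a plain bool is enough.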



##########
cpp/src/arrow/filesystem/s3fs.cc:
##########
@@ -1870,195 +1731,301 @@ class S3FileSystem::Impl : public std::enable_shared_from_this<S3FileSystem::Imp
         "ListObjectsV2", outcome.GetError());
   }
 
-  Status CheckNestingDepth(int32_t nesting_depth) {
-    if (nesting_depth >= kMaxNestingDepth) {
-      return Status::IOError("S3 filesystem tree exceeds maximum nesting depth (",
-                             kMaxNestingDepth, ")");
+  static FileInfo MakeDirectoryInfo(std::string dirname) {

Review Comment:
   Either take a const-ref, or move the value below?
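
   The second option as a sketch, keeping pass-by-value but moving into the result:
   ```c++
   static FileInfo MakeDirectoryInfo(std::string dirname) {
     FileInfo dir;
     dir.set_type(FileType::Directory);
     dir.set_path(std::move(dirname));  // move instead of copying the string
     return dir;
   }
   ```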



##########
cpp/src/arrow/filesystem/s3fs.cc:
##########
@@ -2116,28 +2083,86 @@ class S3FileSystem::Impl : public std::enable_shared_from_this<S3FileSystem::Imp
     return DeleteObjectsAsync(bucket, keys).status();
   }
 
+  Future<> EnsureNotFileAsync(const std::string& bucket, const std::string& key) {
+    if (key.empty()) {
+      // There is no way for a bucket to be a file
+      return Future<>::MakeFinished();

Review Comment:
   But this will also run the future continuation on this thread rather than on the IO thread pool?
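
   To illustrate the concern (hedged): a callback attached to an already-finished future runs synchronously on the attaching thread, so nothing transfers the continuation to the IO pool:
   ```c++
   auto fut = Future<>::MakeFinished();
   fut.AddCallback([](const Status&) {
     // Runs immediately, on the thread that called AddCallback,
     // not on the IO thread pool.
   });
   ```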



##########
cpp/src/arrow/filesystem/s3fs.cc:
##########
@@ -2285,6 +2301,11 @@ Result<FileInfo> S3FileSystem::GetFileInfo(const std::string& s) {
 
     auto outcome = impl_->client_->HeadObject(req);
     if (outcome.IsSuccess()) {
+      bool ends_in_slash = s[s.size() - 1] == '/';
+      if (outcome.GetResult().GetContentLength() == 0 && ends_in_slash) {
+        info.set_type(FileType::Directory);
+        return info;
+      }

Review Comment:
   Why is this change necessary?
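
   For context, a guess at the convention being handled: some S3 tools mark an empty "directory" with a zero-byte object whose key ends in '/', so a successful HeadObject does not by itself imply a regular file. `LooksLikeDirectoryMarker` below is a hypothetical helper mirroring the check, not part of the PR:
   ```c++
   bool LooksLikeDirectoryMarker(const std::string& key, int64_t content_length) {
     return content_length == 0 && !key.empty() && key.back() == '/';
   }
   ```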



##########
cpp/src/arrow/filesystem/s3fs.cc:
##########
@@ -2344,42 +2352,28 @@ FileInfoGenerator S3FileSystem::GetFileInfoGenerator(const FileSelector& select)
   }
   auto base_path = *std::move(maybe_base_path);
 
-  if (base_path.empty()) {
-    // List all buckets, then possibly recurse
-    PushGenerator<AsyncGenerator<FileInfoVector>> gen;
-    auto producer = gen.producer();
-
-    auto fut = impl_->ListBucketsAsync(io_context());
-    auto impl = impl_->shared_from_this();
-    fut.AddCallback(
-        [producer, select, impl](const Result<std::vector<std::string>>& res) mutable {
-          if (!res.ok()) {
-            producer.Push(res.status());
-            producer.Close();
-            return;
-          }
-          FileInfoVector buckets;
-          for (const auto& bucket : *res) {
-            buckets.push_back(FileInfo{bucket, FileType::Directory});
-          }
-          // Generate all bucket infos
-          auto buckets_fut = Future<FileInfoVector>::MakeFinished(std::move(buckets));
-          producer.Push(MakeSingleFutureGenerator(buckets_fut));
-          if (select.recursive) {
-            // Generate recursive walk for each bucket in turn
-            for (const auto& bucket : *buckets_fut.result()) {
-              producer.Push(impl->WalkAsync(select, bucket.path(), ""));
-            }
-          }
-          producer.Close();
-        });
+  PushGenerator<std::vector<FileInfo>> generator;
+  auto start_listing = [&, sink = generator.producer()](
+                           util::AsyncTaskScheduler* scheduler) {
+    if (base_path.empty()) {
+      bool should_recurse = select.recursive && select.max_recursion > 0;
+      impl_->FullListAsync(/*include_virtual=*/true, scheduler, sink, io_context(),
+                           should_recurse);
+    } else {
+      impl_->ListAsync(select, base_path.bucket, base_path.key,
+                       /*include_virtual=*/true, scheduler, sink, /*close_sink=*/false);
+    }
+    return Status::OK();
+  };
 
-    return MakeConcatenatedGenerator(
-        AsyncGenerator<AsyncGenerator<FileInfoVector>>{std::move(gen)});
-  }
+  Future<> all_done_fut = util::AsyncTaskScheduler::Make(
+      std::move(start_listing), [](const Status&) {}, StopToken::Unstoppable());

Review Comment:
   ```suggestion
     Future<> all_done_fut = util::AsyncTaskScheduler::Make(
         std::move(start_listing), /*abort_callback=*/ [](const Status&) {}, StopToken::Unstoppable());
   ```



##########
cpp/src/arrow/filesystem/s3fs.cc:
##########
@@ -1870,195 +1731,301 @@ class S3FileSystem::Impl : public std::enable_shared_from_this<S3FileSystem::Imp
         "ListObjectsV2", outcome.GetError());
   }
 
-  Status CheckNestingDepth(int32_t nesting_depth) {
-    if (nesting_depth >= kMaxNestingDepth) {
-      return Status::IOError("S3 filesystem tree exceeds maximum nesting depth (",
-                             kMaxNestingDepth, ")");
+  static FileInfo MakeDirectoryInfo(std::string dirname) {
+    FileInfo dir;
+    dir.set_type(FileType::Directory);
+    dir.set_path(dirname);
+    return dir;
+  }
+
+  static std::vector<FileInfo> MakeDirectoryInfos(std::vector<std::string> dirnames) {
+    std::vector<FileInfo> dir_infos;
+    for (auto& dirname : dirnames) {
+      dir_infos.push_back(MakeDirectoryInfo(std::move(dirname)));
     }
-    return Status::OK();
+    return dir_infos;
   }
 
-  // A helper class for Walk and WalkAsync
-  struct FileInfoCollector {
-    FileInfoCollector(std::string bucket, std::string key, const FileSelector& select)
-        : bucket(std::move(bucket)),
-          key(std::move(key)),
-          allow_not_found(select.allow_not_found) {}
+  using FileInfoSink = PushGenerator<std::vector<FileInfo>>::Producer;
+
+  struct FileListerState {
+    FileInfoSink files_queue;
+    bool allow_not_found;
+    int max_recursion;
+    bool include_virtual;
+    S3Model::ListObjectsV2Request req;
+    io::IOContext io_context;
+    std::shared_ptr<Aws::S3::S3Client> client;
+    bool close_sink;
+    bool no_files_means_not_found;
+    std::unordered_set<std::string> directories;
+    bool empty = true;
+
+    FileListerState(PushGenerator<std::vector<FileInfo>>::Producer files_queue,
+                    FileSelector select, const std::string& bucket,
+                    const std::string& key, bool include_virtual,
+                    io::IOContext io_context, std::shared_ptr<Aws::S3::S3Client> client,
+                    bool close_sink)
+        : files_queue(std::move(files_queue)),
+          allow_not_found(select.allow_not_found),
+          max_recursion(select.max_recursion),
+          include_virtual(include_virtual),
+          io_context(io_context),
+          client(std::move(client)),
+          close_sink(close_sink),
+          no_files_means_not_found(!select.allow_not_found && !key.empty()) {
+      req.SetBucket(bucket);
+      req.SetMaxKeys(kListObjectsMaxKeys);
+      if (!key.empty()) {
+        req.SetPrefix(key + kSep);
+      }
+      if (!select.recursive) {
+        req.SetDelimiter(Aws::String() + kSep);
+      }
+    }
+
+    // The FileListerState is kept alive by the various FileListerTasks.  Once all the
+    // tasks are finished this will be destroyed and we can run some cleanup
+    ~FileListerState() {
+      // * If the bucket doesn't exist we will have already gotten an error from the
+      //     ListObjectsV2 call
+      // * If the key is empty, and the bucket exists, then there is
+      //     no way we can hit "not found"
+      // * If they key is not empty, then it's possible
+      //     that the file itself didn't exist and there
+      //     were not files under it.  In that case we will hit this if statement and
+      //     should treat this as a "not found" case.
+      if (empty && no_files_means_not_found) {
+        files_queue.Push(PathNotFound(req.GetBucket(), req.GetPrefix()));
+      }
+      if (close_sink) {
+        files_queue.Close();
+      }
+    }
 
-    Status Collect(const std::string& prefix, const S3Model::ListObjectsV2Result& result,
-                   std::vector<FileInfo>* out) {
-      // Walk "directories"
+    std::vector<std::string> GetNewDirectories(const std::string_view& path) {
+      std::string current(path);
+      std::string base = req.GetBucket();
+      if (!req.GetPrefix().empty()) {
+        base = base + kSep + std::string(internal::RemoveTrailingSlash(req.GetPrefix()));
+      }
+      std::vector<std::string> new_directories;
+      while (true) {
+        auto parent_base = internal::GetAbstractPathParent(current);
+        if (parent_base.first.empty()) {
+          break;
+        }
+        const std::string& parent_dir = parent_base.first;
+        current = parent_dir;
+        if (current == base) {
+          break;
+        }
+        if (directories.insert(parent_dir).second) {
+          new_directories.push_back(std::move(parent_base.first));
+        }
+      }
+      return new_directories;
+    }
+  };
+
+  struct FileListerTask : public util::AsyncTaskScheduler::Task {
+    std::shared_ptr<FileListerState> state;
+    util::AsyncTaskScheduler* scheduler;
+
+    FileListerTask(std::shared_ptr<FileListerState> state,
+                   util::AsyncTaskScheduler* scheduler)
+        : state(state), scheduler(scheduler) {}
+
+    std::vector<FileInfo> ToFileInfos(const std::string& bucket,
+                                      const std::string& prefix,
+                                      const S3Model::ListObjectsV2Result& result) {
+      std::vector<FileInfo> file_infos;
+      // If this is a non-recursive listing we may see "common prefixes" which represent
+      // directories we did not recurse into.  We will add those as directories.
       for (const auto& child_prefix : result.GetCommonPrefixes()) {
-        is_empty = false;
         const auto child_key =
             internal::RemoveTrailingSlash(FromAwsString(child_prefix.GetPrefix()));
-        std::stringstream child_path;
-        child_path << bucket << kSep << child_key;
+        std::stringstream child_path_ss;
+        child_path_ss << bucket << kSep << child_key;
         FileInfo info;
-        info.set_path(child_path.str());
+        info.set_path(child_path_ss.str());
         info.set_type(FileType::Directory);
-        out->push_back(std::move(info));
+        file_infos.push_back(std::move(info));
       }
-      // Walk "files"
+      // S3 doesn't have any concept of "max depth" and so we emulate it by counting the
+      // number of '/' characters.  E.g. if the user is searching bucket/subdirA/subdirB
+      // then the starting depth is 2.
+      // A file subdirA/subdirB/somefile will have a child depth of 2 and a "depth" of 0.
+      // A file subdirA/subdirB/subdirC/somefile will have a child depth of 3 and a
+      //   "depth" of 1
+      int base_depth =
+          (prefix.empty())
+              ? 0
+              : static_cast<int>(std::count(prefix.begin(), prefix.end(), kSep));
       for (const auto& obj : result.GetContents()) {
-        is_empty = false;
-        FileInfo info;
-        const auto child_key = internal::RemoveTrailingSlash(FromAwsString(obj.GetKey()));
-        if (child_key == std::string_view(prefix)) {
-          // Amazon can return the "directory" key itself as part of the results, skip
+        if (obj.GetKey() == prefix) {
+          // S3 will return the basedir itself (if it is a file / empty file).  We don't
+          // want that.  But this is still considered "finding the basedir" and so we mark
+          // it "not empty".
+          state->empty = false;
           continue;
         }
-        std::stringstream child_path;
-        child_path << bucket << kSep << child_key;
-        info.set_path(child_path.str());
-        FileObjectToInfo(obj, &info);
-        out->push_back(std::move(info));
-      }
-      return Status::OK();
-    }
+        std::string child_key =
+            std::string(internal::RemoveTrailingSlash(FromAwsString(obj.GetKey())));
+        bool had_trailing_slash = child_key.size() != obj.GetKey().size();
+        int child_depth =
+            static_cast<int>(std::count(child_key.begin(), child_key.end(), kSep));
+        int depth = child_depth - base_depth;
+
+        if (depth > state->max_recursion) {
+          // If we have A/B/C/D and max_recursion is 2 then we ignore this (don't add it
+          // to file_infos) but we still want to potentially add A and A/B as directories.
+          // So we "pretend" like we have a file A/B/C for the call to GetNewDirectories
+          // below
+          int to_trim = depth - state->max_recursion - 1;
+          if (to_trim > 0) {
+            child_key = bucket + kSep +
+                        internal::SliceAbstractPath(child_key, 0, child_depth - to_trim);
+          } else {
+            child_key = bucket + kSep + child_key;
+          }
+        } else {
+          // If the file isn't beyond our max recursion then count it as a file
+          // unless it's empty and then it depends on whether or not the file ends
+          // with a trailing slash
+          std::stringstream child_path_ss;
+          child_path_ss << bucket << kSep << child_key;
+          child_key = child_path_ss.str();
+          if (obj.GetSize() > 0 || !had_trailing_slash) {
+            // We found a real file
+            FileInfo info;
+            info.set_path(child_key);
+            FileObjectToInfo(obj, &info);
+            file_infos.push_back(std::move(info));
+          } else {
+            // We found an empty file and we want to treat it like a directory.  Only
+            // add it if we haven't seen this directory before.
+            if (state->directories.insert(child_key).second) {
+              file_infos.push_back(MakeDirectoryInfo(child_key));
+            }
+          }
+        }
 
-    Status Finish(Impl* impl) {
-      // If no contents were found, perhaps it's an empty "directory",
-      // or perhaps it's a nonexistent entry.  Check.
-      if (is_empty && !allow_not_found) {
-        ARROW_ASSIGN_OR_RAISE(bool is_actually_empty,
-                              impl->IsEmptyDirectory(bucket, key));
-        if (!is_actually_empty) {
-          return PathNotFound(bucket, key);
+        if (state->include_virtual) {
+          // Now that we've dealt with the file itself we need to look at each of the
+          // parent paths and potentially add them as directories.  For example, after
+          // finding a file A/B/C/D we want to consider adding directories A, A/B, and
+          // A/B/C.
+          for (const auto& newdir : state->GetNewDirectories(child_key)) {
+            file_infos.push_back(MakeDirectoryInfo(newdir));
+          }
         }
       }
-      return Status::OK();
+      if (file_infos.size() > 0) {
+        state->empty = false;
+      }
+      return file_infos;
     }
 
-    std::string bucket;
-    std::string key;
-    bool allow_not_found;
-    bool is_empty = true;
-  };
-
-  // Workhorse for GetFileInfo(FileSelector...)
-  Status Walk(const FileSelector& select, const std::string& bucket,
-              const std::string& key, std::vector<FileInfo>* out) {
-    FileInfoCollector collector(bucket, key, select);
-
-    auto handle_error = [&](const AWSError<S3Errors>& error) -> Status {
-      if (select.allow_not_found && IsNotFound(error)) {
-        return Status::OK();
+    void Run() {
+      // We are on an I/O thread now so just synchronously make the call and interpret the
+      // results.
+      S3Model::ListObjectsV2Outcome outcome = state->client->ListObjectsV2(state->req);
+      if (!outcome.IsSuccess()) {
+        const auto& err = outcome.GetError();
+        if (state->allow_not_found && IsNotFound(err)) {
+          return;
+        }
+        state->files_queue.Push(
+            ErrorToStatus(std::forward_as_tuple("When listing objects under key '",
+                                                state->req.GetPrefix(), "' in bucket '",
+                                                state->req.GetBucket(), "': "),
+                          "ListObjectsV2", err));
+        return;
+      }
+      const S3Model::ListObjectsV2Result& result = outcome.GetResult();
+      // We could immediately schedule the continuation (if there are enough results to
+      // trigger paging) but that would introduce race condition complexity for arguably
+      // little benefit.
+      std::vector<FileInfo> file_infos =
+          ToFileInfos(state->req.GetBucket(), state->req.GetPrefix(), result);
+      if (file_infos.size() > 0) {
+        state->files_queue.Push(std::move(file_infos));
       }
-      return ErrorToStatus(std::forward_as_tuple("When listing objects under key '", key,
-                                                 "' in bucket '", bucket, "': "),
-                           "ListObjectsV2", error);
-    };
 
-    auto handle_recursion = [&](int32_t nesting_depth) -> Result<bool> {
-      RETURN_NOT_OK(CheckNestingDepth(nesting_depth));
-      return select.recursive && nesting_depth <= select.max_recursion;
-    };
+      // If there are enough files to warrant a continuation then go ahead and schedule
+      // that now.
+      if (result.GetIsTruncated()) {
+        DCHECK(!result.GetNextContinuationToken().empty());
+        state->req.SetContinuationToken(result.GetNextContinuationToken());
+        scheduler->AddTask(std::make_unique<FileListerTask>(state, scheduler));
+      }
+    }
 
-    auto handle_results = [&](const std::string& prefix,
-                              const S3Model::ListObjectsV2Result& result) -> Status {
-      return collector.Collect(prefix, result, out);
-    };
+    Result<Future<>> operator()() override {
+      return state->io_context.executor()->Submit([this] {
+        Run();
+        return Status::OK();
+      });
+    }
+    std::string_view name() const override { return "S3ListFiles"; }
+  };
 
-    RETURN_NOT_OK(TreeWalker::Walk(client_, io_context_, bucket, key, kListObjectsMaxKeys,
-                                   handle_results, handle_error, handle_recursion));
+  void ListAsync(const FileSelector& select, const std::string& bucket,
+                 const std::string& key, bool include_virtual,

Review Comment:
   What is "include_virtual"?
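
   Reading the hunk above, `include_virtual` seems to control whether inferred parent "directories" are emitted alongside the files themselves. A self-contained illustration of that inference (`InferParentDirs` is a hypothetical helper, not from the PR):
   ```c++
   #include <iostream>
   #include <string>
   #include <vector>

   // S3 keys have no real directories; parents are inferred from '/' prefixes.
   std::vector<std::string> InferParentDirs(const std::string& key) {
     std::vector<std::string> parents;
     for (size_t pos = key.find('/'); pos != std::string::npos;
          pos = key.find('/', pos + 1)) {
       parents.push_back(key.substr(0, pos));
     }
     return parents;
   }

   int main() {
     for (const auto& dir : InferParentDirs("A/B/C/D")) {
       std::cout << dir << "\n";  // prints A, A/B, A/B/C
     }
     return 0;
   }
   ```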



##########
cpp/src/arrow/filesystem/s3fs.cc:
##########
@@ -1870,195 +1731,301 @@ class S3FileSystem::Impl : public std::enable_shared_from_this<S3FileSystem::Imp
         "ListObjectsV2", outcome.GetError());
   }
 
-  Status CheckNestingDepth(int32_t nesting_depth) {
-    if (nesting_depth >= kMaxNestingDepth) {
-      return Status::IOError("S3 filesystem tree exceeds maximum nesting depth (",
-                             kMaxNestingDepth, ")");
+  static FileInfo MakeDirectoryInfo(std::string dirname) {
+    FileInfo dir;
+    dir.set_type(FileType::Directory);
+    dir.set_path(dirname);
+    return dir;
+  }
+
+  static std::vector<FileInfo> MakeDirectoryInfos(std::vector<std::string> dirnames) {
+    std::vector<FileInfo> dir_infos;
+    for (auto& dirname : dirnames) {
+      dir_infos.push_back(MakeDirectoryInfo(std::move(dirname)));
     }
-    return Status::OK();
+    return dir_infos;
   }
 
-  // A helper class for Walk and WalkAsync
-  struct FileInfoCollector {
-    FileInfoCollector(std::string bucket, std::string key, const FileSelector& select)
-        : bucket(std::move(bucket)),
-          key(std::move(key)),
-          allow_not_found(select.allow_not_found) {}
+  using FileInfoSink = PushGenerator<std::vector<FileInfo>>::Producer;
+
+  struct FileListerState {
+    FileInfoSink files_queue;
+    bool allow_not_found;
+    int max_recursion;
+    bool include_virtual;
+    S3Model::ListObjectsV2Request req;
+    io::IOContext io_context;
+    std::shared_ptr<Aws::S3::S3Client> client;
+    bool close_sink;
+    bool no_files_means_not_found;
+    std::unordered_set<std::string> directories;
+    bool empty = true;
+
+    FileListerState(PushGenerator<std::vector<FileInfo>>::Producer files_queue,
+                    FileSelector select, const std::string& bucket,
+                    const std::string& key, bool include_virtual,
+                    io::IOContext io_context, std::shared_ptr<Aws::S3::S3Client> client,
+                    bool close_sink)
+        : files_queue(std::move(files_queue)),
+          allow_not_found(select.allow_not_found),
+          max_recursion(select.max_recursion),
+          include_virtual(include_virtual),
+          io_context(io_context),
+          client(std::move(client)),
+          close_sink(close_sink),
+          no_files_means_not_found(!select.allow_not_found && !key.empty()) {
+      req.SetBucket(bucket);
+      req.SetMaxKeys(kListObjectsMaxKeys);
+      if (!key.empty()) {
+        req.SetPrefix(key + kSep);
+      }
+      if (!select.recursive) {
+        req.SetDelimiter(Aws::String() + kSep);
+      }
+    }
+
+    // The FileListerState is kept alive by the various FileListerTasks.  Once all the
+    // tasks are finished this will be destroyed and we can run some cleanup
+    ~FileListerState() {
+      // * If the bucket doesn't exist we will have already gotten an error from the
+      //     ListObjectsV2 call
+      // * If the key is empty, and the bucket exists, then there is
+      //     no way we can hit "not found"
+      // * If they key is not empty, then it's possible

Review Comment:
   ```suggestion
         // * If the key is not empty, then it's possible
   ```



##########
cpp/src/arrow/filesystem/s3fs.cc:
##########
@@ -1870,195 +1731,301 @@ class S3FileSystem::Impl : public std::enable_shared_from_this<S3FileSystem::Imp
         "ListObjectsV2", outcome.GetError());
   }
 
-  Status CheckNestingDepth(int32_t nesting_depth) {
-    if (nesting_depth >= kMaxNestingDepth) {
-      return Status::IOError("S3 filesystem tree exceeds maximum nesting depth (",
-                             kMaxNestingDepth, ")");
+  static FileInfo MakeDirectoryInfo(std::string dirname) {
+    FileInfo dir;
+    dir.set_type(FileType::Directory);
+    dir.set_path(dirname);
+    return dir;
+  }
+
+  static std::vector<FileInfo> MakeDirectoryInfos(std::vector<std::string> dirnames) {
+    std::vector<FileInfo> dir_infos;
+    for (auto& dirname : dirnames) {
+      dir_infos.push_back(MakeDirectoryInfo(std::move(dirname)));
     }
-    return Status::OK();
+    return dir_infos;
   }
 
-  // A helper class for Walk and WalkAsync
-  struct FileInfoCollector {
-    FileInfoCollector(std::string bucket, std::string key, const FileSelector& select)
-        : bucket(std::move(bucket)),
-          key(std::move(key)),
-          allow_not_found(select.allow_not_found) {}
+  using FileInfoSink = PushGenerator<std::vector<FileInfo>>::Producer;
+
+  struct FileListerState {
+    FileInfoSink files_queue;
+    bool allow_not_found;

Review Comment:
   Can we make most of these members `const` to distinguish between settings and mutable state?
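
   A sketch of that split (the request object and the bookkeeping members must stay mutable):
   ```c++
   struct FileListerState {
     // Settings: fixed at construction
     FileInfoSink files_queue;
     const bool allow_not_found;
     const int max_recursion;
     const bool include_virtual;
     const io::IOContext io_context;
     const std::shared_ptr<Aws::S3::S3Client> client;
     const bool close_sink;
     const bool no_files_means_not_found;
     // Mutable state: updated as the listing progresses
     S3Model::ListObjectsV2Request req;  // continuation token set between pages
     std::unordered_set<std::string> directories;
     bool empty = true;
   };
   ```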



##########
cpp/src/arrow/filesystem/s3fs.cc:
##########
@@ -1870,195 +1731,301 @@ class S3FileSystem::Impl : public std::enable_shared_from_this<S3FileSystem::Imp
         "ListObjectsV2", outcome.GetError());
   }
 
-  Status CheckNestingDepth(int32_t nesting_depth) {
-    if (nesting_depth >= kMaxNestingDepth) {
-      return Status::IOError("S3 filesystem tree exceeds maximum nesting depth (",
-                             kMaxNestingDepth, ")");
+  static FileInfo MakeDirectoryInfo(std::string dirname) {
+    FileInfo dir;
+    dir.set_type(FileType::Directory);
+    dir.set_path(dirname);
+    return dir;
+  }
+
+  static std::vector<FileInfo> MakeDirectoryInfos(std::vector<std::string> dirnames) {
+    std::vector<FileInfo> dir_infos;
+    for (auto& dirname : dirnames) {
+      dir_infos.push_back(MakeDirectoryInfo(std::move(dirname)));
     }
-    return Status::OK();
+    return dir_infos;
   }
 
-  // A helper class for Walk and WalkAsync
-  struct FileInfoCollector {
-    FileInfoCollector(std::string bucket, std::string key, const FileSelector& select)
-        : bucket(std::move(bucket)),
-          key(std::move(key)),
-          allow_not_found(select.allow_not_found) {}
+  using FileInfoSink = PushGenerator<std::vector<FileInfo>>::Producer;
+
+  struct FileListerState {
+    FileInfoSink files_queue;
+    bool allow_not_found;
+    int max_recursion;
+    bool include_virtual;
+    S3Model::ListObjectsV2Request req;
+    io::IOContext io_context;
+    std::shared_ptr<Aws::S3::S3Client> client;
+    bool close_sink;
+    bool no_files_means_not_found;
+    std::unordered_set<std::string> directories;
+    bool empty = true;
+
+    FileListerState(PushGenerator<std::vector<FileInfo>>::Producer files_queue,
+                    FileSelector select, const std::string& bucket,
+                    const std::string& key, bool include_virtual,
+                    io::IOContext io_context, std::shared_ptr<Aws::S3::S3Client> client,
+                    bool close_sink)
+        : files_queue(std::move(files_queue)),
+          allow_not_found(select.allow_not_found),
+          max_recursion(select.max_recursion),
+          include_virtual(include_virtual),
+          io_context(io_context),
+          client(std::move(client)),
+          close_sink(close_sink),
+          no_files_means_not_found(!select.allow_not_found && !key.empty()) {
+      req.SetBucket(bucket);
+      req.SetMaxKeys(kListObjectsMaxKeys);
+      if (!key.empty()) {
+        req.SetPrefix(key + kSep);
+      }
+      if (!select.recursive) {
+        req.SetDelimiter(Aws::String() + kSep);
+      }
+    }
+
+    // The FileListerState is kept alive by the various FileListerTasks.  Once all the
+    // tasks are finished this will be destroyed and we can run some cleanup
+    ~FileListerState() {
+      // * If the bucket doesn't exist we will have already gotten an error from the
+      //     ListObjectsV2 call
+      // * If the key is empty, and the bucket exists, then there is
+      //     no way we can hit "not found"
+      // * If they key is not empty, then it's possible
+      //     that the file itself didn't exist and there
+      //     were not files under it.  In that case we will hit this if statement and
+      //     should treat this as a "not found" case.
+      if (empty && no_files_means_not_found) {
+        files_queue.Push(PathNotFound(req.GetBucket(), req.GetPrefix()));
+      }
+      if (close_sink) {
+        files_queue.Close();

Review Comment:
   Does this mean we're in the top-level task? Why not do it from the task continuation?
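
   A hedged sketch of that alternative, closing the sink from the scheduler's completion future rather than from this destructor:
   ```c++
   Future<> all_done = util::AsyncTaskScheduler::Make(
       std::move(start_listing), [](const Status&) {}, StopToken::Unstoppable());
   all_done.AddCallback([sink](const Status& st) mutable {
     if (!st.ok()) {
       sink.Push(st);
     }
     sink.Close();
   });
   ```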



##########
cpp/src/arrow/filesystem/s3fs.cc:
##########
@@ -1870,195 +1731,301 @@ class S3FileSystem::Impl : public std::enable_shared_from_this<S3FileSystem::Imp
         "ListObjectsV2", outcome.GetError());
   }
 
-  Status CheckNestingDepth(int32_t nesting_depth) {
-    if (nesting_depth >= kMaxNestingDepth) {
-      return Status::IOError("S3 filesystem tree exceeds maximum nesting depth (",
-                             kMaxNestingDepth, ")");
+  static FileInfo MakeDirectoryInfo(std::string dirname) {
+    FileInfo dir;
+    dir.set_type(FileType::Directory);
+    dir.set_path(dirname);
+    return dir;
+  }
+
+  static std::vector<FileInfo> MakeDirectoryInfos(std::vector<std::string> dirnames) {
+    std::vector<FileInfo> dir_infos;
+    for (auto& dirname : dirnames) {
+      dir_infos.push_back(MakeDirectoryInfo(std::move(dirname)));
     }
-    return Status::OK();
+    return dir_infos;
   }
 
-  // A helper class for Walk and WalkAsync
-  struct FileInfoCollector {
-    FileInfoCollector(std::string bucket, std::string key, const FileSelector& select)
-        : bucket(std::move(bucket)),
-          key(std::move(key)),
-          allow_not_found(select.allow_not_found) {}
+  using FileInfoSink = PushGenerator<std::vector<FileInfo>>::Producer;
+
+  struct FileListerState {
+    FileInfoSink files_queue;
+    bool allow_not_found;
+    int max_recursion;
+    bool include_virtual;
+    S3Model::ListObjectsV2Request req;
+    io::IOContext io_context;
+    std::shared_ptr<Aws::S3::S3Client> client;
+    bool close_sink;
+    bool no_files_means_not_found;
+    std::unordered_set<std::string> directories;
+    bool empty = true;
+
+    FileListerState(PushGenerator<std::vector<FileInfo>>::Producer files_queue,
+                    FileSelector select, const std::string& bucket,
+                    const std::string& key, bool include_virtual,
+                    io::IOContext io_context, std::shared_ptr<Aws::S3::S3Client> client,
+                    bool close_sink)
+        : files_queue(std::move(files_queue)),
+          allow_not_found(select.allow_not_found),
+          max_recursion(select.max_recursion),
+          include_virtual(include_virtual),
+          io_context(io_context),
+          client(std::move(client)),
+          close_sink(close_sink),
+          no_files_means_not_found(!select.allow_not_found && !key.empty()) {
+      req.SetBucket(bucket);
+      req.SetMaxKeys(kListObjectsMaxKeys);
+      if (!key.empty()) {
+        req.SetPrefix(key + kSep);
+      }
+      if (!select.recursive) {
+        req.SetDelimiter(Aws::String() + kSep);
+      }
+    }
+
+    // The FileListerState is kept alive by the various FileListerTasks.  Once all the
+    // tasks are finished this will be destroyed and we can run some cleanup
+    ~FileListerState() {
+      // * If the bucket doesn't exist we will have already gotten an error from the
+      //     ListObjectsV2 call
+      // * If the key is empty, and the bucket exists, then there is
+      //     no way we can hit "not found"
+      // * If they key is not empty, then it's possible
+      //     that the file itself didn't exist and there
+      //     were not files under it.  In that case we will hit this if statement and

Review Comment:
   What does "the file itself didn't exist and there were not files under it" mean?



##########
cpp/src/arrow/filesystem/s3fs.cc:
##########
@@ -1870,195 +1731,301 @@ class S3FileSystem::Impl : public std::enable_shared_from_this<S3FileSystem::Imp
         "ListObjectsV2", outcome.GetError());
   }
 
-  Status CheckNestingDepth(int32_t nesting_depth) {
-    if (nesting_depth >= kMaxNestingDepth) {
-      return Status::IOError("S3 filesystem tree exceeds maximum nesting depth (",
-                             kMaxNestingDepth, ")");
+  static FileInfo MakeDirectoryInfo(std::string dirname) {
+    FileInfo dir;
+    dir.set_type(FileType::Directory);
+    dir.set_path(dirname);
+    return dir;
+  }
+
+  static std::vector<FileInfo> MakeDirectoryInfos(std::vector<std::string> dirnames) {
+    std::vector<FileInfo> dir_infos;
+    for (auto& dirname : dirnames) {
+      dir_infos.push_back(MakeDirectoryInfo(std::move(dirname)));
     }
-    return Status::OK();
+    return dir_infos;
   }
 
-  // A helper class for Walk and WalkAsync
-  struct FileInfoCollector {
-    FileInfoCollector(std::string bucket, std::string key, const FileSelector& select)
-        : bucket(std::move(bucket)),
-          key(std::move(key)),
-          allow_not_found(select.allow_not_found) {}
+  using FileInfoSink = PushGenerator<std::vector<FileInfo>>::Producer;
+
+  struct FileListerState {
+    FileInfoSink files_queue;
+    bool allow_not_found;
+    int max_recursion;
+    bool include_virtual;
+    S3Model::ListObjectsV2Request req;
+    io::IOContext io_context;
+    std::shared_ptr<Aws::S3::S3Client> client;
+    bool close_sink;
+    bool no_files_means_not_found;
+    std::unordered_set<std::string> directories;
+    bool empty = true;
+
+    FileListerState(PushGenerator<std::vector<FileInfo>>::Producer files_queue,
+                    FileSelector select, const std::string& bucket,
+                    const std::string& key, bool include_virtual,
+                    io::IOContext io_context, std::shared_ptr<Aws::S3::S3Client> client,
+                    bool close_sink)
+        : files_queue(std::move(files_queue)),
+          allow_not_found(select.allow_not_found),
+          max_recursion(select.max_recursion),
+          include_virtual(include_virtual),
+          io_context(io_context),
+          client(std::move(client)),
+          close_sink(close_sink),
+          no_files_means_not_found(!select.allow_not_found && !key.empty()) {
+      req.SetBucket(bucket);
+      req.SetMaxKeys(kListObjectsMaxKeys);
+      if (!key.empty()) {
+        req.SetPrefix(key + kSep);
+      }
+      if (!select.recursive) {
+        req.SetDelimiter(Aws::String() + kSep);
+      }
+    }
+
+    // The FileListerState is kept alive by the various FileListerTasks.  Once all the
+    // tasks are finished this will be destroyed and we can run some cleanup
+    ~FileListerState() {
+      // * If the bucket doesn't exist we will have already gotten an error from the
+      //     ListObjectsV2 call
+      // * If the key is empty, and the bucket exists, then there is
+      //     no way we can hit "not found"
+      // * If they key is not empty, then it's possible
+      //     that the file itself didn't exist and there
+      //     were not files under it.  In that case we will hit this if statement and
+      //     should treat this as a "not found" case.
+      if (empty && no_files_means_not_found) {
+        files_queue.Push(PathNotFound(req.GetBucket(), req.GetPrefix()));
+      }
+      if (close_sink) {
+        files_queue.Close();
+      }
+    }
 
-    Status Collect(const std::string& prefix, const S3Model::ListObjectsV2Result& result,
-                   std::vector<FileInfo>* out) {
-      // Walk "directories"
+    std::vector<std::string> GetNewDirectories(const std::string_view& path) {
+      std::string current(path);
+      std::string base = req.GetBucket();
+      if (!req.GetPrefix().empty()) {
+        base = base + kSep + std::string(internal::RemoveTrailingSlash(req.GetPrefix()));
+      }
+      std::vector<std::string> new_directories;
+      while (true) {
+        auto parent_base = internal::GetAbstractPathParent(current);
+        if (parent_base.first.empty()) {
+          break;
+        }
+        const std::string& parent_dir = parent_base.first;

Review Comment:
   Why not do this above?
   ```c++
   const auto parent_dir = internal::GetAbstractPathParent(current).first;
   ```
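
   Folding that in, the loop body might read (sketch, same semantics):
   ```c++
   std::vector<std::string> new_directories;
   while (true) {
     std::string parent_dir = internal::GetAbstractPathParent(current).first;
     if (parent_dir.empty() || parent_dir == base) {
       break;
     }
     current = parent_dir;
     if (directories.insert(parent_dir).second) {
       new_directories.push_back(std::move(parent_dir));
     }
   }
   return new_directories;
   ```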



##########
cpp/src/arrow/filesystem/s3fs.cc:
##########
@@ -1870,195 +1731,301 @@ class S3FileSystem::Impl : public std::enable_shared_from_this<S3FileSystem::Imp
         "ListObjectsV2", outcome.GetError());
   }
 
-  Status CheckNestingDepth(int32_t nesting_depth) {
-    if (nesting_depth >= kMaxNestingDepth) {
-      return Status::IOError("S3 filesystem tree exceeds maximum nesting depth (",
-                             kMaxNestingDepth, ")");
+  static FileInfo MakeDirectoryInfo(std::string dirname) {
+    FileInfo dir;
+    dir.set_type(FileType::Directory);
+    dir.set_path(dirname);
+    return dir;
+  }
+
+  static std::vector<FileInfo> MakeDirectoryInfos(std::vector<std::string> dirnames) {
+    std::vector<FileInfo> dir_infos;
+    for (auto& dirname : dirnames) {
+      dir_infos.push_back(MakeDirectoryInfo(std::move(dirname)));
     }
-    return Status::OK();
+    return dir_infos;
   }
 
-  // A helper class for Walk and WalkAsync
-  struct FileInfoCollector {
-    FileInfoCollector(std::string bucket, std::string key, const FileSelector& select)
-        : bucket(std::move(bucket)),
-          key(std::move(key)),
-          allow_not_found(select.allow_not_found) {}
+  using FileInfoSink = PushGenerator<std::vector<FileInfo>>::Producer;
+
+  struct FileListerState {
+    FileInfoSink files_queue;
+    bool allow_not_found;
+    int max_recursion;
+    bool include_virtual;
+    S3Model::ListObjectsV2Request req;
+    io::IOContext io_context;
+    std::shared_ptr<Aws::S3::S3Client> client;
+    bool close_sink;
+    bool no_files_means_not_found;
+    std::unordered_set<std::string> directories;
+    bool empty = true;
+
+    FileListerState(PushGenerator<std::vector<FileInfo>>::Producer files_queue,
+                    FileSelector select, const std::string& bucket,
+                    const std::string& key, bool include_virtual,
+                    io::IOContext io_context, std::shared_ptr<Aws::S3::S3Client> client,
+                    bool close_sink)
+        : files_queue(std::move(files_queue)),
+          allow_not_found(select.allow_not_found),
+          max_recursion(select.max_recursion),
+          include_virtual(include_virtual),
+          io_context(io_context),
+          client(std::move(client)),
+          close_sink(close_sink),
+          no_files_means_not_found(!select.allow_not_found && !key.empty()) {
+      req.SetBucket(bucket);
+      req.SetMaxKeys(kListObjectsMaxKeys);
+      if (!key.empty()) {
+        req.SetPrefix(key + kSep);
+      }
+      if (!select.recursive) {
+        req.SetDelimiter(Aws::String() + kSep);
+      }
+    }
+
+    // The FileListerState is kept alive by the various FileListerTasks.  Once all the
+    // tasks are finished this will be destroyed and we can run some cleanup
+    ~FileListerState() {
+      // * If the bucket doesn't exist we will have already gotten an error from the
+      //     ListObjectsV2 call
+      // * If the key is empty, and the bucket exists, then there is
+      //     no way we can hit "not found"
+      // * If they key is not empty, then it's possible
+      //     that the file itself didn't exist and there
+      //     were not files under it.  In that case we will hit this if statement and
+      //     should treat this as a "not found" case.
+      if (empty && no_files_means_not_found) {
+        files_queue.Push(PathNotFound(req.GetBucket(), req.GetPrefix()));
+      }
+      if (close_sink) {
+        files_queue.Close();
+      }
+    }
 
-    Status Collect(const std::string& prefix, const S3Model::ListObjectsV2Result& result,
-                   std::vector<FileInfo>* out) {
-      // Walk "directories"
+    std::vector<std::string> GetNewDirectories(const std::string_view& path) {
+      std::string current(path);
+      std::string base = req.GetBucket();
+      if (!req.GetPrefix().empty()) {
+        base = base + kSep + std::string(internal::RemoveTrailingSlash(req.GetPrefix()));
+      }
+      std::vector<std::string> new_directories;
+      while (true) {
+        auto parent_base = internal::GetAbstractPathParent(current);
+        if (parent_base.first.empty()) {
+          break;
+        }
+        const std::string& parent_dir = parent_base.first;
+        current = parent_dir;
+        if (current == base) {
+          break;
+        }
+        if (directories.insert(parent_dir).second) {
+          new_directories.push_back(std::move(parent_base.first));
+        }
+      }
+      return new_directories;
+    }
+  };
+
+  struct FileListerTask : public util::AsyncTaskScheduler::Task {
+    std::shared_ptr<FileListerState> state;
+    util::AsyncTaskScheduler* scheduler;
+
+    FileListerTask(std::shared_ptr<FileListerState> state,
+                   util::AsyncTaskScheduler* scheduler)
+        : state(state), scheduler(scheduler) {}
+
+    std::vector<FileInfo> ToFileInfos(const std::string& bucket,
+                                      const std::string& prefix,
+                                      const S3Model::ListObjectsV2Result& result) {
+      std::vector<FileInfo> file_infos;
+      // If this is a non-recursive listing we may see "common prefixes" which represent
+      // directories we did not recurse into.  We will add those as directories.
       for (const auto& child_prefix : result.GetCommonPrefixes()) {
-        is_empty = false;
         const auto child_key =
             internal::RemoveTrailingSlash(FromAwsString(child_prefix.GetPrefix()));
-        std::stringstream child_path;
-        child_path << bucket << kSep << child_key;
+        std::stringstream child_path_ss;
+        child_path_ss << bucket << kSep << child_key;
         FileInfo info;
-        info.set_path(child_path.str());
+        info.set_path(child_path_ss.str());
         info.set_type(FileType::Directory);
-        out->push_back(std::move(info));
+        file_infos.push_back(std::move(info));
       }
-      // Walk "files"
+      // S3 doesn't have any concept of "max depth" and so we emulate it by counting the
+      // number of '/' characters.  E.g. if the user is searching bucket/subdirA/subdirB
+      // then the starting depth is 2.
+      // A file subdirA/subdirB/somefile will have a child depth of 2 and a "depth" of 0.
+      // A file subdirA/subdirB/subdirC/somefile will have a child depth of 3 and a
+      //   "depth" of 1
+      int base_depth =
+          (prefix.empty())
+              ? 0
+              : static_cast<int>(std::count(prefix.begin(), prefix.end(), kSep));
       for (const auto& obj : result.GetContents()) {
-        is_empty = false;
-        FileInfo info;
-        const auto child_key = internal::RemoveTrailingSlash(FromAwsString(obj.GetKey()));
-        if (child_key == std::string_view(prefix)) {
-          // Amazon can return the "directory" key itself as part of the results, skip
+        if (obj.GetKey() == prefix) {
+          // S3 will return the basedir itself (if it is a file / empty file).  We don't
+          // want that.  But this is still considered "finding the basedir" and so we mark
+          // it "not empty".
+          state->empty = false;
           continue;
         }
-        std::stringstream child_path;
-        child_path << bucket << kSep << child_key;
-        info.set_path(child_path.str());
-        FileObjectToInfo(obj, &info);
-        out->push_back(std::move(info));
-      }
-      return Status::OK();
-    }
+        std::string child_key =
+            std::string(internal::RemoveTrailingSlash(FromAwsString(obj.GetKey())));
+        bool had_trailing_slash = child_key.size() != obj.GetKey().size();
+        int child_depth =
+            static_cast<int>(std::count(child_key.begin(), child_key.end(), kSep));
+        int depth = child_depth - base_depth;
+
+        if (depth > state->max_recursion) {
+          // If we have A/B/C/D and max_recursion is 2 then we ignore this (don't add it
+          // to file_infos) but we still want to potentially add A and A/B as directories.
+          // So we "pretend" like we have a file A/B/C for the call to GetNewDirectories
+          // below
+          int to_trim = depth - state->max_recursion - 1;
+          if (to_trim > 0) {
+            child_key = bucket + kSep +
+                        internal::SliceAbstractPath(child_key, 0, child_depth - to_trim);
+          } else {
+            child_key = bucket + kSep + child_key;
+          }
+        } else {
+          // If the file isn't beyond our max recursion then count it as a file
+          // unless it's empty and then it depends on whether or not the file ends
+          // with a trailing slash
+          std::stringstream child_path_ss;
+          child_path_ss << bucket << kSep << child_key;
+          child_key = child_path_ss.str();
+          if (obj.GetSize() > 0 || !had_trailing_slash) {
+            // We found a real file
+            FileInfo info;
+            info.set_path(child_key);
+            FileObjectToInfo(obj, &info);
+            file_infos.push_back(std::move(info));
+          } else {
+            // We found an empty file and we want to treat it like a directory.  Only
+            // add it if we haven't seen this directory before.
+            if (state->directories.insert(child_key).second) {
+              file_infos.push_back(MakeDirectoryInfo(child_key));
+            }
+          }
+        }
 
-    Status Finish(Impl* impl) {
-      // If no contents were found, perhaps it's an empty "directory",
-      // or perhaps it's a nonexistent entry.  Check.
-      if (is_empty && !allow_not_found) {
-        ARROW_ASSIGN_OR_RAISE(bool is_actually_empty,
-                              impl->IsEmptyDirectory(bucket, key));
-        if (!is_actually_empty) {
-          return PathNotFound(bucket, key);
+        if (state->include_virtual) {
+          // Now that we've dealt with the file itself we need to look at each of the
+          // parent paths and potentially add them as directories.  For example, after
+          // finding a file A/B/C/D we want to consider adding directories A, A/B, and
+          // A/B/C.
+          for (const auto& newdir : state->GetNewDirectories(child_key)) {
+            file_infos.push_back(MakeDirectoryInfo(newdir));
+          }
         }
       }
-      return Status::OK();
+      if (file_infos.size() > 0) {
+        state->empty = false;
+      }
+      return file_infos;
     }
 
-    std::string bucket;
-    std::string key;
-    bool allow_not_found;
-    bool is_empty = true;
-  };
-
-  // Workhorse for GetFileInfo(FileSelector...)
-  Status Walk(const FileSelector& select, const std::string& bucket,
-              const std::string& key, std::vector<FileInfo>* out) {
-    FileInfoCollector collector(bucket, key, select);
-
-    auto handle_error = [&](const AWSError<S3Errors>& error) -> Status {
-      if (select.allow_not_found && IsNotFound(error)) {
-        return Status::OK();
+    void Run() {
+      // We are on an I/O thread now so just synchronously make the call and interpret the
+      // results.
+      S3Model::ListObjectsV2Outcome outcome = state->client->ListObjectsV2(state->req);
+      if (!outcome.IsSuccess()) {
+        const auto& err = outcome.GetError();
+        if (state->allow_not_found && IsNotFound(err)) {
+          return;
+        }
+        state->files_queue.Push(
+            ErrorToStatus(std::forward_as_tuple("When listing objects under key '",
+                                                state->req.GetPrefix(), "' in bucket '",
+                                                state->req.GetBucket(), "': "),
+                          "ListObjectsV2", err));
+        return;
+      }
+      const S3Model::ListObjectsV2Result& result = outcome.GetResult();
+      // We could immediately schedule the continuation (if there are enough results to
+      // trigger paging) but that would introduce race condition complexity for arguably
+      // little benefit.
+      std::vector<FileInfo> file_infos =
+          ToFileInfos(state->req.GetBucket(), state->req.GetPrefix(), result);
+      if (file_infos.size() > 0) {
+        state->files_queue.Push(std::move(file_infos));
       }
-      return ErrorToStatus(std::forward_as_tuple("When listing objects under key '", key,
-                                                 "' in bucket '", bucket, "': "),
-                           "ListObjectsV2", error);
-    };
 
-    auto handle_recursion = [&](int32_t nesting_depth) -> Result<bool> {
-      RETURN_NOT_OK(CheckNestingDepth(nesting_depth));
-      return select.recursive && nesting_depth <= select.max_recursion;
-    };
+      // If there are enough files to warrant a continuation then go ahead and schedule
+      // that now.
+      if (result.GetIsTruncated()) {
+        DCHECK(!result.GetNextContinuationToken().empty());
+        state->req.SetContinuationToken(result.GetNextContinuationToken());
+        scheduler->AddTask(std::make_unique<FileListerTask>(state, scheduler));
+      }
+    }
 
-    auto handle_results = [&](const std::string& prefix,
-                              const S3Model::ListObjectsV2Result& result) -> Status {
-      return collector.Collect(prefix, result, out);
-    };
+    Result<Future<>> operator()() override {
+      return state->io_context.executor()->Submit([this] {
+        Run();
+        return Status::OK();
+      });
+    }
+    std::string_view name() const override { return "S3ListFiles"; }
+  };
 
-    RETURN_NOT_OK(TreeWalker::Walk(client_, io_context_, bucket, key, kListObjectsMaxKeys,
-                                   handle_results, handle_error, handle_recursion));
+  void ListAsync(const FileSelector& select, const std::string& bucket,
+                 const std::string& key, bool include_virtual,
+                 util::AsyncTaskScheduler* scheduler, FileInfoSink sink,
+                 bool close_sink) {
+    // We can only fetch kListObjectsMaxKeys files at a time and so we create a
+    // scheduler and schedule a task to grab the first batch.  Once that's done we
+    // schedule a new task for the next batch.  All of these tasks share the same
+    // FileListerState object but none of these tasks run in parallel so there is
+    // no need to worry about mutexes
+    auto state = std::make_shared<FileListerState>(
+        sink, select, bucket, key, include_virtual, io_context_, client_, close_sink);
 
-    // If no contents were found, perhaps it's an empty "directory",
-    // or perhaps it's a nonexistent entry.  Check.
-    RETURN_NOT_OK(collector.Finish(this));
-    // Sort results for convenience, since they can come massively out of order
-    std::sort(out->begin(), out->end(), FileInfo::ByPath{});
-    return Status::OK();
+    // Create the first file lister task (it may spawn more)
+    auto file_lister_task = std::make_unique<FileListerTask>(state, scheduler);
+    scheduler->AddTask(std::move(file_lister_task));
   }
 
-  // Workhorse for GetFileInfoGenerator(FileSelector...)
-  FileInfoGenerator WalkAsync(const FileSelector& select, const std::string& bucket,
-                              const std::string& key) {
-    PushGenerator<std::vector<FileInfo>> gen;
-    auto producer = gen.producer();
-    auto collector = std::make_shared<FileInfoCollector>(bucket, key, select);
+  Future<std::vector<std::string>> ListBucketsAsync(io::IOContext ctx) {
     auto self = shared_from_this();
-
-    auto handle_error = [select, bucket, key](const AWSError<S3Errors>& error) -> Status {
-      if (select.allow_not_found && IsNotFound(error)) {
-        return Status::OK();
-      }
-      return ErrorToStatus(std::forward_as_tuple("When listing objects under key '", key,
-                                                 "' in bucket '", bucket, "': "),
-                           "ListObjectsV2", error);
-    };
-
-    auto handle_recursion = [producer, select,
-                             self](int32_t nesting_depth) -> Result<bool> {
-      if (producer.is_closed()) {
-        return false;
-      }
-      RETURN_NOT_OK(self->CheckNestingDepth(nesting_depth));
-      return select.recursive && nesting_depth <= select.max_recursion;
-    };
-
-    auto handle_results =
-        [collector, producer](
-            const std::string& prefix,
-            const S3Model::ListObjectsV2Result& result) mutable -> Status {
-      std::vector<FileInfo> out;
-      RETURN_NOT_OK(collector->Collect(prefix, result, &out));
-      if (!out.empty()) {
-        producer.Push(std::move(out));
-      }
-      return Status::OK();
-    };
-
-    TreeWalker::WalkAsync(client_, io_context_, bucket, key, kListObjectsMaxKeys,
-                          handle_results, handle_error, handle_recursion)
-        .AddCallback([collector, producer, self](const Status& status) mutable {
-          auto st = collector->Finish(self.get());
-          if (!st.ok()) {
-            producer.Push(st);
-          }
-          producer.Close();
+    return DeferNotOk(SubmitIO(ctx, [self]() { return self->client_->ListBuckets(); }))
+        // TODO(ARROW-12655) Change to Then(Impl::ProcessListBuckets)
+        .Then([](const Aws::S3::Model::ListBucketsOutcome& outcome) {
+          return Impl::ProcessListBuckets(outcome);
         });
-    return gen;
   }
 
-  struct WalkResult {
-    std::vector<std::string> file_keys;
-    std::vector<std::string> dir_keys;
-  };
-  Future<std::shared_ptr<WalkResult>> WalkForDeleteDirAsync(const std::string& bucket,
-                                                            const std::string& key) {
-    auto state = std::make_shared<WalkResult>();
-
-    auto handle_results = [state](const std::string& prefix,
-                                  const S3Model::ListObjectsV2Result& result) -> Status {
-      // Walk "files"
-      state->file_keys.reserve(state->file_keys.size() + result.GetContents().size());
-      for (const auto& obj : result.GetContents()) {
-        state->file_keys.emplace_back(FromAwsString(obj.GetKey()));
-      }
-      // Walk "directories"
-      state->dir_keys.reserve(state->dir_keys.size() + result.GetCommonPrefixes().size());
-      for (const auto& prefix : result.GetCommonPrefixes()) {
-        state->dir_keys.emplace_back(FromAwsString(prefix.GetPrefix()));
-      }
-      return Status::OK();
-    };
-
-    auto handle_error = [=](const AWSError<S3Errors>& error) -> Status {
-      return ErrorToStatus(std::forward_as_tuple("When listing objects under key '", key,
-                                                 "' in bucket '", bucket, "': "),
-                           "ListObjectsV2", error);
-    };
-
+  // Fully list all files from all buckets
+  void FullListAsync(bool include_virtual, util::AsyncTaskScheduler* scheduler,
+                     FileInfoSink sink, io::IOContext io_context, bool recursive) {
     auto self = shared_from_this();
-    auto handle_recursion = [self](int32_t nesting_depth) -> Result<bool> {
-      RETURN_NOT_OK(self->CheckNestingDepth(nesting_depth));
-      return true;  // Recurse
-    };
-
-    return TreeWalker::WalkAsync(client_, io_context_, bucket, key, kListObjectsMaxKeys,
-                                 handle_results, handle_error, handle_recursion)
-        .Then([state]() { return state; });
+    scheduler->AddSimpleTask(
+        [self, scheduler, sink, io_context, include_virtual, recursive]() mutable {
+          return self->ListBucketsAsync(io_context)
+              .Then([self, scheduler, sink, include_virtual,
+                     recursive](const std::vector<std::string>& buckets) mutable {
+                // Return the buckets themselves as directories
+                std::vector<FileInfo> buckets_as_directories =
+                    MakeDirectoryInfos(buckets);
+                sink.Push(std::move(buckets_as_directories));
+
+                if (recursive) {
+                  // Recursively list each bucket (these will run in parallel but out_gen

Review Comment:
   ```suggestion
                     // Recursively list each bucket (these will run in parallel but sink
   ```



##########
cpp/src/arrow/filesystem/s3fs.cc:
##########
@@ -1870,195 +1731,301 @@ class S3FileSystem::Impl : public std::enable_shared_from_this<S3FileSystem::Imp
         "ListObjectsV2", outcome.GetError());
   }
 
-  Status CheckNestingDepth(int32_t nesting_depth) {
-    if (nesting_depth >= kMaxNestingDepth) {
-      return Status::IOError("S3 filesystem tree exceeds maximum nesting depth (",
-                             kMaxNestingDepth, ")");
+  static FileInfo MakeDirectoryInfo(std::string dirname) {
+    FileInfo dir;
+    dir.set_type(FileType::Directory);
+    dir.set_path(dirname);
+    return dir;
+  }
+
+  static std::vector<FileInfo> MakeDirectoryInfos(std::vector<std::string> dirnames) {
+    std::vector<FileInfo> dir_infos;
+    for (auto& dirname : dirnames) {
+      dir_infos.push_back(MakeDirectoryInfo(std::move(dirname)));
     }
-    return Status::OK();
+    return dir_infos;
   }
 
-  // A helper class for Walk and WalkAsync
-  struct FileInfoCollector {
-    FileInfoCollector(std::string bucket, std::string key, const FileSelector& select)
-        : bucket(std::move(bucket)),
-          key(std::move(key)),
-          allow_not_found(select.allow_not_found) {}
+  using FileInfoSink = PushGenerator<std::vector<FileInfo>>::Producer;
+
+  struct FileListerState {
+    FileInfoSink files_queue;
+    bool allow_not_found;
+    int max_recursion;
+    bool include_virtual;

Review Comment:
   What does "virtual" mean here?



##########
cpp/src/arrow/filesystem/s3fs.cc:
##########
@@ -1870,195 +1731,301 @@ class S3FileSystem::Impl : public std::enable_shared_from_this<S3FileSystem::Imp
         "ListObjectsV2", outcome.GetError());
   }
 
-  Status CheckNestingDepth(int32_t nesting_depth) {
-    if (nesting_depth >= kMaxNestingDepth) {
-      return Status::IOError("S3 filesystem tree exceeds maximum nesting depth (",
-                             kMaxNestingDepth, ")");
+  static FileInfo MakeDirectoryInfo(std::string dirname) {
+    FileInfo dir;
+    dir.set_type(FileType::Directory);
+    dir.set_path(dirname);
+    return dir;
+  }
+
+  static std::vector<FileInfo> MakeDirectoryInfos(std::vector<std::string> dirnames) {
+    std::vector<FileInfo> dir_infos;
+    for (auto& dirname : dirnames) {
+      dir_infos.push_back(MakeDirectoryInfo(std::move(dirname)));
     }
-    return Status::OK();
+    return dir_infos;
   }
 
-  // A helper class for Walk and WalkAsync
-  struct FileInfoCollector {
-    FileInfoCollector(std::string bucket, std::string key, const FileSelector& select)
-        : bucket(std::move(bucket)),
-          key(std::move(key)),
-          allow_not_found(select.allow_not_found) {}
+  using FileInfoSink = PushGenerator<std::vector<FileInfo>>::Producer;
+
+  struct FileListerState {
+    FileInfoSink files_queue;
+    bool allow_not_found;
+    int max_recursion;
+    bool include_virtual;
+    S3Model::ListObjectsV2Request req;
+    io::IOContext io_context;
+    std::shared_ptr<Aws::S3::S3Client> client;
+    bool close_sink;
+    bool no_files_means_not_found;
+    std::unordered_set<std::string> directories;
+    bool empty = true;
+
+    FileListerState(PushGenerator<std::vector<FileInfo>>::Producer files_queue,
+                    FileSelector select, const std::string& bucket,
+                    const std::string& key, bool include_virtual,
+                    io::IOContext io_context, std::shared_ptr<Aws::S3::S3Client> client,
+                    bool close_sink)
+        : files_queue(std::move(files_queue)),
+          allow_not_found(select.allow_not_found),
+          max_recursion(select.max_recursion),
+          include_virtual(include_virtual),
+          io_context(io_context),
+          client(std::move(client)),
+          close_sink(close_sink),
+          no_files_means_not_found(!select.allow_not_found && !key.empty()) {
+      req.SetBucket(bucket);
+      req.SetMaxKeys(kListObjectsMaxKeys);
+      if (!key.empty()) {
+        req.SetPrefix(key + kSep);
+      }
+      if (!select.recursive) {
+        req.SetDelimiter(Aws::String() + kSep);
+      }
+    }
+
+    // The FileListerState is kept alive by the various FileListerTasks.  Once all the
+    // tasks are finished, this will be destroyed and we can run some cleanup.
+    ~FileListerState() {
+      // * If the bucket doesn't exist we will have already gotten an error from the
+      //     ListObjectsV2 call
+      // * If the key is empty, and the bucket exists, then there is
+      //     no way we can hit "not found"
+      // * If the key is not empty, then it's possible
+      //     that the file itself didn't exist and there
+      //     were no files under it.  In that case we will hit this if statement and
+      //     should treat this as a "not found" case.
+      if (empty && no_files_means_not_found) {
+        files_queue.Push(PathNotFound(req.GetBucket(), req.GetPrefix()));
+      }
+      if (close_sink) {
+        files_queue.Close();
+      }
+    }
 
-    Status Collect(const std::string& prefix, const S3Model::ListObjectsV2Result& result,
-                   std::vector<FileInfo>* out) {
-      // Walk "directories"
+    std::vector<std::string> GetNewDirectories(const std::string_view& path) {
+      std::string current(path);
+      std::string base = req.GetBucket();
+      if (!req.GetPrefix().empty()) {
+        base = base + kSep + std::string(internal::RemoveTrailingSlash(req.GetPrefix()));
+      }
+      std::vector<std::string> new_directories;
+      while (true) {
+        auto parent_base = internal::GetAbstractPathParent(current);
+        if (parent_base.first.empty()) {
+          break;
+        }
+        const std::string& parent_dir = parent_base.first;
+        current = parent_dir;
+        if (current == base) {
+          break;
+        }
+        if (directories.insert(parent_dir).second) {
+          new_directories.push_back(std::move(parent_base.first));
+        }
+      }
+      return new_directories;
+    }
+  };
+
+  struct FileListerTask : public util::AsyncTaskScheduler::Task {
+    std::shared_ptr<FileListerState> state;
+    util::AsyncTaskScheduler* scheduler;
+
+    FileListerTask(std::shared_ptr<FileListerState> state,
+                   util::AsyncTaskScheduler* scheduler)
+        : state(state), scheduler(scheduler) {}
+
+    std::vector<FileInfo> ToFileInfos(const std::string& bucket,
+                                      const std::string& prefix,
+                                      const S3Model::ListObjectsV2Result& result) {
+      std::vector<FileInfo> file_infos;
+      // If this is a non-recursive listing we may see "common prefixes" which represent
+      // directories we did not recurse into.  We will add those as directories.
       for (const auto& child_prefix : result.GetCommonPrefixes()) {
-        is_empty = false;
         const auto child_key =
             internal::RemoveTrailingSlash(FromAwsString(child_prefix.GetPrefix()));
-        std::stringstream child_path;
-        child_path << bucket << kSep << child_key;
+        std::stringstream child_path_ss;
+        child_path_ss << bucket << kSep << child_key;
         FileInfo info;
-        info.set_path(child_path.str());
+        info.set_path(child_path_ss.str());
         info.set_type(FileType::Directory);
-        out->push_back(std::move(info));
+        file_infos.push_back(std::move(info));
       }
-      // Walk "files"
+      // S3 doesn't have any concept of "max depth" and so we emulate it by counting the
+      // number of '/' characters.  E.g. if the user is searching bucket/subdirA/subdirB
+      // then the starting depth is 2.
+      // A file subdirA/subdirB/somefile will have a child depth of 2 and a "depth" of 0.
+      // A file subdirA/subdirB/subdirC/somefile will have a child depth of 3 and a
+      //   "depth" of 1
+      int base_depth =
+          (prefix.empty())
+              ? 0
+              : static_cast<int>(std::count(prefix.begin(), prefix.end(), kSep));

Review Comment:
   Write a small helper for this?
   ```c++
   int GetAbstractPathDepth(std::string_view path) {
     if (path.empty()) { return 0; }
     return static_cast<int>(std::count(path.begin(), path.end(), kSep));
   }
   ```
   
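   The call site above could then become, e.g. (a sketch; this assumes the
   helper lands in `path_util.h` next to the other abstract path utilities):
   ```c++
   // Hypothetical call site; `prefix` is the listing prefix from the diff above.
   int base_depth = internal::GetAbstractPathDepth(prefix);
   ```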



##########
cpp/src/arrow/filesystem/s3fs.cc:
##########
@@ -1870,195 +1731,301 @@ class S3FileSystem::Impl : public std::enable_shared_from_this<S3FileSystem::Imp
         "ListObjectsV2", outcome.GetError());
   }
 
-  Status CheckNestingDepth(int32_t nesting_depth) {
-    if (nesting_depth >= kMaxNestingDepth) {
-      return Status::IOError("S3 filesystem tree exceeds maximum nesting depth (",
-                             kMaxNestingDepth, ")");
+  static FileInfo MakeDirectoryInfo(std::string dirname) {
+    FileInfo dir;
+    dir.set_type(FileType::Directory);
+    dir.set_path(dirname);
+    return dir;
+  }
+
+  static std::vector<FileInfo> MakeDirectoryInfos(std::vector<std::string> dirnames) {
+    std::vector<FileInfo> dir_infos;
+    for (auto& dirname : dirnames) {
+      dir_infos.push_back(MakeDirectoryInfo(std::move(dirname)));
     }
-    return Status::OK();
+    return dir_infos;
   }
 
-  // A helper class for Walk and WalkAsync
-  struct FileInfoCollector {
-    FileInfoCollector(std::string bucket, std::string key, const FileSelector& select)
-        : bucket(std::move(bucket)),
-          key(std::move(key)),
-          allow_not_found(select.allow_not_found) {}
+  using FileInfoSink = PushGenerator<std::vector<FileInfo>>::Producer;
+
+  struct FileListerState {
+    FileInfoSink files_queue;
+    bool allow_not_found;
+    int max_recursion;
+    bool include_virtual;
+    S3Model::ListObjectsV2Request req;
+    io::IOContext io_context;
+    std::shared_ptr<Aws::S3::S3Client> client;
+    bool close_sink;
+    bool no_files_means_not_found;
+    std::unordered_set<std::string> directories;
+    bool empty = true;
+
+    FileListerState(PushGenerator<std::vector<FileInfo>>::Producer files_queue,
+                    FileSelector select, const std::string& bucket,
+                    const std::string& key, bool include_virtual,
+                    io::IOContext io_context, std::shared_ptr<Aws::S3::S3Client> client,
+                    bool close_sink)
+        : files_queue(std::move(files_queue)),
+          allow_not_found(select.allow_not_found),
+          max_recursion(select.max_recursion),
+          include_virtual(include_virtual),
+          io_context(io_context),
+          client(std::move(client)),
+          close_sink(close_sink),
+          no_files_means_not_found(!select.allow_not_found && !key.empty()) {
+      req.SetBucket(bucket);
+      req.SetMaxKeys(kListObjectsMaxKeys);
+      if (!key.empty()) {
+        req.SetPrefix(key + kSep);
+      }
+      if (!select.recursive) {
+        req.SetDelimiter(Aws::String() + kSep);
+      }
+    }
+
+    // The FileListerState is kept alive by the various FileListerTasks.  Once all the
+    // tasks are finished, this will be destroyed and we can run some cleanup.
+    ~FileListerState() {
+      // * If the bucket doesn't exist we will have already gotten an error from the
+      //     ListObjectsV2 call
+      // * If the key is empty, and the bucket exists, then there is
+      //     no way we can hit "not found"
+      // * If the key is not empty, then it's possible
+      //     that the file itself didn't exist and there
+      //     were no files under it.  In that case we will hit this if statement and
+      //     should treat this as a "not found" case.
+      if (empty && no_files_means_not_found) {
+        files_queue.Push(PathNotFound(req.GetBucket(), req.GetPrefix()));
+      }
+      if (close_sink) {
+        files_queue.Close();
+      }
+    }
 
-    Status Collect(const std::string& prefix, const S3Model::ListObjectsV2Result& result,
-                   std::vector<FileInfo>* out) {
-      // Walk "directories"
+    std::vector<std::string> GetNewDirectories(const std::string_view& path) {

Review Comment:
   Please add a comment explaining what this does.
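   A sketch of such a comment (wording is only a suggestion):
   ```c++
   // Given the path of a discovered object, walk up through its parent
   // directories (stopping at the base of this listing) and return, deepest
   // first, the parents that have not been seen before. This lets the lister
   // synthesize directory entries for prefixes S3 never reports explicitly.
   std::vector<std::string> GetNewDirectories(const std::string_view& path);
   ```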



##########
cpp/src/arrow/filesystem/s3fs.cc:
##########
@@ -2116,28 +2083,86 @@ class S3FileSystem::Impl : public std::enable_shared_from_this<S3FileSystem::Imp
     return DeleteObjectsAsync(bucket, keys).status();
   }
 
+  Future<> EnsureNotFileAsync(const std::string& bucket, const std::string& key) {
+    if (key.empty()) {
+      // There is no way for a bucket to be a file
+      return Future<>::MakeFinished();

Review Comment:
   I assume this is not a problem because our task functions will schedule the actual IO using `SubmitIO`, right?
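   If it ever did become a problem, one option would be to complete the future
   on the IO executor instead (a sketch only; assumes `io_context_` and its
   executor as used elsewhere in this file):
   ```c++
   // Sketch: hand back a future that finishes on the IO thread pool, so any
   // continuation attached to it cannot run on the calling thread.
   return DeferNotOk(io_context_.executor()->Submit([] { return Status::OK(); }));
   ```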


