You are viewing a plain text version of this content. (The hyperlink to the canonical version was lost in the plain-text conversion.)
Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2021/12/01 13:21:34 UTC

[GitHub] [arrow] pitrou commented on a change in pull request #11812: ARROW-14347: [C++] random access files for GcsFileSystem

pitrou commented on a change in pull request #11812:
URL: https://github.com/apache/arrow/pull/11812#discussion_r760170770



##########
File path: cpp/src/arrow/filesystem/gcsfs.cc
##########
@@ -172,6 +174,79 @@ class GcsOutputStream : public arrow::io::OutputStream {
   int64_t tell_ = 0;
 };
 
+using InputStreamFactory = std::function<Result<std::shared_ptr<io::InputStream>>(
+    const std::string&, const std::string&, gcs::Generation, gcs::ReadFromOffset)>;
+
+class GcsRandomAccessFile : public arrow::io::RandomAccessFile {
+ public:
+  GcsRandomAccessFile(InputStreamFactory factory, gcs::ObjectMetadata metadata,
+                      std::shared_ptr<io::InputStream> stream)
+      : factory_(std::move(factory)),
+        metadata_(std::move(metadata)),
+        stream_(std::move(stream)) {}
+  ~GcsRandomAccessFile() override = default;
+
+  //@{
+  // @name FileInterface
+  Status Close() override { return stream_->Close(); }
+  Status Abort() override { return stream_->Abort(); }
+  Result<int64_t> Tell() const override { return stream_->Tell(); }
+  bool closed() const override { return stream_->closed(); }
+  //@}
+
+  //@{
+  // @name Readable
+  Result<int64_t> Read(int64_t nbytes, void* out) override {
+    return stream_->Read(nbytes, out);
+  }
+  Result<std::shared_ptr<Buffer>> Read(int64_t nbytes) override {
+    return stream_->Read(nbytes);
+  }
+  const arrow::io::IOContext& io_context() const override {
+    return stream_->io_context();
+  }
+  //@}
+
+  //@{
+  // @name InputStream
+  Result<std::shared_ptr<const KeyValueMetadata>> ReadMetadata() override {
+    return internal::FromObjectMetadata(metadata_);
+  }
+  //@}
+
+  //@{
+  // @name RandomAccessFile
+  Result<int64_t> GetSize() override { return metadata_.size(); }
+  Result<int64_t> ReadAt(int64_t position, int64_t nbytes, void* out) override {
+    std::shared_ptr<io::InputStream> stream;
+    ARROW_ASSIGN_OR_RAISE(stream, factory_(metadata_.bucket(), metadata_.name(),
+                                           gcs::Generation(metadata_.generation()),
+                                           gcs::ReadFromOffset(position)));

Review comment:
       Interesting. It seems a bit convoluted, but I guess the GCS C++ API is forcing us to do this.

##########
File path: cpp/src/arrow/filesystem/gcsfs_test.cc
##########
@@ -521,6 +540,130 @@ TEST_F(GcsIntegrationTest, WriteObjectLarge) {
   EXPECT_EQ(contents, buffers[0] + buffers[1] + buffers[2]);
 }
 
+TEST_F(GcsIntegrationTest, OpenInputFileMixedReadVsReadAt) {
+  auto fs = internal::MakeGcsFileSystemForTest(TestGcsOptions());
+
+  // Create a file large enough to make the random access tests non-trivial.
+  auto constexpr kLineWidth = 100;
+  auto constexpr kLineCount = 4096;
+  std::vector<std::string> lines(kLineCount);
+  int lineno = 0;
+  std::generate_n(lines.begin(), lines.size(),
+                  [&] { return RandomLine(++lineno, kLineWidth); });
+
+  const auto path =
+      kPreexistingBucket + std::string("/OpenInputFileMixedReadVsReadAt/object-name");
+  std::shared_ptr<io::OutputStream> output;
+  ASSERT_OK_AND_ASSIGN(output, fs->OpenOutputStream(path, {}));
+  for (auto const& line : lines) {
+    ASSERT_OK(output->Write(line.data(), line.size()));
+  }
+  ASSERT_OK(output->Close());
+
+  std::shared_ptr<io::RandomAccessFile> file;
+  ASSERT_OK_AND_ASSIGN(file, fs->OpenInputFile(path));
+  for (int i = 0; i != 32; ++i) {
+    SCOPED_TRACE("Iteration " + std::to_string(i));
+    // Verify sequential reads work as expected.
+    std::array<char, kLineWidth> buffer{};
+    std::int64_t size;
+    {
+      ASSERT_OK_AND_ASSIGN(auto actual, file->Read(kLineWidth));
+      EXPECT_EQ(lines[2 * i], actual->ToString());
+    }
+    {
+      ASSERT_OK_AND_ASSIGN(size, file->Read(buffer.size(), buffer.data()));
+      EXPECT_EQ(size, kLineWidth);
+      auto actual = std::string{buffer.begin(), buffer.end()};
+      EXPECT_EQ(lines[2 * i + 1], actual);
+    }
+
+    // Verify random reads interleave too.
+    auto const index = RandomIndex(kLineCount);
+    auto const position = index * kLineWidth;
+    ASSERT_OK_AND_ASSIGN(size, file->ReadAt(position, buffer.size(), buffer.data()));
+    EXPECT_EQ(size, kLineWidth);
+    auto actual = std::string{buffer.begin(), buffer.end()};
+    EXPECT_EQ(lines[index], actual);
+
+    // Verify random reads using buffers work.
+    ASSERT_OK_AND_ASSIGN(auto b, file->ReadAt(position, kLineWidth));
+    EXPECT_EQ(lines[index], b->ToString());
+  }
+}
+
+TEST_F(GcsIntegrationTest, OpenInputFileRandomSeek) {
+  auto fs = internal::MakeGcsFileSystemForTest(TestGcsOptions());
+
+  // Create a file large enough to make the random access tests non-trivial.
+  auto constexpr kLineWidth = 100;
+  auto constexpr kLineCount = 4096;
+  std::vector<std::string> lines(kLineCount);
+  int lineno = 0;
+  std::generate_n(lines.begin(), lines.size(),
+                  [&] { return RandomLine(++lineno, kLineWidth); });
+
+  const auto path =
+      kPreexistingBucket + std::string("/OpenInputFileRandomSeek/object-name");
+  std::shared_ptr<io::OutputStream> output;
+  ASSERT_OK_AND_ASSIGN(output, fs->OpenOutputStream(path, {}));
+  for (auto const& line : lines) {
+    ASSERT_OK(output->Write(line.data(), line.size()));
+  }
+  ASSERT_OK(output->Close());
+
+  std::shared_ptr<io::RandomAccessFile> file;
+  ASSERT_OK_AND_ASSIGN(file, fs->OpenInputFile(path));
+  for (int i = 0; i != 32; ++i) {
+    SCOPED_TRACE("Iteration " + std::to_string(i));
+    // Verify sequential reads work as expected.
+    auto const index = RandomIndex(kLineCount);
+    auto const position = index * kLineWidth;
+    ASSERT_OK(file->Seek(position));
+    ASSERT_OK_AND_ASSIGN(auto actual, file->Read(kLineWidth));
+    EXPECT_EQ(lines[index], actual->ToString());
+  }
+}
+
+TEST_F(GcsIntegrationTest, OpenInputFileInfo) {
+  auto fs = internal::MakeGcsFileSystemForTest(TestGcsOptions());
+
+  arrow::fs::FileInfo info;
+  ASSERT_OK_AND_ASSIGN(info, fs->GetFileInfo(PreexistingObjectPath()));
+
+  std::shared_ptr<io::RandomAccessFile> file;
+  ASSERT_OK_AND_ASSIGN(file, fs->OpenInputFile(info));
+
+  std::array<char, 1024> buffer{};
+  std::int64_t size;
+  auto constexpr kStart = 16;
+  ASSERT_OK_AND_ASSIGN(size, file->ReadAt(kStart, buffer.size(), buffer.data()));
+
+  auto const expected = std::string(kLoremIpsum).substr(kStart);
+  EXPECT_EQ(std::string(buffer.data(), size), expected);
+}
+
+TEST_F(GcsIntegrationTest, OpenInputFileNotFound) {
+  auto fs = internal::MakeGcsFileSystemForTest(TestGcsOptions());
+
+  auto result = fs->OpenInputFile(NotFoundObjectPath());
+  EXPECT_EQ(result.status().code(), StatusCode::IOError);
+}
+
+TEST_F(GcsIntegrationTest, OpenInputFileInfoInvalid) {
+  auto fs = internal::MakeGcsFileSystemForTest(TestGcsOptions());
+
+  arrow::fs::FileInfo info;
+  ASSERT_OK_AND_ASSIGN(info, fs->GetFileInfo(kPreexistingBucket));
+
+  auto result = fs->OpenInputFile(NotFoundObjectPath());
+  EXPECT_EQ(result.status().code(), StatusCode::IOError);
+
+  ASSERT_OK_AND_ASSIGN(info, fs->GetFileInfo(NotFoundObjectPath()));
+  result = fs->OpenInputFile(NotFoundObjectPath());

Review comment:
       Did you mean to call `fs->OpenInputFile(info)` instead here as well?

##########
File path: cpp/src/arrow/filesystem/gcsfs.cc
##########
@@ -324,17 +407,46 @@ Result<std::shared_ptr<io::InputStream>> GcsFileSystem::OpenInputStream(
     return Status::IOError("Only files can be opened as input streams");
   }
   ARROW_ASSIGN_OR_RAISE(auto p, GcsPath::FromString(info.path()));
-  return impl_->OpenInputStream(p);
+  return impl_->OpenInputStream(p.bucket, p.object, gcs::Generation(),
+                                gcs::ReadFromOffset());
 }
 
 Result<std::shared_ptr<io::RandomAccessFile>> GcsFileSystem::OpenInputFile(
     const std::string& path) {
-  return Status::NotImplemented("The GCS FileSystem is not fully implemented");
+  ARROW_ASSIGN_OR_RAISE(auto p, GcsPath::FromString(path));
+  auto metadata = impl_->GetObjectMetadata(p);

Review comment:
       I'm curious, does this add a roundtrip to the server?

##########
File path: cpp/src/arrow/filesystem/gcsfs_test.cc
##########
@@ -521,6 +540,130 @@ TEST_F(GcsIntegrationTest, WriteObjectLarge) {
   EXPECT_EQ(contents, buffers[0] + buffers[1] + buffers[2]);
 }
 
+TEST_F(GcsIntegrationTest, OpenInputFileMixedReadVsReadAt) {
+  auto fs = internal::MakeGcsFileSystemForTest(TestGcsOptions());
+
+  // Create a file large enough to make the random access tests non-trivial.
+  auto constexpr kLineWidth = 100;
+  auto constexpr kLineCount = 4096;
+  std::vector<std::string> lines(kLineCount);
+  int lineno = 0;
+  std::generate_n(lines.begin(), lines.size(),
+                  [&] { return RandomLine(++lineno, kLineWidth); });
+
+  const auto path =
+      kPreexistingBucket + std::string("/OpenInputFileMixedReadVsReadAt/object-name");
+  std::shared_ptr<io::OutputStream> output;
+  ASSERT_OK_AND_ASSIGN(output, fs->OpenOutputStream(path, {}));
+  for (auto const& line : lines) {
+    ASSERT_OK(output->Write(line.data(), line.size()));
+  }
+  ASSERT_OK(output->Close());
+
+  std::shared_ptr<io::RandomAccessFile> file;
+  ASSERT_OK_AND_ASSIGN(file, fs->OpenInputFile(path));
+  for (int i = 0; i != 32; ++i) {
+    SCOPED_TRACE("Iteration " + std::to_string(i));
+    // Verify sequential reads work as expected.
+    std::array<char, kLineWidth> buffer{};
+    std::int64_t size;
+    {
+      ASSERT_OK_AND_ASSIGN(auto actual, file->Read(kLineWidth));
+      EXPECT_EQ(lines[2 * i], actual->ToString());
+    }
+    {
+      ASSERT_OK_AND_ASSIGN(size, file->Read(buffer.size(), buffer.data()));
+      EXPECT_EQ(size, kLineWidth);
+      auto actual = std::string{buffer.begin(), buffer.end()};
+      EXPECT_EQ(lines[2 * i + 1], actual);
+    }
+
+    // Verify random reads interleave too.
+    auto const index = RandomIndex(kLineCount);
+    auto const position = index * kLineWidth;
+    ASSERT_OK_AND_ASSIGN(size, file->ReadAt(position, buffer.size(), buffer.data()));
+    EXPECT_EQ(size, kLineWidth);
+    auto actual = std::string{buffer.begin(), buffer.end()};
+    EXPECT_EQ(lines[index], actual);
+
+    // Verify random reads using buffers work.
+    ASSERT_OK_AND_ASSIGN(auto b, file->ReadAt(position, kLineWidth));
+    EXPECT_EQ(lines[index], b->ToString());
+  }
+}
+
+TEST_F(GcsIntegrationTest, OpenInputFileRandomSeek) {
+  auto fs = internal::MakeGcsFileSystemForTest(TestGcsOptions());
+
+  // Create a file large enough to make the random access tests non-trivial.
+  auto constexpr kLineWidth = 100;
+  auto constexpr kLineCount = 4096;
+  std::vector<std::string> lines(kLineCount);
+  int lineno = 0;
+  std::generate_n(lines.begin(), lines.size(),
+                  [&] { return RandomLine(++lineno, kLineWidth); });
+
+  const auto path =
+      kPreexistingBucket + std::string("/OpenInputFileRandomSeek/object-name");
+  std::shared_ptr<io::OutputStream> output;
+  ASSERT_OK_AND_ASSIGN(output, fs->OpenOutputStream(path, {}));
+  for (auto const& line : lines) {
+    ASSERT_OK(output->Write(line.data(), line.size()));
+  }
+  ASSERT_OK(output->Close());
+
+  std::shared_ptr<io::RandomAccessFile> file;
+  ASSERT_OK_AND_ASSIGN(file, fs->OpenInputFile(path));
+  for (int i = 0; i != 32; ++i) {
+    SCOPED_TRACE("Iteration " + std::to_string(i));
+    // Verify sequential reads work as expected.
+    auto const index = RandomIndex(kLineCount);
+    auto const position = index * kLineWidth;
+    ASSERT_OK(file->Seek(position));
+    ASSERT_OK_AND_ASSIGN(auto actual, file->Read(kLineWidth));
+    EXPECT_EQ(lines[index], actual->ToString());
+  }
+}
+
+TEST_F(GcsIntegrationTest, OpenInputFileInfo) {
+  auto fs = internal::MakeGcsFileSystemForTest(TestGcsOptions());
+
+  arrow::fs::FileInfo info;
+  ASSERT_OK_AND_ASSIGN(info, fs->GetFileInfo(PreexistingObjectPath()));
+
+  std::shared_ptr<io::RandomAccessFile> file;
+  ASSERT_OK_AND_ASSIGN(file, fs->OpenInputFile(info));
+
+  std::array<char, 1024> buffer{};
+  std::int64_t size;
+  auto constexpr kStart = 16;
+  ASSERT_OK_AND_ASSIGN(size, file->ReadAt(kStart, buffer.size(), buffer.data()));
+
+  auto const expected = std::string(kLoremIpsum).substr(kStart);
+  EXPECT_EQ(std::string(buffer.data(), size), expected);
+}
+
+TEST_F(GcsIntegrationTest, OpenInputFileNotFound) {
+  auto fs = internal::MakeGcsFileSystemForTest(TestGcsOptions());
+
+  auto result = fs->OpenInputFile(NotFoundObjectPath());
+  EXPECT_EQ(result.status().code(), StatusCode::IOError);
+}
+
+TEST_F(GcsIntegrationTest, OpenInputFileInfoInvalid) {
+  auto fs = internal::MakeGcsFileSystemForTest(TestGcsOptions());
+
+  arrow::fs::FileInfo info;
+  ASSERT_OK_AND_ASSIGN(info, fs->GetFileInfo(kPreexistingBucket));
+
+  auto result = fs->OpenInputFile(NotFoundObjectPath());

Review comment:
       Did you mean to call `fs->OpenInputFile(info)` instead?
   

##########
File path: cpp/src/arrow/filesystem/gcsfs_test.cc
##########
@@ -521,6 +540,130 @@ TEST_F(GcsIntegrationTest, WriteObjectLarge) {
   EXPECT_EQ(contents, buffers[0] + buffers[1] + buffers[2]);
 }
 
+TEST_F(GcsIntegrationTest, OpenInputFileMixedReadVsReadAt) {
+  auto fs = internal::MakeGcsFileSystemForTest(TestGcsOptions());
+
+  // Create a file large enough to make the random access tests non-trivial.
+  auto constexpr kLineWidth = 100;
+  auto constexpr kLineCount = 4096;
+  std::vector<std::string> lines(kLineCount);
+  int lineno = 0;
+  std::generate_n(lines.begin(), lines.size(),
+                  [&] { return RandomLine(++lineno, kLineWidth); });
+
+  const auto path =
+      kPreexistingBucket + std::string("/OpenInputFileMixedReadVsReadAt/object-name");
+  std::shared_ptr<io::OutputStream> output;
+  ASSERT_OK_AND_ASSIGN(output, fs->OpenOutputStream(path, {}));
+  for (auto const& line : lines) {
+    ASSERT_OK(output->Write(line.data(), line.size()));
+  }
+  ASSERT_OK(output->Close());
+
+  std::shared_ptr<io::RandomAccessFile> file;
+  ASSERT_OK_AND_ASSIGN(file, fs->OpenInputFile(path));
+  for (int i = 0; i != 32; ++i) {
+    SCOPED_TRACE("Iteration " + std::to_string(i));
+    // Verify sequential reads work as expected.
+    std::array<char, kLineWidth> buffer{};
+    std::int64_t size;
+    {
+      ASSERT_OK_AND_ASSIGN(auto actual, file->Read(kLineWidth));
+      EXPECT_EQ(lines[2 * i], actual->ToString());
+    }
+    {
+      ASSERT_OK_AND_ASSIGN(size, file->Read(buffer.size(), buffer.data()));
+      EXPECT_EQ(size, kLineWidth);
+      auto actual = std::string{buffer.begin(), buffer.end()};
+      EXPECT_EQ(lines[2 * i + 1], actual);
+    }
+
+    // Verify random reads interleave too.
+    auto const index = RandomIndex(kLineCount);
+    auto const position = index * kLineWidth;
+    ASSERT_OK_AND_ASSIGN(size, file->ReadAt(position, buffer.size(), buffer.data()));
+    EXPECT_EQ(size, kLineWidth);
+    auto actual = std::string{buffer.begin(), buffer.end()};
+    EXPECT_EQ(lines[index], actual);
+
+    // Verify random reads using buffers work.
+    ASSERT_OK_AND_ASSIGN(auto b, file->ReadAt(position, kLineWidth));
+    EXPECT_EQ(lines[index], b->ToString());
+  }
+}
+
+TEST_F(GcsIntegrationTest, OpenInputFileRandomSeek) {
+  auto fs = internal::MakeGcsFileSystemForTest(TestGcsOptions());
+
+  // Create a file large enough to make the random access tests non-trivial.
+  auto constexpr kLineWidth = 100;
+  auto constexpr kLineCount = 4096;
+  std::vector<std::string> lines(kLineCount);
+  int lineno = 0;
+  std::generate_n(lines.begin(), lines.size(),
+                  [&] { return RandomLine(++lineno, kLineWidth); });
+
+  const auto path =
+      kPreexistingBucket + std::string("/OpenInputFileRandomSeek/object-name");
+  std::shared_ptr<io::OutputStream> output;
+  ASSERT_OK_AND_ASSIGN(output, fs->OpenOutputStream(path, {}));
+  for (auto const& line : lines) {
+    ASSERT_OK(output->Write(line.data(), line.size()));
+  }
+  ASSERT_OK(output->Close());
+
+  std::shared_ptr<io::RandomAccessFile> file;
+  ASSERT_OK_AND_ASSIGN(file, fs->OpenInputFile(path));
+  for (int i = 0; i != 32; ++i) {
+    SCOPED_TRACE("Iteration " + std::to_string(i));
+    // Verify sequential reads work as expected.
+    auto const index = RandomIndex(kLineCount);
+    auto const position = index * kLineWidth;
+    ASSERT_OK(file->Seek(position));
+    ASSERT_OK_AND_ASSIGN(auto actual, file->Read(kLineWidth));
+    EXPECT_EQ(lines[index], actual->ToString());
+  }
+}
+
+TEST_F(GcsIntegrationTest, OpenInputFileInfo) {
+  auto fs = internal::MakeGcsFileSystemForTest(TestGcsOptions());
+
+  arrow::fs::FileInfo info;
+  ASSERT_OK_AND_ASSIGN(info, fs->GetFileInfo(PreexistingObjectPath()));
+
+  std::shared_ptr<io::RandomAccessFile> file;
+  ASSERT_OK_AND_ASSIGN(file, fs->OpenInputFile(info));
+
+  std::array<char, 1024> buffer{};
+  std::int64_t size;
+  auto constexpr kStart = 16;
+  ASSERT_OK_AND_ASSIGN(size, file->ReadAt(kStart, buffer.size(), buffer.data()));
+
+  auto const expected = std::string(kLoremIpsum).substr(kStart);
+  EXPECT_EQ(std::string(buffer.data(), size), expected);
+}
+
+TEST_F(GcsIntegrationTest, OpenInputFileNotFound) {
+  auto fs = internal::MakeGcsFileSystemForTest(TestGcsOptions());
+
+  auto result = fs->OpenInputFile(NotFoundObjectPath());
+  EXPECT_EQ(result.status().code(), StatusCode::IOError);

Review comment:
       Can use `ASSERT_RAISES(IOError, fs->OpenInputFile(...))` here, and below as well.

##########
File path: cpp/src/arrow/filesystem/gcsfs_test.cc
##########
@@ -119,7 +122,23 @@ class GcsIntegrationTest : public ::testing::Test {
             .set<gc::UnifiedCredentialsOption>(gc::MakeInsecureCredentials()));
   }
 
+  std::string RandomLine(int lineno, std::size_t width) {
+    auto const fillers = std::string("abcdefghijlkmnopqrstuvwxyz0123456789");
+    std::uniform_int_distribution<std::size_t> d(0, fillers.size() - 1);

Review comment:
       You can probably reuse `random_string` from `arrow/testing/util.h` instead.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org