You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ap...@apache.org on 2021/06/01 08:11:04 UTC

[arrow] branch master updated: ARROW-11161: [C++][Python] Add stream metadata

This is an automated email from the ASF dual-hosted git repository.

apitrou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 8103972  ARROW-11161: [C++][Python] Add stream metadata
8103972 is described below

commit 81039729bd0b575e5abc2fca4b61f1c909b0e786
Author: Antoine Pitrou <an...@python.org>
AuthorDate: Tue Jun 1 10:09:37 2021 +0200

    ARROW-11161: [C++][Python] Add stream metadata
    
    Extend the InputStream API to allow reading metadata.
    
    Extend the FileSystem API to allow setting metadata when creating an output stream or file.
    
    Implement metadata reading and writing in the S3 filesystem.
    A few metadata keys are supported, such as "Content-Type" and "Expires".
    
    Closes #10295 from pitrou/ARROW-11161-stream-metadata
    
    Authored-by: Antoine Pitrou <an...@python.org>
    Signed-off-by: Antoine Pitrou <an...@python.org>
---
 cpp/src/arrow/filesystem/filesystem.cc      | 29 ++++++---
 cpp/src/arrow/filesystem/filesystem.h       | 20 ++++--
 cpp/src/arrow/filesystem/filesystem_test.cc | 15 +++++
 cpp/src/arrow/filesystem/hdfs.cc            |  4 +-
 cpp/src/arrow/filesystem/hdfs.h             |  6 +-
 cpp/src/arrow/filesystem/localfs.cc         |  4 +-
 cpp/src/arrow/filesystem/localfs.h          |  6 +-
 cpp/src/arrow/filesystem/mockfs.cc          | 35 +++++++----
 cpp/src/arrow/filesystem/mockfs.h           |  6 +-
 cpp/src/arrow/filesystem/s3fs.cc            | 98 +++++++++++++++++++++++++++--
 cpp/src/arrow/filesystem/s3fs.h             |  6 +-
 cpp/src/arrow/filesystem/s3fs_test.cc       | 31 ++++++++-
 cpp/src/arrow/filesystem/test_util.cc       | 32 ++++++++--
 cpp/src/arrow/filesystem/test_util.h        |  2 +
 cpp/src/arrow/io/buffered.cc                |  9 +++
 cpp/src/arrow/io/buffered.h                 |  3 +
 cpp/src/arrow/io/compressed.cc              |  9 +++
 cpp/src/arrow/io/compressed.h               |  3 +
 cpp/src/arrow/io/interfaces.cc              | 23 +++++--
 cpp/src/arrow/io/interfaces.h               | 21 +++++--
 cpp/src/arrow/io/memory.cc                  |  4 +-
 cpp/src/arrow/io/memory_test.cc             |  8 +++
 cpp/src/arrow/io/transform.cc               | 13 ++++
 cpp/src/arrow/io/transform.h                |  4 ++
 cpp/src/arrow/python/filesystem.cc          |  8 +--
 cpp/src/arrow/python/filesystem.h           |  8 ++-
 cpp/src/arrow/util/key_value_metadata.cc    |  5 ++
 cpp/src/arrow/util/key_value_metadata.h     |  3 +
 python/pyarrow/_fs.pyx                      | 52 +++++++++++----
 python/pyarrow/fs.py                        |  6 +-
 python/pyarrow/includes/libarrow.pxd        |  2 +-
 python/pyarrow/includes/libarrow_fs.pxd     |  9 +--
 python/pyarrow/io.pxi                       | 18 ++++++
 python/pyarrow/lib.pxd                      |  4 +-
 python/pyarrow/tests/test_fs.py             | 38 +++++++++--
 35 files changed, 451 insertions(+), 93 deletions(-)

diff --git a/cpp/src/arrow/filesystem/filesystem.cc b/cpp/src/arrow/filesystem/filesystem.cc
index 98dc057..6b94d61 100644
--- a/cpp/src/arrow/filesystem/filesystem.cc
+++ b/cpp/src/arrow/filesystem/filesystem.cc
@@ -232,6 +232,16 @@ Future<std::shared_ptr<io::RandomAccessFile>> FileSystem::OpenInputFileAsync(
       [info](std::shared_ptr<FileSystem> self) { return self->OpenInputFile(info); });
 }
 
+Result<std::shared_ptr<io::OutputStream>> FileSystem::OpenOutputStream(
+    const std::string& path) {
+  return OpenOutputStream(path, std::shared_ptr<const KeyValueMetadata>{});
+}
+
+Result<std::shared_ptr<io::OutputStream>> FileSystem::OpenAppendStream(
+    const std::string& path) {
+  return OpenAppendStream(path, std::shared_ptr<const KeyValueMetadata>{});
+}
+
 //////////////////////////////////////////////////////////////////////////
 // SubTreeFileSystem implementation
 
@@ -447,17 +457,17 @@ Future<std::shared_ptr<io::RandomAccessFile>> SubTreeFileSystem::OpenInputFileAs
 }
 
 Result<std::shared_ptr<io::OutputStream>> SubTreeFileSystem::OpenOutputStream(
-    const std::string& path) {
+    const std::string& path, const std::shared_ptr<const KeyValueMetadata>& metadata) {
   auto s = path;
   RETURN_NOT_OK(PrependBaseNonEmpty(&s));
-  return base_fs_->OpenOutputStream(s);
+  return base_fs_->OpenOutputStream(s, metadata);
 }
 
 Result<std::shared_ptr<io::OutputStream>> SubTreeFileSystem::OpenAppendStream(
-    const std::string& path) {
+    const std::string& path, const std::shared_ptr<const KeyValueMetadata>& metadata) {
   auto s = path;
   RETURN_NOT_OK(PrependBaseNonEmpty(&s));
-  return base_fs_->OpenAppendStream(s);
+  return base_fs_->OpenAppendStream(s, metadata);
 }
 
 //////////////////////////////////////////////////////////////////////////
@@ -555,16 +565,16 @@ Result<std::shared_ptr<io::RandomAccessFile>> SlowFileSystem::OpenInputFile(
 }
 
 Result<std::shared_ptr<io::OutputStream>> SlowFileSystem::OpenOutputStream(
-    const std::string& path) {
+    const std::string& path, const std::shared_ptr<const KeyValueMetadata>& metadata) {
   latencies_->Sleep();
   // XXX Should we have a SlowOutputStream that waits on Flush() and Close()?
-  return base_fs_->OpenOutputStream(path);
+  return base_fs_->OpenOutputStream(path, metadata);
 }
 
 Result<std::shared_ptr<io::OutputStream>> SlowFileSystem::OpenAppendStream(
-    const std::string& path) {
+    const std::string& path, const std::shared_ptr<const KeyValueMetadata>& metadata) {
   latencies_->Sleep();
-  return base_fs_->OpenAppendStream(path);
+  return base_fs_->OpenAppendStream(path, metadata);
 }
 
 Status CopyFiles(const std::vector<FileLocator>& sources,
@@ -582,9 +592,10 @@ Status CopyFiles(const std::vector<FileLocator>& sources,
 
     ARROW_ASSIGN_OR_RAISE(auto source,
                           sources[i].filesystem->OpenInputStream(sources[i].path));
+    ARROW_ASSIGN_OR_RAISE(const auto metadata, source->ReadMetadata());
 
     ARROW_ASSIGN_OR_RAISE(auto destination, destinations[i].filesystem->OpenOutputStream(
-                                                destinations[i].path));
+                                                destinations[i].path, metadata));
     RETURN_NOT_OK(internal::CopyStream(source, destination, chunk_size, io_context));
     return destination->Close();
   };
diff --git a/cpp/src/arrow/filesystem/filesystem.h b/cpp/src/arrow/filesystem/filesystem.h
index 2fc5836..c739471 100644
--- a/cpp/src/arrow/filesystem/filesystem.h
+++ b/cpp/src/arrow/filesystem/filesystem.h
@@ -283,13 +283,17 @@ class ARROW_EXPORT FileSystem : public std::enable_shared_from_this<FileSystem>
   ///
   /// If the target already exists, existing data is truncated.
   virtual Result<std::shared_ptr<io::OutputStream>> OpenOutputStream(
-      const std::string& path) = 0;
+      const std::string& path,
+      const std::shared_ptr<const KeyValueMetadata>& metadata) = 0;
+  Result<std::shared_ptr<io::OutputStream>> OpenOutputStream(const std::string& path);
 
   /// Open an output stream for appending.
   ///
   /// If the target doesn't exist, a new empty file is created.
   virtual Result<std::shared_ptr<io::OutputStream>> OpenAppendStream(
-      const std::string& path) = 0;
+      const std::string& path,
+      const std::shared_ptr<const KeyValueMetadata>& metadata) = 0;
+  Result<std::shared_ptr<io::OutputStream>> OpenAppendStream(const std::string& path);
 
  protected:
   explicit FileSystem(const io::IOContext& io_context = io::default_io_context())
@@ -364,9 +368,11 @@ class ARROW_EXPORT SubTreeFileSystem : public FileSystem {
       const FileInfo& info) override;
 
   Result<std::shared_ptr<io::OutputStream>> OpenOutputStream(
-      const std::string& path) override;
+      const std::string& path,
+      const std::shared_ptr<const KeyValueMetadata>& metadata = {}) override;
   Result<std::shared_ptr<io::OutputStream>> OpenAppendStream(
-      const std::string& path) override;
+      const std::string& path,
+      const std::shared_ptr<const KeyValueMetadata>& metadata = {}) override;
 
  protected:
   SubTreeFileSystem() {}
@@ -420,9 +426,11 @@ class ARROW_EXPORT SlowFileSystem : public FileSystem {
   Result<std::shared_ptr<io::RandomAccessFile>> OpenInputFile(
       const FileInfo& info) override;
   Result<std::shared_ptr<io::OutputStream>> OpenOutputStream(
-      const std::string& path) override;
+      const std::string& path,
+      const std::shared_ptr<const KeyValueMetadata>& metadata = {}) override;
   Result<std::shared_ptr<io::OutputStream>> OpenAppendStream(
-      const std::string& path) override;
+      const std::string& path,
+      const std::shared_ptr<const KeyValueMetadata>& metadata = {}) override;
 
  protected:
   std::shared_ptr<FileSystem> base_fs_;
diff --git a/cpp/src/arrow/filesystem/filesystem_test.cc b/cpp/src/arrow/filesystem/filesystem_test.cc
index 8df84ff..4488935 100644
--- a/cpp/src/arrow/filesystem/filesystem_test.cc
+++ b/cpp/src/arrow/filesystem/filesystem_test.cc
@@ -28,6 +28,7 @@
 #include "arrow/filesystem/test_util.h"
 #include "arrow/io/interfaces.h"
 #include "arrow/testing/gtest_util.h"
+#include "arrow/util/key_value_metadata.h"
 
 namespace arrow {
 namespace fs {
@@ -278,6 +279,8 @@ class TestMockFSGeneric : public ::testing::Test, public GenericFileSystemTest {
  protected:
   std::shared_ptr<FileSystem> GetEmptyFileSystem() override { return fs_; }
 
+  bool have_file_metadata() const override { return true; }
+
   TimePoint time_;
   std::shared_ptr<FileSystem> fs_;
 };
@@ -456,6 +459,18 @@ TEST_F(TestMockFS, OpenOutputStream) {
   ASSERT_OK(stream->Close());
   CheckDirs({});
   CheckFiles({{"ab", time_, ""}});
+
+  // With metadata
+  auto metadata = KeyValueMetadata::Make({"some key"}, {"some value"});
+  ASSERT_OK_AND_ASSIGN(stream, fs_->OpenOutputStream("cd", metadata));
+  ASSERT_OK(WriteString(stream.get(), "data"));
+  ASSERT_OK(stream->Close());
+  CheckFiles({{"ab", time_, ""}, {"cd", time_, "data"}});
+
+  ASSERT_OK_AND_ASSIGN(auto input, fs_->OpenInputStream("cd"));
+  ASSERT_OK_AND_ASSIGN(auto got_metadata, input->ReadMetadata());
+  ASSERT_NE(got_metadata, nullptr);
+  ASSERT_TRUE(got_metadata->Equals(*metadata));
 }
 
 TEST_F(TestMockFS, OpenAppendStream) {
diff --git a/cpp/src/arrow/filesystem/hdfs.cc b/cpp/src/arrow/filesystem/hdfs.cc
index 6ac81d0..7743317 100644
--- a/cpp/src/arrow/filesystem/hdfs.cc
+++ b/cpp/src/arrow/filesystem/hdfs.cc
@@ -471,12 +471,12 @@ Result<std::shared_ptr<io::RandomAccessFile>> HadoopFileSystem::OpenInputFile(
 }
 
 Result<std::shared_ptr<io::OutputStream>> HadoopFileSystem::OpenOutputStream(
-    const std::string& path) {
+    const std::string& path, const std::shared_ptr<const KeyValueMetadata>& metadata) {
   return impl_->OpenOutputStream(path);
 }
 
 Result<std::shared_ptr<io::OutputStream>> HadoopFileSystem::OpenAppendStream(
-    const std::string& path) {
+    const std::string& path, const std::shared_ptr<const KeyValueMetadata>& metadata) {
   return impl_->OpenAppendStream(path);
 }
 
diff --git a/cpp/src/arrow/filesystem/hdfs.h b/cpp/src/arrow/filesystem/hdfs.h
index 72cb469..bc72e1c 100644
--- a/cpp/src/arrow/filesystem/hdfs.h
+++ b/cpp/src/arrow/filesystem/hdfs.h
@@ -92,9 +92,11 @@ class ARROW_EXPORT HadoopFileSystem : public FileSystem {
   Result<std::shared_ptr<io::RandomAccessFile>> OpenInputFile(
       const std::string& path) override;
   Result<std::shared_ptr<io::OutputStream>> OpenOutputStream(
-      const std::string& path) override;
+      const std::string& path,
+      const std::shared_ptr<const KeyValueMetadata>& metadata = {}) override;
   Result<std::shared_ptr<io::OutputStream>> OpenAppendStream(
-      const std::string& path) override;
+      const std::string& path,
+      const std::shared_ptr<const KeyValueMetadata>& metadata = {}) override;
 
   /// Create a HdfsFileSystem instance from the given options.
   static Result<std::shared_ptr<HadoopFileSystem>> Make(
diff --git a/cpp/src/arrow/filesystem/localfs.cc b/cpp/src/arrow/filesystem/localfs.cc
index 490bace..775fd74 100644
--- a/cpp/src/arrow/filesystem/localfs.cc
+++ b/cpp/src/arrow/filesystem/localfs.cc
@@ -431,14 +431,14 @@ Result<std::shared_ptr<io::OutputStream>> OpenOutputStreamGeneric(const std::str
 }  // namespace
 
 Result<std::shared_ptr<io::OutputStream>> LocalFileSystem::OpenOutputStream(
-    const std::string& path) {
+    const std::string& path, const std::shared_ptr<const KeyValueMetadata>& metadata) {
   bool truncate = true;
   bool append = false;
   return OpenOutputStreamGeneric(path, truncate, append);
 }
 
 Result<std::shared_ptr<io::OutputStream>> LocalFileSystem::OpenAppendStream(
-    const std::string& path) {
+    const std::string& path, const std::shared_ptr<const KeyValueMetadata>& metadata) {
   bool truncate = false;
   bool append = true;
   return OpenOutputStreamGeneric(path, truncate, append);
diff --git a/cpp/src/arrow/filesystem/localfs.h b/cpp/src/arrow/filesystem/localfs.h
index d660dd3..f8e77ae 100644
--- a/cpp/src/arrow/filesystem/localfs.h
+++ b/cpp/src/arrow/filesystem/localfs.h
@@ -91,9 +91,11 @@ class ARROW_EXPORT LocalFileSystem : public FileSystem {
   Result<std::shared_ptr<io::RandomAccessFile>> OpenInputFile(
       const std::string& path) override;
   Result<std::shared_ptr<io::OutputStream>> OpenOutputStream(
-      const std::string& path) override;
+      const std::string& path,
+      const std::shared_ptr<const KeyValueMetadata>& metadata = {}) override;
   Result<std::shared_ptr<io::OutputStream>> OpenAppendStream(
-      const std::string& path) override;
+      const std::string& path,
+      const std::shared_ptr<const KeyValueMetadata>& metadata = {}) override;
 
  protected:
   LocalFileSystemOptions options_;
diff --git a/cpp/src/arrow/filesystem/mockfs.cc b/cpp/src/arrow/filesystem/mockfs.cc
index e1ac05c..f2d2f87 100644
--- a/cpp/src/arrow/filesystem/mockfs.cc
+++ b/cpp/src/arrow/filesystem/mockfs.cc
@@ -53,6 +53,7 @@ struct File {
   TimePoint mtime;
   std::string name;
   std::shared_ptr<Buffer> data;
+  std::shared_ptr<const KeyValueMetadata> metadata;
 
   File(TimePoint mtime, std::string name) : mtime(mtime), name(std::move(name)) {}
 
@@ -232,6 +233,19 @@ class MockFSOutputStream : public io::OutputStream {
   bool closed_;
 };
 
+class MockFSInputStream : public io::BufferReader {
+ public:
+  explicit MockFSInputStream(const File& file)
+      : io::BufferReader(file.data), metadata_(file.metadata) {}
+
+  Result<std::shared_ptr<const KeyValueMetadata>> ReadMetadata() override {
+    return metadata_;
+  }
+
+ protected:
+  std::shared_ptr<const KeyValueMetadata> metadata_;
+};
+
 }  // namespace
 
 std::ostream& operator<<(std::ostream& os, const MockDirInfo& di) {
@@ -358,8 +372,9 @@ class MockFileSystem::Impl {
     }
   }
 
-  Result<std::shared_ptr<io::OutputStream>> OpenOutputStream(const std::string& path,
-                                                             bool append) {
+  Result<std::shared_ptr<io::OutputStream>> OpenOutputStream(
+      const std::string& path, bool append,
+      const std::shared_ptr<const KeyValueMetadata>& metadata) {
     auto parts = SplitAbstractPath(path);
     RETURN_NOT_OK(ValidateAbstractPathParts(parts));
 
@@ -381,6 +396,7 @@ class MockFileSystem::Impl {
     } else {
       return NotAFile(path);
     }
+    file->metadata = metadata;
     auto ptr = std::make_shared<MockFSOutputStream>(file, pool);
     if (append && file->data) {
       RETURN_NOT_OK(ptr->Write(file->data->data(), file->data->size()));
@@ -399,12 +415,7 @@ class MockFileSystem::Impl {
     if (!entry->is_file()) {
       return NotAFile(path);
     }
-    const auto& file = entry->as_file();
-    if (file.data) {
-      return std::make_shared<io::BufferReader>(file.data);
-    } else {
-      return std::make_shared<io::BufferReader>("");
-    }
+    return std::make_shared<MockFSInputStream>(entry->as_file());
   }
 };
 
@@ -687,17 +698,17 @@ Result<std::shared_ptr<io::RandomAccessFile>> MockFileSystem::OpenInputFile(
 }
 
 Result<std::shared_ptr<io::OutputStream>> MockFileSystem::OpenOutputStream(
-    const std::string& path) {
+    const std::string& path, const std::shared_ptr<const KeyValueMetadata>& metadata) {
   auto guard = impl_->lock_guard();
 
-  return impl_->OpenOutputStream(path, false /* append */);
+  return impl_->OpenOutputStream(path, /*append=*/false, metadata);
 }
 
 Result<std::shared_ptr<io::OutputStream>> MockFileSystem::OpenAppendStream(
-    const std::string& path) {
+    const std::string& path, const std::shared_ptr<const KeyValueMetadata>& metadata) {
   auto guard = impl_->lock_guard();
 
-  return impl_->OpenOutputStream(path, true /* append */);
+  return impl_->OpenOutputStream(path, /*append=*/true, metadata);
 }
 
 std::vector<MockDirInfo> MockFileSystem::AllDirs() {
diff --git a/cpp/src/arrow/filesystem/mockfs.h b/cpp/src/arrow/filesystem/mockfs.h
index af0a327..378f30d 100644
--- a/cpp/src/arrow/filesystem/mockfs.h
+++ b/cpp/src/arrow/filesystem/mockfs.h
@@ -90,9 +90,11 @@ class ARROW_EXPORT MockFileSystem : public FileSystem {
   Result<std::shared_ptr<io::RandomAccessFile>> OpenInputFile(
       const std::string& path) override;
   Result<std::shared_ptr<io::OutputStream>> OpenOutputStream(
-      const std::string& path) override;
+      const std::string& path,
+      const std::shared_ptr<const KeyValueMetadata>& metadata = {}) override;
   Result<std::shared_ptr<io::OutputStream>> OpenAppendStream(
-      const std::string& path) override;
+      const std::string& path,
+      const std::shared_ptr<const KeyValueMetadata>& metadata = {}) override;
 
   // Contents-dumping helpers to ease testing.
   // Output is lexicographically-ordered by full path.
diff --git a/cpp/src/arrow/filesystem/s3fs.cc b/cpp/src/arrow/filesystem/s3fs.cc
index c22571a..fe90368 100644
--- a/cpp/src/arrow/filesystem/s3fs.cc
+++ b/cpp/src/arrow/filesystem/s3fs.cc
@@ -78,6 +78,7 @@
 #include "arrow/util/atomic_shared_ptr.h"
 #include "arrow/util/checked_cast.h"
 #include "arrow/util/future.h"
+#include "arrow/util/key_value_metadata.h"
 #include "arrow/util/logging.h"
 #include "arrow/util/optional.h"
 #include "arrow/util/task_group.h"
@@ -684,6 +685,80 @@ Result<S3Model::GetObjectResult> GetObjectRange(Aws::S3::S3Client* client,
   return OutcomeToResult(client->GetObject(req));
 }
 
+template <typename ObjectResult>
+std::shared_ptr<const KeyValueMetadata> GetObjectMetadata(const ObjectResult& result) {
+  auto md = std::make_shared<KeyValueMetadata>();
+
+  auto push = [&](std::string k, const Aws::String& v) {
+    if (!v.empty()) {
+      md->Append(std::move(k), FromAwsString(v).to_string());
+    }
+  };
+  auto push_datetime = [&](std::string k, const Aws::Utils::DateTime& v) {
+    if (v != Aws::Utils::DateTime(0.0)) {
+      push(std::move(k), v.ToGmtString(Aws::Utils::DateFormat::ISO_8601));
+    }
+  };
+
+  md->Append("Content-Length", std::to_string(result.GetContentLength()));
+  push("Cache-Control", result.GetCacheControl());
+  push("Content-Type", result.GetContentType());
+  push("Content-Language", result.GetContentLanguage());
+  push("ETag", result.GetETag());
+  push("VersionId", result.GetVersionId());
+  push_datetime("Last-Modified", result.GetLastModified());
+  push_datetime("Expires", result.GetExpires());
+  return md;
+}
+
+template <typename ObjectRequest>
+struct ObjectMetadataSetter {
+  using Setter = std::function<Status(const std::string& value, ObjectRequest* req)>;
+
+  static std::unordered_map<std::string, Setter> GetSetters() {
+    return {{"Cache-Control", StringSetter(&ObjectRequest::SetCacheControl)},
+            {"Content-Type", StringSetter(&ObjectRequest::SetContentType)},
+            {"Content-Language", StringSetter(&ObjectRequest::SetContentLanguage)},
+            {"Expires", DateTimeSetter(&ObjectRequest::SetExpires)}};
+  }
+
+ private:
+  static Setter StringSetter(void (ObjectRequest::*req_method)(Aws::String&&)) {
+    return [req_method](const std::string& v, ObjectRequest* req) {
+      (req->*req_method)(ToAwsString(v));
+      return Status::OK();
+    };
+  }
+
+  static Setter DateTimeSetter(
+      void (ObjectRequest::*req_method)(Aws::Utils::DateTime&&)) {
+    return [req_method](const std::string& v, ObjectRequest* req) {
+      (req->*req_method)(
+          Aws::Utils::DateTime(v.data(), Aws::Utils::DateFormat::ISO_8601));
+      return Status::OK();
+    };
+  }
+};
+
+template <typename ObjectRequest>
+Status SetObjectMetadata(const std::shared_ptr<const KeyValueMetadata>& metadata,
+                         ObjectRequest* req) {
+  static auto setters = ObjectMetadataSetter<ObjectRequest>::GetSetters();
+
+  if (metadata) {
+    const auto& keys = metadata->keys();
+    const auto& values = metadata->values();
+
+    for (size_t i = 0; i < keys.size(); ++i) {
+      auto it = setters.find(keys[i]);
+      if (it != setters.end()) {
+        RETURN_NOT_OK(it->second(values[i], req));
+      }
+    }
+  }
+  return Status::OK();
+}
+
 // A RandomAccessFile that reads from a S3 object
 class ObjectInputFile final : public io::RandomAccessFile {
  public:
@@ -720,6 +795,7 @@ class ObjectInputFile final : public io::RandomAccessFile {
     }
     content_length_ = outcome.GetResult().GetContentLength();
     DCHECK_GE(content_length_, 0);
+    metadata_ = GetObjectMetadata(outcome.GetResult());
     return Status::OK();
   }
 
@@ -742,6 +818,15 @@ class ObjectInputFile final : public io::RandomAccessFile {
 
   // RandomAccessFile APIs
 
+  Result<std::shared_ptr<const KeyValueMetadata>> ReadMetadata() override {
+    return metadata_;
+  }
+
+  Future<std::shared_ptr<const KeyValueMetadata>> ReadMetadataAsync(
+      const io::IOContext& io_context) override {
+    return metadata_;
+  }
+
   Status Close() override {
     client_ = nullptr;
     closed_ = true;
@@ -825,6 +910,7 @@ class ObjectInputFile final : public io::RandomAccessFile {
   bool closed_ = false;
   int64_t pos_ = 0;
   int64_t content_length_ = kNoSize;
+  std::shared_ptr<const KeyValueMetadata> metadata_;
 };
 
 // Minimum size for each part of a multipart upload, except for the last part.
@@ -841,10 +927,12 @@ class ObjectOutputStream final : public io::OutputStream {
  public:
   ObjectOutputStream(std::shared_ptr<Aws::S3::S3Client> client,
                      const io::IOContext& io_context, const S3Path& path,
-                     const S3Options& options)
+                     const S3Options& options,
+                     const std::shared_ptr<const KeyValueMetadata>& metadata)
       : client_(std::move(client)),
         io_context_(io_context),
         path_(path),
+        metadata_(metadata),
         background_writes_(options.background_writes) {}
 
   ~ObjectOutputStream() override {
@@ -858,6 +946,7 @@ class ObjectOutputStream final : public io::OutputStream {
     S3Model::CreateMultipartUploadRequest req;
     req.SetBucket(ToAwsString(path_.bucket));
     req.SetKey(ToAwsString(path_.key));
+    RETURN_NOT_OK(SetObjectMetadata(metadata_, &req));
 
     auto outcome = client_->CreateMultipartUpload(req);
     if (!outcome.IsSuccess()) {
@@ -1127,6 +1216,7 @@ class ObjectOutputStream final : public io::OutputStream {
   std::shared_ptr<Aws::S3::S3Client> client_;
   const io::IOContext io_context_;
   const S3Path path_;
+  const std::shared_ptr<const KeyValueMetadata> metadata_;
   const bool background_writes_;
 
   Aws::String upload_id_;
@@ -2106,18 +2196,18 @@ Result<std::shared_ptr<io::RandomAccessFile>> S3FileSystem::OpenInputFile(
 }
 
 Result<std::shared_ptr<io::OutputStream>> S3FileSystem::OpenOutputStream(
-    const std::string& s) {
+    const std::string& s, const std::shared_ptr<const KeyValueMetadata>& metadata) {
   ARROW_ASSIGN_OR_RAISE(auto path, S3Path::FromString(s));
   RETURN_NOT_OK(ValidateFilePath(path));
 
   auto ptr = std::make_shared<ObjectOutputStream>(impl_->client_, io_context(), path,
-                                                  impl_->options());
+                                                  impl_->options(), metadata);
   RETURN_NOT_OK(ptr->Init());
   return ptr;
 }
 
 Result<std::shared_ptr<io::OutputStream>> S3FileSystem::OpenAppendStream(
-    const std::string& path) {
+    const std::string& path, const std::shared_ptr<const KeyValueMetadata>& metadata) {
   // XXX Investigate UploadPartCopy? Does it work with source == destination?
   // https://docs.aws.amazon.com/AmazonS3/latest/API/mpUploadUploadPartCopy.html
   // (but would need to fall back to GET if the current data is < 5 MB)
diff --git a/cpp/src/arrow/filesystem/s3fs.h b/cpp/src/arrow/filesystem/s3fs.h
index 6e73ed4..4fb7b00 100644
--- a/cpp/src/arrow/filesystem/s3fs.h
+++ b/cpp/src/arrow/filesystem/s3fs.h
@@ -214,10 +214,12 @@ class ARROW_EXPORT S3FileSystem : public FileSystem {
   /// It is recommended to enable background_writes unless you prefer
   /// implementing your own background execution strategy.
   Result<std::shared_ptr<io::OutputStream>> OpenOutputStream(
-      const std::string& path) override;
+      const std::string& path,
+      const std::shared_ptr<const KeyValueMetadata>& metadata = {}) override;
 
   Result<std::shared_ptr<io::OutputStream>> OpenAppendStream(
-      const std::string& path) override;
+      const std::string& path,
+      const std::shared_ptr<const KeyValueMetadata>& metadata = {}) override;
 
   /// Create a S3FileSystem instance from the given options.
   static Result<std::shared_ptr<S3FileSystem>> Make(
diff --git a/cpp/src/arrow/filesystem/s3fs_test.cc b/cpp/src/arrow/filesystem/s3fs_test.cc
index f5efcda..5ba8e23 100644
--- a/cpp/src/arrow/filesystem/s3fs_test.cc
+++ b/cpp/src/arrow/filesystem/s3fs_test.cc
@@ -77,6 +77,7 @@
 #include "arrow/util/checked_cast.h"
 #include "arrow/util/future.h"
 #include "arrow/util/io_util.h"
+#include "arrow/util/key_value_metadata.h"
 #include "arrow/util/logging.h"
 #include "arrow/util/macros.h"
 
@@ -397,6 +398,7 @@ class TestS3FS : public S3TestMixin {
       ASSERT_OK(OutcomeToStatus(client_->PutObject(req)));
       req.SetKey(ToAwsString("somefile"));
       req.SetBody(std::make_shared<std::stringstream>("some data"));
+      req.SetContentType("x-arrow/test");
       ASSERT_OK(OutcomeToStatus(client_->PutObject(req)));
     }
   }
@@ -451,6 +453,17 @@ class TestS3FS : public S3TestMixin {
     ASSERT_OK(stream->Close());
     AssertObjectContents(client_.get(), "bucket", "newfile4", expected);
 
+    // Create new file with metadata
+    auto metadata = KeyValueMetadata::Make({"Content-Type", "Expires"},
+                                           {"x-arrow/test6", "2016-02-05T20:08:35Z"});
+    ASSERT_OK_AND_ASSIGN(stream, fs_->OpenOutputStream("bucket/newfile5", metadata));
+    ASSERT_OK(stream->Close());
+    ASSERT_OK_AND_ASSIGN(auto input, fs_->OpenInputStream("bucket/newfile5"));
+    ASSERT_OK_AND_ASSIGN(auto got_metadata, input->ReadMetadata());
+    ASSERT_NE(got_metadata, nullptr);
+    ASSERT_THAT(got_metadata->sorted_pairs(),
+                testing::IsSupersetOf(metadata->sorted_pairs()));
+
     // Overwrite
     ASSERT_OK_AND_ASSIGN(stream, fs_->OpenOutputStream("bucket/newfile1"));
     ASSERT_OK(stream->Write("overwritten data"));
@@ -465,12 +478,12 @@ class TestS3FS : public S3TestMixin {
     // Open file and then lose filesystem reference
     ASSERT_EQ(fs_.use_count(), 1);  // needed for test to work
     std::weak_ptr<S3FileSystem> weak_fs(fs_);
-    ASSERT_OK_AND_ASSIGN(stream, fs_->OpenOutputStream("bucket/newfile5"));
+    ASSERT_OK_AND_ASSIGN(stream, fs_->OpenOutputStream("bucket/newfile6"));
     fs_.reset();
     ASSERT_OK(stream->Write("some other data"));
     ASSERT_OK(stream->Close());
     ASSERT_TRUE(weak_fs.expired());
-    AssertObjectContents(client_.get(), "bucket", "newfile5", "some other data");
+    AssertObjectContents(client_.get(), "bucket", "newfile6", "some other data");
   }
 
   void TestOpenOutputStreamAbort() {
@@ -839,6 +852,19 @@ TEST_F(TestS3FS, OpenInputStream) {
   ASSERT_TRUE(weak_fs.expired());
 }
 
+TEST_F(TestS3FS, OpenInputStreamMetadata) {
+  std::shared_ptr<io::InputStream> stream;
+  std::shared_ptr<const KeyValueMetadata> metadata;
+
+  ASSERT_OK_AND_ASSIGN(stream, fs_->OpenInputStream("bucket/somefile"));
+  ASSERT_FINISHES_OK_AND_ASSIGN(metadata, stream->ReadMetadataAsync());
+
+  std::vector<std::pair<std::string, std::string>> expected_kv{
+      {"Content-Length", "9"}, {"Content-Type", "x-arrow/test"}};
+  ASSERT_NE(metadata, nullptr);
+  ASSERT_THAT(metadata->sorted_pairs(), testing::IsSupersetOf(expected_kv));
+}
+
 TEST_F(TestS3FS, OpenInputFile) {
   std::shared_ptr<io::RandomAccessFile> file;
   std::shared_ptr<Buffer> buf;
@@ -959,6 +985,7 @@ class TestS3FSGeneric : public S3TestMixin, public GenericFileSystemTest {
     return false;
 #endif
   }
+  bool have_file_metadata() const override { return true; }
 
   S3Options options_;
   std::shared_ptr<S3FileSystem> s3fs_;
diff --git a/cpp/src/arrow/filesystem/test_util.cc b/cpp/src/arrow/filesystem/test_util.cc
index 2a40c04..bbff33f 100644
--- a/cpp/src/arrow/filesystem/test_util.cc
+++ b/cpp/src/arrow/filesystem/test_util.cc
@@ -17,6 +17,7 @@
 
 #include <algorithm>
 #include <chrono>
+#include <ostream>
 #include <string>
 #include <utility>
 #include <vector>
@@ -32,6 +33,7 @@
 #include "arrow/testing/gtest_util.h"
 #include "arrow/util/async_generator.h"
 #include "arrow/util/future.h"
+#include "arrow/util/key_value_metadata.h"
 #include "arrow/util/vector.h"
 
 using ::testing::ElementsAre;
@@ -410,9 +412,9 @@ void GenericFileSystemTest::TestMoveFile(FileSystem* fs) {
 
 void GenericFileSystemTest::TestMoveDir(FileSystem* fs) {
   if (!allow_move_dir()) {
-    // XXX skip
-    return;
+    GTEST_SKIP() << "Filesystem doesn't allow moving directories";
   }
+
   ASSERT_OK(fs->CreateDir("AB/CD"));
   ASSERT_OK(fs->CreateDir("EF"));
   CreateFile(fs, "AB/abc", "abc data");
@@ -851,6 +853,24 @@ void GenericFileSystemTest::TestOpenOutputStream(FileSystem* fs) {
 
   ASSERT_RAISES(Invalid, stream->Write("x"));  // Stream is closed
 
+  // Storing metadata alongside the file
+  auto metadata = KeyValueMetadata::Make({"Content-Type", "Content-Language"},
+                                         {"x-arrow/filesystem-test", "fr_FR"});
+  ASSERT_OK_AND_ASSIGN(stream, fs->OpenOutputStream("jkl", metadata));
+  ASSERT_OK(stream->Write("data"));
+  ASSERT_OK(stream->Close());
+  ASSERT_OK_AND_ASSIGN(auto input, fs->OpenInputStream("jkl"));
+  ASSERT_OK_AND_ASSIGN(auto got_metadata, input->ReadMetadata());
+  if (have_file_metadata()) {
+    ASSERT_NE(got_metadata, nullptr);
+    ASSERT_GE(got_metadata->size(), 2);
+    ASSERT_OK_AND_EQ("x-arrow/filesystem-test", got_metadata->Get("Content-Type"));
+  } else {
+    if (got_metadata) {
+      ASSERT_EQ(got_metadata->size(), 0);
+    }
+  }
+
   if (!allow_write_file_over_dir()) {
     // Cannot turn dir into file
     ASSERT_RAISES(IOError, fs->OpenOutputStream("CD"));
@@ -860,9 +880,9 @@ void GenericFileSystemTest::TestOpenOutputStream(FileSystem* fs) {
 
 void GenericFileSystemTest::TestOpenAppendStream(FileSystem* fs) {
   if (!allow_append_to_file()) {
-    // XXX skip
-    return;
+    GTEST_SKIP() << "Filesystem doesn't allow file appends";
   }
+
   std::shared_ptr<io::OutputStream> stream;
 
   ASSERT_OK_AND_ASSIGN(stream, fs->OpenAppendStream("abc"));
@@ -893,6 +913,8 @@ void GenericFileSystemTest::TestOpenInputStream(FileSystem* fs) {
   std::shared_ptr<io::InputStream> stream;
   std::shared_ptr<Buffer> buffer;
   ASSERT_OK_AND_ASSIGN(stream, fs->OpenInputStream("AB/abc"));
+  ASSERT_OK_AND_ASSIGN(auto metadata, stream->ReadMetadata());
+  // XXX we cannot really test anything more about metadata...
   ASSERT_OK_AND_ASSIGN(buffer, stream->Read(4));
   AssertBufferEqual(*buffer, "some");
   ASSERT_OK_AND_ASSIGN(buffer, stream->Read(6));
@@ -946,7 +968,9 @@ void GenericFileSystemTest::TestOpenInputStreamAsync(FileSystem* fs) {
 
   std::shared_ptr<io::InputStream> stream;
   std::shared_ptr<Buffer> buffer;
+  std::shared_ptr<const KeyValueMetadata> metadata;
   ASSERT_FINISHES_OK_AND_ASSIGN(stream, fs->OpenInputStreamAsync("AB/abc"));
+  ASSERT_FINISHES_OK_AND_ASSIGN(metadata, stream->ReadMetadataAsync());
   ASSERT_OK_AND_ASSIGN(buffer, stream->Read(4));
   AssertBufferEqual(*buffer, "some");
   ASSERT_OK(stream->Close());
diff --git a/cpp/src/arrow/filesystem/test_util.h b/cpp/src/arrow/filesystem/test_util.h
index 7941756..64577e1 100644
--- a/cpp/src/arrow/filesystem/test_util.h
+++ b/cpp/src/arrow/filesystem/test_util.h
@@ -140,6 +140,8 @@ class ARROW_TESTING_EXPORT GenericFileSystemTest {
   virtual bool have_directory_mtimes() const { return true; }
   // - Whether some directory tree deletion tests may fail randomly
   virtual bool have_flaky_directory_tree_deletion() const { return false; }
+  // - Whether the filesystem stores some metadata alongside files
+  virtual bool have_file_metadata() const { return false; }
 
   void TestEmpty(FileSystem* fs);
   void TestNormalizePath(FileSystem* fs);
diff --git a/cpp/src/arrow/io/buffered.cc b/cpp/src/arrow/io/buffered.cc
index 16b9696..7804c13 100644
--- a/cpp/src/arrow/io/buffered.cc
+++ b/cpp/src/arrow/io/buffered.cc
@@ -476,5 +476,14 @@ Result<std::shared_ptr<Buffer>> BufferedInputStream::DoRead(int64_t nbytes) {
   return impl_->Read(nbytes);
 }
 
+Result<std::shared_ptr<const KeyValueMetadata>> BufferedInputStream::ReadMetadata() {
+  return impl_->raw()->ReadMetadata();
+}
+
+Future<std::shared_ptr<const KeyValueMetadata>> BufferedInputStream::ReadMetadataAsync(
+    const IOContext& io_context) {
+  return impl_->raw()->ReadMetadataAsync(io_context);
+}
+
 }  // namespace io
 }  // namespace arrow
diff --git a/cpp/src/arrow/io/buffered.h b/cpp/src/arrow/io/buffered.h
index 56c8c39..8116613 100644
--- a/cpp/src/arrow/io/buffered.h
+++ b/cpp/src/arrow/io/buffered.h
@@ -132,6 +132,9 @@ class ARROW_EXPORT BufferedInputStream
   // InputStream APIs
 
   bool closed() const override;
+  Result<std::shared_ptr<const KeyValueMetadata>> ReadMetadata() override;
+  Future<std::shared_ptr<const KeyValueMetadata>> ReadMetadataAsync(
+      const IOContext& io_context) override;
 
  private:
   friend InputStreamConcurrencyWrapper<BufferedInputStream>;
diff --git a/cpp/src/arrow/io/compressed.cc b/cpp/src/arrow/io/compressed.cc
index 4993ae2..72977f0 100644
--- a/cpp/src/arrow/io/compressed.cc
+++ b/cpp/src/arrow/io/compressed.cc
@@ -437,5 +437,14 @@ Result<std::shared_ptr<Buffer>> CompressedInputStream::DoRead(int64_t nbytes) {
 
 std::shared_ptr<InputStream> CompressedInputStream::raw() const { return impl_->raw(); }
 
+Result<std::shared_ptr<const KeyValueMetadata>> CompressedInputStream::ReadMetadata() {
+  return impl_->raw()->ReadMetadata();
+}
+
+Future<std::shared_ptr<const KeyValueMetadata>> CompressedInputStream::ReadMetadataAsync(
+    const IOContext& io_context) {
+  return impl_->raw()->ReadMetadataAsync(io_context);
+}
+
 }  // namespace io
 }  // namespace arrow
diff --git a/cpp/src/arrow/io/compressed.h b/cpp/src/arrow/io/compressed.h
index 677e45c..cd1a7f6 100644
--- a/cpp/src/arrow/io/compressed.h
+++ b/cpp/src/arrow/io/compressed.h
@@ -89,6 +89,9 @@ class ARROW_EXPORT CompressedInputStream
   // InputStream interface
 
   bool closed() const override;
+  Result<std::shared_ptr<const KeyValueMetadata>> ReadMetadata() override;
+  Future<std::shared_ptr<const KeyValueMetadata>> ReadMetadataAsync(
+      const IOContext& io_context) override;
 
   /// \brief Return the underlying raw input stream.
   std::shared_ptr<InputStream> raw() const;
diff --git a/cpp/src/arrow/io/interfaces.cc b/cpp/src/arrow/io/interfaces.cc
index 7193a56..954c0f3 100644
--- a/cpp/src/arrow/io/interfaces.cc
+++ b/cpp/src/arrow/io/interfaces.cc
@@ -33,6 +33,7 @@
 #include "arrow/io/util_internal.h"
 #include "arrow/result.h"
 #include "arrow/status.h"
+#include "arrow/util/checked_cast.h"
 #include "arrow/util/future.h"
 #include "arrow/util/iterator.h"
 #include "arrow/util/logging.h"
@@ -41,6 +42,7 @@
 
 namespace arrow {
 
+using internal::checked_pointer_cast;
 using internal::Executor;
 using internal::TaskHints;
 using internal::ThreadPool;
@@ -105,6 +107,22 @@ Result<util::string_view> InputStream::Peek(int64_t ARROW_ARG_UNUSED(nbytes)) {
 
 bool InputStream::supports_zero_copy() const { return false; }
 
+Result<std::shared_ptr<const KeyValueMetadata>> InputStream::ReadMetadata() {
+  return std::shared_ptr<const KeyValueMetadata>{};
+}
+
+// Default ReadMetadataAsync() implementation: run ReadMetadata() on the context's
+// executor; the captured shared_ptr keeps the stream alive while the task is pending
+Future<std::shared_ptr<const KeyValueMetadata>> InputStream::ReadMetadataAsync(
+    const IOContext& ctx) {
+  auto self = shared_from_this();
+  return DeferNotOk(internal::SubmitIO(ctx, [self] { return self->ReadMetadata(); }));
+}
+
+Future<std::shared_ptr<const KeyValueMetadata>> InputStream::ReadMetadataAsync() {
+  return ReadMetadataAsync(io_context());
+}
+
 Result<Iterator<std::shared_ptr<Buffer>>> MakeInputStreamIterator(
     std::shared_ptr<InputStream> stream, int64_t block_size) {
   if (stream->closed()) {
@@ -139,10 +157,7 @@ Result<std::shared_ptr<Buffer>> RandomAccessFile::ReadAt(int64_t position,
 Future<std::shared_ptr<Buffer>> RandomAccessFile::ReadAsync(const IOContext& ctx,
                                                             int64_t position,
                                                             int64_t nbytes) {
-  auto self = shared_from_this();
-  TaskHints hints;
-  hints.io_size = nbytes;
-  hints.external_id = ctx.external_id();
+  auto self = checked_pointer_cast<RandomAccessFile>(shared_from_this());
   return DeferNotOk(internal::SubmitIO(
       ctx, [self, position, nbytes] { return self->ReadAt(position, nbytes); }));
 }
diff --git a/cpp/src/arrow/io/interfaces.h b/cpp/src/arrow/io/interfaces.h
index 0afd2f2..e524afa 100644
--- a/cpp/src/arrow/io/interfaces.h
+++ b/cpp/src/arrow/io/interfaces.h
@@ -202,7 +202,9 @@ class ARROW_EXPORT OutputStream : virtual public FileInterface, public Writable
   OutputStream() = default;
 };
 
-class ARROW_EXPORT InputStream : virtual public FileInterface, virtual public Readable {
+class ARROW_EXPORT InputStream : virtual public FileInterface,
+                                 virtual public Readable,
+                                 public std::enable_shared_from_this<InputStream> {
  public:
   /// \brief Advance or skip stream indicated number of bytes
   /// \param[in] nbytes the number to move forward
@@ -225,14 +227,23 @@ class ARROW_EXPORT InputStream : virtual public FileInterface, virtual public Re
   /// Zero copy reads imply the use of Buffer-returning Read() overloads.
   virtual bool supports_zero_copy() const;
 
+  /// \brief Read and return stream metadata
+  ///
+  /// If the stream implementation doesn't support metadata, empty metadata
+  /// is returned, either as a null pointer or as an allocated empty
+  /// KeyValueMetadata.  Callers must check for null before dereferencing.
+  virtual Result<std::shared_ptr<const KeyValueMetadata>> ReadMetadata();
+
+  /// \brief Read stream metadata asynchronously
+  virtual Future<std::shared_ptr<const KeyValueMetadata>> ReadMetadataAsync(
+      const IOContext& io_context);
+  Future<std::shared_ptr<const KeyValueMetadata>> ReadMetadataAsync();
+
  protected:
   InputStream() = default;
 };
 
-class ARROW_EXPORT RandomAccessFile
-    : public std::enable_shared_from_this<RandomAccessFile>,
-      public InputStream,
-      public Seekable {
+class ARROW_EXPORT RandomAccessFile : public InputStream, public Seekable {
  public:
   /// Necessary because we hold a std::unique_ptr
   ~RandomAccessFile() override;
diff --git a/cpp/src/arrow/io/memory.cc b/cpp/src/arrow/io/memory.cc
index 7d11118..6495242 100644
--- a/cpp/src/arrow/io/memory.cc
+++ b/cpp/src/arrow/io/memory.cc
@@ -263,8 +263,8 @@ void FixedSizeBufferWriter::set_memcopy_threshold(int64_t threshold) {
 
 BufferReader::BufferReader(std::shared_ptr<Buffer> buffer)
     : buffer_(std::move(buffer)),
-      data_(buffer_->data()),
-      size_(buffer_->size()),
+      data_(buffer_ ? buffer_->data() : reinterpret_cast<const uint8_t*>("")),
+      size_(buffer_ ? buffer_->size() : 0),
       position_(0),
       is_open_(true) {}
 
diff --git a/cpp/src/arrow/io/memory_test.cc b/cpp/src/arrow/io/memory_test.cc
index 3365674..bd62761 100644
--- a/cpp/src/arrow/io/memory_test.cc
+++ b/cpp/src/arrow/io/memory_test.cc
@@ -179,6 +179,14 @@ TEST(TestBufferReader, FromStrings) {
   ASSERT_EQ(0, memcmp(piece->data(), data.data() + 2, 4));
 }
 
+TEST(TestBufferReader, FromNullBuffer) {
+  std::shared_ptr<Buffer> buf;
+  BufferReader reader(buf);
+  ASSERT_OK_AND_EQ(0, reader.GetSize());
+  ASSERT_OK_AND_ASSIGN(auto piece, reader.Read(10));
+  ASSERT_EQ(0, piece->size());
+}
+
 TEST(TestBufferReader, Seeking) {
   std::string data = "data123456";
 
diff --git a/cpp/src/arrow/io/transform.cc b/cpp/src/arrow/io/transform.cc
index a0b0b33..3fdf5a7 100644
--- a/cpp/src/arrow/io/transform.cc
+++ b/cpp/src/arrow/io/transform.cc
@@ -145,5 +145,18 @@ Result<int64_t> TransformInputStream::Tell() const {
   return impl_->pos_;
 }
 
+Result<std::shared_ptr<const KeyValueMetadata>> TransformInputStream::ReadMetadata() {
+  RETURN_NOT_OK(impl_->CheckClosed());
+
+  return impl_->wrapped_->ReadMetadata();
+}
+
+Future<std::shared_ptr<const KeyValueMetadata>> TransformInputStream::ReadMetadataAsync(
+    const IOContext& io_context) {
+  RETURN_NOT_OK(impl_->CheckClosed());
+
+  return impl_->wrapped_->ReadMetadataAsync(io_context);
+}
+
 }  // namespace io
 }  // namespace arrow
diff --git a/cpp/src/arrow/io/transform.h b/cpp/src/arrow/io/transform.h
index d983b7c..c117f27 100644
--- a/cpp/src/arrow/io/transform.h
+++ b/cpp/src/arrow/io/transform.h
@@ -45,6 +45,10 @@ class ARROW_EXPORT TransformInputStream : public InputStream {
   Result<int64_t> Read(int64_t nbytes, void* out) override;
   Result<std::shared_ptr<Buffer>> Read(int64_t nbytes) override;
 
+  Result<std::shared_ptr<const KeyValueMetadata>> ReadMetadata() override;
+  Future<std::shared_ptr<const KeyValueMetadata>> ReadMetadataAsync(
+      const IOContext& io_context) override;
+
   Result<int64_t> Tell() const override;
 
  protected:
diff --git a/cpp/src/arrow/python/filesystem.cc b/cpp/src/arrow/python/filesystem.cc
index 8e8e8a6..8c12f05 100644
--- a/cpp/src/arrow/python/filesystem.cc
+++ b/cpp/src/arrow/python/filesystem.cc
@@ -170,10 +170,10 @@ Result<std::shared_ptr<io::RandomAccessFile>> PyFileSystem::OpenInputFile(
 }
 
 Result<std::shared_ptr<io::OutputStream>> PyFileSystem::OpenOutputStream(
-    const std::string& path) {
+    const std::string& path, const std::shared_ptr<const KeyValueMetadata>& metadata) {
   std::shared_ptr<io::OutputStream> stream;
   auto st = SafeCallIntoPython([&]() -> Status {
-    vtable_.open_output_stream(handler_.obj(), path, &stream);
+    vtable_.open_output_stream(handler_.obj(), path, metadata, &stream);
     return CheckPyError();
   });
   RETURN_NOT_OK(st);
@@ -181,10 +181,10 @@ Result<std::shared_ptr<io::OutputStream>> PyFileSystem::OpenOutputStream(
 }
 
 Result<std::shared_ptr<io::OutputStream>> PyFileSystem::OpenAppendStream(
-    const std::string& path) {
+    const std::string& path, const std::shared_ptr<const KeyValueMetadata>& metadata) {
   std::shared_ptr<io::OutputStream> stream;
   auto st = SafeCallIntoPython([&]() -> Status {
-    vtable_.open_append_stream(handler_.obj(), path, &stream);
+    vtable_.open_append_stream(handler_.obj(), path, metadata, &stream);
     return CheckPyError();
   });
   RETURN_NOT_OK(st);
diff --git a/cpp/src/arrow/python/filesystem.h b/cpp/src/arrow/python/filesystem.h
index f2d9c90..e1235f8 100644
--- a/cpp/src/arrow/python/filesystem.h
+++ b/cpp/src/arrow/python/filesystem.h
@@ -60,9 +60,11 @@ class ARROW_PYTHON_EXPORT PyFileSystemVtable {
                      std::shared_ptr<io::RandomAccessFile>* out)>
       open_input_file;
   std::function<void(PyObject*, const std::string& path,
+                     const std::shared_ptr<const KeyValueMetadata>&,
                      std::shared_ptr<io::OutputStream>* out)>
       open_output_stream;
   std::function<void(PyObject*, const std::string& path,
+                     const std::shared_ptr<const KeyValueMetadata>&,
                      std::shared_ptr<io::OutputStream>* out)>
       open_append_stream;
 
@@ -104,9 +106,11 @@ class ARROW_PYTHON_EXPORT PyFileSystem : public arrow::fs::FileSystem {
   Result<std::shared_ptr<io::RandomAccessFile>> OpenInputFile(
       const std::string& path) override;
   Result<std::shared_ptr<io::OutputStream>> OpenOutputStream(
-      const std::string& path) override;
+      const std::string& path,
+      const std::shared_ptr<const KeyValueMetadata>& metadata = {}) override;
   Result<std::shared_ptr<io::OutputStream>> OpenAppendStream(
-      const std::string& path) override;
+      const std::string& path,
+      const std::shared_ptr<const KeyValueMetadata>& metadata = {}) override;
 
   Result<std::string> NormalizePath(std::string path) override;
 
diff --git a/cpp/src/arrow/util/key_value_metadata.cc b/cpp/src/arrow/util/key_value_metadata.cc
index 4c6af29..ad3b686 100644
--- a/cpp/src/arrow/util/key_value_metadata.cc
+++ b/cpp/src/arrow/util/key_value_metadata.cc
@@ -70,6 +70,11 @@ KeyValueMetadata::KeyValueMetadata(std::vector<std::string> keys,
   ARROW_CHECK_EQ(keys.size(), values.size());
 }
 
+std::shared_ptr<KeyValueMetadata> KeyValueMetadata::Make(
+    std::vector<std::string> keys, std::vector<std::string> values) {
+  return std::make_shared<KeyValueMetadata>(std::move(keys), std::move(values));
+}
+
 void KeyValueMetadata::ToUnorderedMap(
     std::unordered_map<std::string, std::string>* out) const {
   DCHECK_NE(out, nullptr);
diff --git a/cpp/src/arrow/util/key_value_metadata.h b/cpp/src/arrow/util/key_value_metadata.h
index d4207a5..d42ab78 100644
--- a/cpp/src/arrow/util/key_value_metadata.h
+++ b/cpp/src/arrow/util/key_value_metadata.h
@@ -39,6 +39,9 @@ class ARROW_EXPORT KeyValueMetadata {
   explicit KeyValueMetadata(const std::unordered_map<std::string, std::string>& map);
   virtual ~KeyValueMetadata() = default;
 
+  static std::shared_ptr<KeyValueMetadata> Make(std::vector<std::string> keys,
+                                                std::vector<std::string> values);
+
   void ToUnorderedMap(std::unordered_map<std::string, std::string>* out) const;
   void Append(const std::string& key, const std::string& value);
 
diff --git a/python/pyarrow/_fs.pyx b/python/pyarrow/_fs.pyx
index d881c74..42e2484 100644
--- a/python/pyarrow/_fs.pyx
+++ b/python/pyarrow/_fs.pyx
@@ -627,7 +627,8 @@ cdef class FileSystem(_Weakrefable):
             stream, path=path, compression=compression, buffer_size=buffer_size
         )
 
-    def open_output_stream(self, path, compression='detect', buffer_size=None):
+    def open_output_stream(self, path, compression='detect',
+                           buffer_size=None, metadata=None):
         """
         Open an output stream for sequential writing.
 
@@ -646,6 +647,11 @@ cdef class FileSystem(_Weakrefable):
         buffer_size: int optional, default None
             If None or 0, no buffering will happen. Otherwise the size of the
             temporary write buffer.
+        metadata: dict optional, default None
+            If not None, a mapping of string keys to string values.
+            Some filesystems support storing metadata alongside the file
+            (such as "Content-Type").
+            Unsupported metadata keys will be ignored.
 
         Returns
         -------
@@ -655,9 +661,14 @@ cdef class FileSystem(_Weakrefable):
             c_string pathstr = _path_as_bytes(path)
             NativeFile stream = NativeFile()
             shared_ptr[COutputStream] out_handle
+            shared_ptr[const CKeyValueMetadata] c_metadata
+
+        if metadata is not None:
+            c_metadata = pyarrow_unwrap_metadata(KeyValueMetadata(metadata))
 
         with nogil:
-            out_handle = GetResultValue(self.fs.OpenOutputStream(pathstr))
+            out_handle = GetResultValue(
+                self.fs.OpenOutputStream(pathstr, c_metadata))
 
         stream.set_output_stream(out_handle)
         stream.is_writable = True
@@ -666,7 +677,8 @@ cdef class FileSystem(_Weakrefable):
             stream, path=path, compression=compression, buffer_size=buffer_size
         )
 
-    def open_append_stream(self, path, compression='detect', buffer_size=None):
+    def open_append_stream(self, path, compression='detect',
+                           buffer_size=None, metadata=None):
         """
         Open an output stream for appending.
 
@@ -685,6 +697,11 @@ cdef class FileSystem(_Weakrefable):
         buffer_size: int optional, default None
             If None or 0, no buffering will happen. Otherwise the size of the
             temporary write buffer.
+        metadata: dict optional, default None
+            If not None, a mapping of string keys to string values.
+            Some filesystems support storing metadata alongside the file
+            (such as "Content-Type").
+            Unsupported metadata keys will be ignored.
 
         Returns
         -------
@@ -694,9 +711,14 @@ cdef class FileSystem(_Weakrefable):
             c_string pathstr = _path_as_bytes(path)
             NativeFile stream = NativeFile()
             shared_ptr[COutputStream] out_handle
+            shared_ptr[const CKeyValueMetadata] c_metadata
+
+        if metadata is not None:
+            c_metadata = pyarrow_unwrap_metadata(KeyValueMetadata(metadata))
 
         with nogil:
-            out_handle = GetResultValue(self.fs.OpenAppendStream(pathstr))
+            out_handle = GetResultValue(
+                self.fs.OpenAppendStream(pathstr, c_metadata))
 
         stream.set_output_stream(out_handle)
         stream.is_writable = True
@@ -970,13 +992,13 @@ class FileSystemHandler(ABC):
         """
 
     @abstractmethod
-    def open_output_stream(self, path):
+    def open_output_stream(self, path, metadata):
         """
         Implement PyFileSystem.open_output_stream(...).
         """
 
     @abstractmethod
-    def open_append_stream(self, path):
+    def open_append_stream(self, path, metadata):
         """
         Implement PyFileSystem.open_append_stream(...).
         """
@@ -1067,17 +1089,23 @@ cdef void _cb_open_input_file(handler, const c_string& path,
                         "a PyArrow file")
     out[0] = (<NativeFile> stream).get_random_access_file()
 
-cdef void _cb_open_output_stream(handler, const c_string& path,
-                                 shared_ptr[COutputStream]* out) except *:
-    stream = handler.open_output_stream(frombytes(path))
+cdef void _cb_open_output_stream(
+        handler, const c_string& path,
+        const shared_ptr[const CKeyValueMetadata]& metadata,
+        shared_ptr[COutputStream]* out) except *:
+    stream = handler.open_output_stream(
+        frombytes(path), pyarrow_wrap_metadata(metadata))
     if not isinstance(stream, NativeFile):
         raise TypeError("open_output_stream should have returned "
                         "a PyArrow file")
     out[0] = (<NativeFile> stream).get_output_stream()
 
-cdef void _cb_open_append_stream(handler, const c_string& path,
-                                 shared_ptr[COutputStream]* out) except *:
-    stream = handler.open_append_stream(frombytes(path))
+cdef void _cb_open_append_stream(
+        handler, const c_string& path,
+        const shared_ptr[const CKeyValueMetadata]& metadata,
+        shared_ptr[COutputStream]* out) except *:
+    stream = handler.open_append_stream(
+        frombytes(path), pyarrow_wrap_metadata(metadata))
     if not isinstance(stream, NativeFile):
         raise TypeError("open_append_stream should have returned "
                         "a PyArrow file")
diff --git a/python/pyarrow/fs.py b/python/pyarrow/fs.py
index 7e63c01..fe50553 100644
--- a/python/pyarrow/fs.py
+++ b/python/pyarrow/fs.py
@@ -299,6 +299,8 @@ class FSSpecHandler(FileSystemHandler):
         # instead of a file
         self.fs.copy(src, dest)
 
+    # TODO can we read/pass metadata (e.g. Content-Type) in the methods below?
+
     def open_input_stream(self, path):
         from pyarrow import PythonFile
 
@@ -315,12 +317,12 @@ class FSSpecHandler(FileSystemHandler):
 
         return PythonFile(self.fs.open(path, mode="rb"), mode="r")
 
-    def open_output_stream(self, path):
+    def open_output_stream(self, path, metadata):
         from pyarrow import PythonFile
 
         return PythonFile(self.fs.open(path, mode="wb"), mode="w")
 
-    def open_append_stream(self, path):
+    def open_append_stream(self, path, metadata):
         from pyarrow import PythonFile
 
         return PythonFile(self.fs.open(path, mode="ab"), mode="w")
diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd
index 3912dac..5afa806 100644
--- a/python/pyarrow/includes/libarrow.pxd
+++ b/python/pyarrow/includes/libarrow.pxd
@@ -1196,7 +1196,7 @@ cdef extern from "arrow/io/api.h" namespace "arrow::io" nogil:
 
     cdef cppclass CInputStream" arrow::io::InputStream"(FileInterface,
                                                         Readable):
-        pass
+        CResult[shared_ptr[const CKeyValueMetadata]] ReadMetadata()
 
     cdef cppclass CRandomAccessFile" arrow::io::RandomAccessFile"(CInputStream,
                                                                   Seekable):
diff --git a/python/pyarrow/includes/libarrow_fs.pxd b/python/pyarrow/includes/libarrow_fs.pxd
index ee1b8a7..baa5eca 100644
--- a/python/pyarrow/includes/libarrow_fs.pxd
+++ b/python/pyarrow/includes/libarrow_fs.pxd
@@ -73,9 +73,9 @@ cdef extern from "arrow/filesystem/api.h" namespace "arrow::fs" nogil:
         CResult[shared_ptr[CRandomAccessFile]] OpenInputFile(
             const c_string& path)
         CResult[shared_ptr[COutputStream]] OpenOutputStream(
-            const c_string& path)
+            const c_string& path, const shared_ptr[const CKeyValueMetadata]&)
         CResult[shared_ptr[COutputStream]] OpenAppendStream(
-            const c_string& path)
+            const c_string& path, const shared_ptr[const CKeyValueMetadata]&)
         c_bool Equals(const CFileSystem& other)
         c_bool Equals(shared_ptr[CFileSystem] other)
 
@@ -234,8 +234,9 @@ ctypedef void CallbackOpenInputStream(object, const c_string&,
                                       shared_ptr[CInputStream]*)
 ctypedef void CallbackOpenInputFile(object, const c_string&,
                                     shared_ptr[CRandomAccessFile]*)
-ctypedef void CallbackOpenOutputStream(object, const c_string&,
-                                       shared_ptr[COutputStream]*)
+ctypedef void CallbackOpenOutputStream(
+    object, const c_string&, const shared_ptr[const CKeyValueMetadata]&,
+    shared_ptr[COutputStream]*)
 ctypedef void CallbackNormalizePath(object, const c_string&, c_string*)
 
 cdef extern from "arrow/python/filesystem.h" namespace "arrow::py::fs" nogil:
diff --git a/python/pyarrow/io.pxi b/python/pyarrow/io.pxi
index 63ce586..4945664 100644
--- a/python/pyarrow/io.pxi
+++ b/python/pyarrow/io.pxi
@@ -230,6 +230,24 @@ cdef class NativeFile(_Weakrefable):
 
         return size
 
+    def metadata(self):
+        """
+        Return file metadata as a dict (str keys, bytes values; may be empty)
+        """
+        cdef:
+            shared_ptr[const CKeyValueMetadata] c_metadata
+
+        handle = self.get_input_stream()
+        with nogil:
+            c_metadata = GetResultValue(handle.get().ReadMetadata())
+
+        metadata = {}
+        if c_metadata.get() != nullptr:
+            for i in range(c_metadata.get().size()):
+                metadata[frombytes(c_metadata.get().key(i))] = \
+                    c_metadata.get().value(i)
+        return metadata
+
     def tell(self):
         """
         Return current stream position
diff --git a/python/pyarrow/lib.pxd b/python/pyarrow/lib.pxd
index 8880179..1959519 100644
--- a/python/pyarrow/lib.pxd
+++ b/python/pyarrow/lib.pxd
@@ -528,8 +528,8 @@ cpdef DataType ensure_type(object type, bint allow_none=*)
 
 # Exceptions may be raised when converting dict values, so need to
 # check exception state on return
-cdef shared_ptr[CKeyValueMetadata] pyarrow_unwrap_metadata(object meta) \
-    except *
+cdef shared_ptr[const CKeyValueMetadata] pyarrow_unwrap_metadata(
+    object meta) except *
 cdef object pyarrow_wrap_metadata(
     const shared_ptr[const CKeyValueMetadata]& meta)
 
diff --git a/python/pyarrow/tests/test_fs.py b/python/pyarrow/tests/test_fs.py
index acdff25..ae687a3 100644
--- a/python/pyarrow/tests/test_fs.py
+++ b/python/pyarrow/tests/test_fs.py
@@ -128,12 +128,12 @@ class DummyHandler(FileSystemHandler):
         data = "{0}:input_file".format(path).encode('utf8')
         return pa.BufferReader(data)
 
-    def open_output_stream(self, path):
+    def open_output_stream(self, path, metadata):
         if "notfound" in path:
             raise FileNotFoundError(path)
         return pa.BufferOutputStream()
 
-    def open_append_stream(self, path):
+    def open_append_stream(self, path, metadata):
         if "notfound" in path:
             raise FileNotFoundError(path)
         return pa.BufferOutputStream()
@@ -193,11 +193,11 @@ class ProxyHandler(FileSystemHandler):
     def open_input_file(self, path):
         return self._fs.open_input_file(path)
 
-    def open_output_stream(self, path):
-        return self._fs.open_output_stream(path)
+    def open_output_stream(self, path, metadata):
+        return self._fs.open_output_stream(path, metadata=metadata)
 
-    def open_append_stream(self, path):
-        return self._fs.open_append_stream(path)
+    def open_append_stream(self, path, metadata):
+        return self._fs.open_append_stream(path, metadata=metadata)
 
 
 @pytest.fixture
@@ -967,6 +967,25 @@ def test_open_append_stream(fs, pathfn, compression, buffer_size, compressor,
                                   buffer_size=buffer_size)
 
 
+def test_open_output_stream_metadata(fs, pathfn):
+    p = pathfn('open-output-stream-metadata')
+    metadata = {'Content-Type': 'x-pyarrow/test'}
+
+    data = b'some data'
+    with fs.open_output_stream(p, metadata=metadata) as f:
+        f.write(data)
+
+    with fs.open_input_stream(p) as f:
+        assert f.read() == data
+        got_metadata = f.metadata()
+
+    if fs.type_name == 's3' or 'mock' in fs.type_name:
+        for k, v in metadata.items():
+            assert got_metadata[k] == v.encode()
+    else:
+        assert got_metadata == {}
+
+
 def test_localfs_options():
     # LocalFileSystem instantiation
     LocalFileSystem(use_mmap=False)
@@ -1493,6 +1512,13 @@ def test_s3_real_aws():
     fs = S3FileSystem(anonymous=True, region='us-east-2')
     entries = fs.get_file_info(FileSelector('ursa-labs-taxi-data'))
     assert len(entries) > 0
+    with fs.open_input_stream('ursa-labs-taxi-data/2019/06/data.parquet') as f:
+        md = f.metadata()
+        assert 'Content-Type' in md
+        assert md['Last-Modified'] == b'2020-01-17T16:26:28Z'
+        # The ETag value is a quoted string, as the HTTP spec requires;
+        # both AWS and Minio return it with the quotes included
+        assert md['ETag'] == b'"f1efd5d76cb82861e1542117bfa52b90-8"'
 
 
 @pytest.mark.s3