Posted to commits@arrow.apache.org by bk...@apache.org on 2022/11/30 22:02:14 UTC

[arrow] branch feature/format-string-view updated: Adding IPC serde of views by converting to/from dense strings

This is an automated email from the ASF dual-hosted git repository.

bkietz pushed a commit to branch feature/format-string-view
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/feature/format-string-view by this push:
     new 7fb704c704 Adding IPC serde of views by converting to/from dense strings
7fb704c704 is described below

commit 7fb704c7049dd191e69689082ac60f0abcca67f9
Author: Benjamin Kietzman <be...@gmail.com>
AuthorDate: Wed Nov 30 17:01:23 2022 -0500

    Adding IPC serde of views by converting to/from dense strings
---
 cpp/src/arrow/buffer_builder.h                     |  6 ++
 .../arrow/compute/kernels/scalar_cast_string.cc    | 68 ++++++++++++++-----
 cpp/src/arrow/ipc/metadata_internal.cc             | 10 ++-
 cpp/src/arrow/ipc/metadata_internal.h              |  2 +
 cpp/src/arrow/ipc/read_write_test.cc               | 79 +++++++++++-----------
 cpp/src/arrow/ipc/reader.cc                        | 56 +++++++++++++--
 cpp/src/arrow/ipc/test_common.cc                   | 48 ++++++-------
 cpp/src/arrow/ipc/writer.cc                        | 53 +++++++++++++--
 cpp/src/arrow/util/string_header.h                 |  5 +-
 9 files changed, 230 insertions(+), 97 deletions(-)
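
For readers following along, here is a minimal sketch of the round trip this
commit enables: a string-view column is written to the IPC stream format as a
dense utf8 column and converted back to utf8_view on read. The sketch assumes
the feature branch's utf8_view() factory and StringViewBuilder (both visible in
the diff below), and that StringViewBuilder::Append mirrors the other binary
builders; the IPC writer/reader calls are the standard public APIs.

    #include "arrow/api.h"
    #include "arrow/io/api.h"
    #include "arrow/ipc/api.h"

    arrow::Status RoundTripStringView() {
      // Build a small utf8_view array (StringViewBuilder is assumed to support
      // Append like the other binary builders on this branch).
      arrow::StringViewBuilder builder;
      ARROW_RETURN_NOT_OK(builder.Append("foo"));
      ARROW_RETURN_NOT_OK(builder.Append("a string longer than twelve bytes"));
      ARROW_ASSIGN_OR_RAISE(auto array, builder.Finish());

      auto schema = arrow::schema({arrow::field("s", arrow::utf8_view())});
      auto batch = arrow::RecordBatch::Make(schema, array->length(), {array});

      // Write: on the wire the column is a plain dense utf8 column.
      ARROW_ASSIGN_OR_RAISE(auto sink, arrow::io::BufferOutputStream::Create());
      ARROW_ASSIGN_OR_RAISE(auto writer, arrow::ipc::MakeStreamWriter(sink, schema));
      ARROW_RETURN_NOT_OK(writer->WriteRecordBatch(*batch));
      ARROW_RETURN_NOT_OK(writer->Close());
      ARROW_ASSIGN_OR_RAISE(auto buffer, sink->Finish());

      // Read: offsets are converted back into string headers after loading.
      arrow::io::BufferReader source(buffer);
      ARROW_ASSIGN_OR_RAISE(auto reader,
                            arrow::ipc::RecordBatchStreamReader::Open(&source));
      ARROW_ASSIGN_OR_RAISE(auto read_batch, reader->Next());
      return read_batch->schema()->Equals(*schema)
                 ? arrow::Status::OK()
                 : arrow::Status::Invalid("view type was not restored");
    }

Since the serialized column is an ordinary utf8 column plus a custom metadata
key on the field, a reader that does not know the key should still see valid
dense data.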

diff --git a/cpp/src/arrow/buffer_builder.h b/cpp/src/arrow/buffer_builder.h
index 5f37e55200..ab397485d6 100644
--- a/cpp/src/arrow/buffer_builder.h
+++ b/cpp/src/arrow/buffer_builder.h
@@ -117,6 +117,9 @@ class ARROW_EXPORT BufferBuilder {
     UnsafeAppend(data, length);
     return Status::OK();
   }
+  Status Append(std::string_view v) {
+    return Append(v.data(), static_cast<int64_t>(v.size()));
+  }
 
   /// \brief Append copies of a value to the buffer
   ///
@@ -138,6 +141,9 @@ class ARROW_EXPORT BufferBuilder {
     memcpy(data_ + size_, data, static_cast<size_t>(length));
     size_ += length;
   }
+  void UnsafeAppend(std::string_view v) {
+    UnsafeAppend(v.data(), static_cast<int64_t>(v.size()));
+  }
 
   void UnsafeAppend(const int64_t num_copies, uint8_t value) {
     memset(data_ + size_, value, static_cast<size_t>(num_copies));
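
(Aside, not part of the patch: a hedged usage sketch of the new overloads
above. They let call sites append a whole std::string_view without spelling
out data()/size(); Reserve, Append, UnsafeAppend, and Finish are existing
BufferBuilder APIs.)

    #include <memory>
    #include <string_view>

    #include "arrow/buffer_builder.h"

    arrow::Status BuildGreeting(std::shared_ptr<arrow::Buffer>* out) {
      arrow::BufferBuilder builder;
      ARROW_RETURN_NOT_OK(builder.Reserve(16));           // room for both appends below
      builder.UnsafeAppend(std::string_view{"hello, "});  // unchecked; uses reserved space
      ARROW_RETURN_NOT_OK(builder.Append(std::string_view{"world"}));  // checked append
      return builder.Finish(out);
    }
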
diff --git a/cpp/src/arrow/compute/kernels/scalar_cast_string.cc b/cpp/src/arrow/compute/kernels/scalar_cast_string.cc
index 68ba6268ae..58d0629c86 100644
--- a/cpp/src/arrow/compute/kernels/scalar_cast_string.cc
+++ b/cpp/src/arrow/compute/kernels/scalar_cast_string.cc
@@ -323,11 +323,11 @@ Status BinaryToBinaryCastExec(KernelContext* ctx, const ExecSpan& batch,
   constexpr bool kOutputFixed = std::is_same_v<FixedSizeBinaryType, O>;
 
   if constexpr (kInputOffsets && kOutputOffsets) {
-    // FIXME(bkietz) this discards preallocated storage. It seems preferable to me to
-    // allocate a new null bitmap if necessary than to always allocate new offsets.
     // Start with a zero-copy cast, but change indices to expected size
     RETURN_NOT_OK(SimpleUtf8Validation());
     RETURN_NOT_OK(ZeroCopyCastExec(ctx, batch, out));
+    // FIXME(bkietz) this discards preallocated storage. It seems preferable to me to
+    // allocate a new null bitmap if necessary than to always allocate new offsets.
     return CastBinaryToBinaryOffsets<typename I::offset_type, typename O::offset_type>(
         ctx, input, out->array_data().get());
   }
@@ -337,54 +337,86 @@ Status BinaryToBinaryCastExec(KernelContext* ctx, const ExecSpan& batch,
   }
 
   if constexpr (kInputViews && kOutputOffsets) {
-    // FIXME(bkietz) this discards preallocated offset storage
-    typename TypeTraits<O>::BuilderType builder{ctx->memory_pool()};
+    // copy the input's null bitmap if necessary
+    if (input.MayHaveNulls()) {
+      ARROW_ASSIGN_OR_RAISE(
+          output->buffers[0],
+          arrow::internal::CopyBitmap(ctx->memory_pool(), input.buffers[0].data,
+                                      input.offset, input.length));
+    } else {
+      output->buffers[0] = nullptr;
+    }
+
+    using offset_type = typename O::offset_type;
+
+    auto* offsets = output->buffers[1]->mutable_data_as<offset_type>();
+    offsets[0] = 0;
+    auto AppendOffset = [&](size_t size) mutable {
+      offsets[1] = offsets[0] + static_cast<offset_type>(size);
+      ++offsets;
+    };
 
-    RETURN_NOT_OK(builder.Reserve(input.length));
     // TODO(bkietz) if ArraySpan::buffers were a SmallVector, we could have access to all
     // the character data buffers here and reserve character data accordingly.
+    BufferBuilder char_builder{ctx->memory_pool()};
 
-    // sweep through L1-sized chunks to reduce the frequency of allocation
+    // sweep through L1-sized chunks to reduce the frequency of allocation and overflow
+    // checking
     int64_t chunk_size = ctx->exec_context()->cpu_info()->CacheSize(
                              ::arrow::internal::CpuInfo::CacheLevel::L1) /
                          sizeof(StringHeader) / 4;
 
     RETURN_NOT_OK(::arrow::internal::VisitSlices(
         input, chunk_size, [&](const ArraySpan& input_slice) {
-          int64_t num_chars = builder.value_data_length(), num_appended_chars = 0;
+          size_t num_appended_chars = 0;
+          int64_t num_chars = char_builder.length();
           VisitArraySpanInline<I>(
-              input_slice,
-              [&](std::string_view v) {
-                num_appended_chars += static_cast<int64_t>(v.size());
-              },
+              input_slice, [&](std::string_view v) { num_appended_chars += v.size(); },
               [] {});
 
-          RETURN_NOT_OK(builder.ReserveData(num_appended_chars));
+          if constexpr (std::is_same_v<offset_type, int32_t>) {
+            if (ARROW_PREDICT_FALSE(char_builder.length() + num_appended_chars >
+                                    std::numeric_limits<int32_t>::max())) {
+              return Status::Invalid("Failed casting from ", input.type->ToString(),
+                                     " to ", out->type()->ToString(),
+                                     ": input array viewed too many characters");
+            }
+          }
+          RETURN_NOT_OK(char_builder.Reserve(static_cast<int64_t>(num_appended_chars)));
 
           VisitArraySpanInline<I>(
-              input_slice, [&](std::string_view v) { builder.UnsafeAppend(v); },
-              [&] { builder.UnsafeAppendNull(); });
+              input_slice,
+              [&](std::string_view v) {
+                char_builder.UnsafeAppend(v);
+                AppendOffset(v.size());
+              },
+              [&] { AppendOffset(0); });
 
           if (check_utf8) {
-            if (ARROW_PREDICT_FALSE(!ValidateUTF8Inline(builder.value_data() + num_chars,
-                                                        num_appended_chars))) {
+            if (ARROW_PREDICT_FALSE(
+                    !ValidateUTF8Inline(char_builder.data() + num_chars,
+                                        static_cast<int64_t>(num_appended_chars)))) {
               return Status::Invalid("Invalid UTF8 sequence");
             }
           }
           return Status::OK();
         }));
 
-    return builder.FinishInternal(std::get_if<std::shared_ptr<ArrayData>>(&out->value));
+    return char_builder.Finish(&output->buffers[2]);
   }
 
   if constexpr ((kInputOffsets || kInputFixed) && kOutputViews) {
-    // we can reuse the data buffer here and just add views which reference it
+    // FIXME(bkietz) when outputting views, we *could* output into slices,
+    // provided we have a threadsafe place to stash accumulated buffers
+    // of character data.
+
     if (input.MayHaveNulls()) {
       ARROW_ASSIGN_OR_RAISE(
           output->buffers[0],
           arrow::internal::CopyBitmap(ctx->memory_pool(), input.buffers[0].data,
                                       input.offset, input.length));
     }
+
     // FIXME(bkietz) segfault due to null buffer owner
     // output->buffers[2] = input.GetBuffer(kInputFixed ? 1 : 2);
 
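With this change the view-to-offsets branch of the cast kernel densifies in
place: the null bitmap is copied, offsets are written into the kernel's
preallocated offsets buffer, and character data is accumulated in a
BufferBuilder, with int32 overflow checked once per L1-sized chunk. A hedged
call-site sketch, assuming this cast is registered under the usual
compute::Cast entry point on the branch:

    #include "arrow/api.h"
    #include "arrow/compute/api.h"

    // Densify a utf8_view array into a plain utf8 array via the cast kernel
    // modified above (registration of this cast on the branch is assumed).
    arrow::Result<std::shared_ptr<arrow::Array>> Densify(
        const std::shared_ptr<arrow::Array>& view_array) {
      ARROW_ASSIGN_OR_RAISE(auto datum,
                            arrow::compute::Cast(view_array, arrow::utf8()));
      return datum.make_array();
    }

Casting in the other direction (dense or fixed-size binary to views) is
handled by the kOutputViews branch just above.
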
diff --git a/cpp/src/arrow/ipc/metadata_internal.cc b/cpp/src/arrow/ipc/metadata_internal.cc
index 367b31d5dd..224efb319d 100644
--- a/cpp/src/arrow/ipc/metadata_internal.cc
+++ b/cpp/src/arrow/ipc/metadata_internal.cc
@@ -525,11 +525,13 @@ class FieldToFlatbufferVisitor {
 
   Status Visit(const BinaryViewType& type) {
     // BinaryView will be written to IPC as a normal binary array
+    extra_type_metadata_[std::string{kSerializedStringViewKeyName}] = "";
     return Visit(BinaryType());
   }
 
   Status Visit(const StringViewType& type) {
     // StringView will be written to IPC as a normal UTF8 string array
+    extra_type_metadata_[std::string{kSerializedStringViewKeyName}] = "";
     return Visit(StringType());
   }
 
@@ -812,7 +814,7 @@ Status FieldFromFlatbuffer(const flatbuf::Field* field, FieldPosition field_pos,
     dictionary_id = encoding->id();
   }
 
-  // 4. Is it an extension type?
+  // 4. Is it an extension or view type?
   if (metadata != nullptr) {
     // Look for extension metadata in custom_metadata field
     int name_index = metadata->FindKey(kExtensionTypeKeyName);
@@ -833,6 +835,12 @@ Status FieldFromFlatbuffer(const flatbuf::Field* field, FieldPosition field_pos,
       }
       // NOTE: if extension type is unknown, we do not raise here and
       // simply return the storage type.
+    } else if (name_index = metadata->FindKey(std::string{kSerializedStringViewKeyName});
+               name_index != -1) {
+      DCHECK(type->id() == Type::STRING || type->id() == Type::BINARY);
+      RETURN_NOT_OK(metadata->Delete(name_index));
+      bool is_utf8 = type->id() == Type::STRING;
+      type = is_utf8 ? utf8_view() : binary_view();
     }
   }
 
diff --git a/cpp/src/arrow/ipc/metadata_internal.h b/cpp/src/arrow/ipc/metadata_internal.h
index abbed5b2da..6f07a8aea4 100644
--- a/cpp/src/arrow/ipc/metadata_internal.h
+++ b/cpp/src/arrow/ipc/metadata_internal.h
@@ -245,6 +245,8 @@ flatbuf::TimeUnit ToFlatbufferUnit(TimeUnit::type unit);
 ARROW_EXPORT
 TimeUnit::type FromFlatbufferUnit(flatbuf::TimeUnit unit);
 
+constexpr std::string_view kSerializedStringViewKeyName = "ARROW:string_view";
+
 }  // namespace internal
 }  // namespace ipc
 }  // namespace arrow
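
Together the two metadata changes implement the tagging scheme: a view field
is flattened to plain binary/utf8 in the flatbuffer schema and tagged with the
"ARROW:string_view" custom metadata key, and FieldFromFlatbuffer deletes the
key and restores the view type when it finds it. Roughly, the wire-level field
is equivalent to the following, shown with the public KeyValueMetadata API for
illustration (the real code manipulates the flatbuffers directly):

    #include <memory>
    #include <string>

    #include "arrow/api.h"

    // Wire-level equivalent of field("s", utf8_view()): a plain utf8 field
    // carrying the ARROW:string_view key with an empty value.
    std::shared_ptr<arrow::Field> WireEquivalentOfUtf8View(const std::string& name) {
      auto tag = arrow::key_value_metadata({"ARROW:string_view"}, {""});
      return arrow::field(name, arrow::utf8(), /*nullable=*/true, tag);
    }
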
diff --git a/cpp/src/arrow/ipc/read_write_test.cc b/cpp/src/arrow/ipc/read_write_test.cc
index b556c8ed34..0e0a77f457 100644
--- a/cpp/src/arrow/ipc/read_write_test.cc
+++ b/cpp/src/arrow/ipc/read_write_test.cc
@@ -238,52 +238,57 @@ class TestSchemaMetadata : public ::testing::Test {
   }
 };
 
-const std::shared_ptr<DataType> INT32 = std::make_shared<Int32Type>();
-
 TEST_F(TestSchemaMetadata, PrimitiveFields) {
-  auto f0 = field("f0", std::make_shared<Int8Type>());
-  auto f1 = field("f1", std::make_shared<Int16Type>(), false);
-  auto f2 = field("f2", std::make_shared<Int32Type>());
-  auto f3 = field("f3", std::make_shared<Int64Type>());
-  auto f4 = field("f4", std::make_shared<UInt8Type>());
-  auto f5 = field("f5", std::make_shared<UInt16Type>());
-  auto f6 = field("f6", std::make_shared<UInt32Type>());
-  auto f7 = field("f7", std::make_shared<UInt64Type>());
-  auto f8 = field("f8", std::make_shared<FloatType>());
-  auto f9 = field("f9", std::make_shared<DoubleType>(), false);
-  auto f10 = field("f10", std::make_shared<BooleanType>());
-
-  Schema schema({f0, f1, f2, f3, f4, f5, f6, f7, f8, f9, f10});
-  CheckSchemaRoundtrip(schema);
+  CheckSchemaRoundtrip(Schema({
+      field("f0", int8()),
+      field("f1", int16(), false),
+      field("f2", int32()),
+      field("f3", int64()),
+      field("f4", uint8()),
+      field("f5", uint16()),
+      field("f6", uint32()),
+      field("f7", uint64()),
+      field("f8", float32()),
+      field("f9", float64(), false),
+      field("f10", boolean()),
+  }));
+}
+
+TEST_F(TestSchemaMetadata, BinaryFields) {
+  CheckSchemaRoundtrip(Schema({
+      field("f0", utf8()),
+      field("f1", binary()),
+      field("f2", large_utf8()),
+      field("f3", large_binary()),
+      field("f4", utf8_view()),
+      field("f5", binary_view()),
+      field("f6", fixed_size_binary(3)),
+      field("f7", fixed_size_binary(33)),
+  }));
 }
 
 TEST_F(TestSchemaMetadata, NestedFields) {
-  auto type = list(int32());
-  auto f0 = field("f0", type);
-
-  std::shared_ptr<StructType> type2(
-      new StructType({field("k1", INT32), field("k2", INT32), field("k3", INT32)}));
-  auto f1 = field("f1", type2);
-
-  Schema schema({f0, f1});
-  CheckSchemaRoundtrip(schema);
+  CheckSchemaRoundtrip(Schema({
+      field("f0", list(int32())),
+      field("f1", struct_({
+                      field("k1", int32()),
+                      field("k2", int32()),
+                      field("k3", int32()),
+                  })),
+  }));
 }
 
 TEST_F(TestSchemaMetadata, DictionaryFields) {
   {
-    auto dict_type = dictionary(int8(), int32(), true /* ordered */);
-    auto f0 = field("f0", dict_type);
-    auto f1 = field("f1", list(dict_type));
-
-    Schema schema({f0, f1});
-    CheckSchemaRoundtrip(schema);
+    auto dict_type = dictionary(int8(), int32(), /*ordered=*/true);
+    CheckSchemaRoundtrip(Schema({
+        field("f0", dict_type),
+        field("f1", list(dict_type)),
+    }));
   }
   {
     auto dict_type = dictionary(int8(), list(int32()));
-    auto f0 = field("f0", dict_type);
-
-    Schema schema({f0});
-    CheckSchemaRoundtrip(schema);
+    CheckSchemaRoundtrip(Schema({field("f0", dict_type)}));
   }
 }
 
@@ -291,9 +296,7 @@ TEST_F(TestSchemaMetadata, NestedDictionaryFields) {
   {
     auto inner_dict_type = dictionary(int8(), int32(), /*ordered=*/true);
     auto dict_type = dictionary(int16(), list(inner_dict_type));
-
-    Schema schema({field("f0", dict_type)});
-    CheckSchemaRoundtrip(schema);
+    CheckSchemaRoundtrip(Schema({field("f0", dict_type)}));
   }
   {
     auto dict_type1 = dictionary(int8(), utf8(), /*ordered=*/true);
diff --git a/cpp/src/arrow/ipc/reader.cc b/cpp/src/arrow/ipc/reader.cc
index 843d5917b3..52476c221c 100644
--- a/cpp/src/arrow/ipc/reader.cc
+++ b/cpp/src/arrow/ipc/reader.cc
@@ -58,6 +58,7 @@
 #include "arrow/util/thread_pool.h"
 #include "arrow/util/ubsan.h"
 #include "arrow/util/vector.h"
+#include "arrow/visit_data_inline.h"
 #include "arrow/visit_type_inline.h"
 
 #include "generated/File_generated.h"  // IWYU pragma: export
@@ -289,7 +290,6 @@ class ArrayLoader {
     return Status::OK();
   }
 
-  template <typename TYPE>
   Status LoadBinary(Type::type type_id) {
     out_->buffers.resize(3);
 
@@ -345,12 +345,13 @@ class ArrayLoader {
 
   template <typename T>
   enable_if_base_binary<T, Status> Visit(const T& type) {
-    return LoadBinary<T>(type.id());
+    return LoadBinary(type.id());
   }
 
   Status Visit(const BinaryViewType& type) {
-    DCHECK(false);
-    return Status::NotImplemented("Reading IPC format to binary view is not supported");
+    // View arrays are serialized as the corresponding dense array.
+    // We can't produce the view array yet; the buffers may still be compressed.
+    return LoadBinary(type.id() == Type::STRING_VIEW ? Type::STRING : Type::BINARY);
   }
 
   Status Visit(const FixedSizeBinaryType& type) {
@@ -514,6 +515,49 @@ Status DecompressBuffers(Compression::type compression, const IpcReadOptions& op
       });
 }
 
+Status ConvertViewArrays(const IpcReadOptions& options, ArrayDataVector* fields) {
+  struct StringViewAccumulator {
+    using DataPtrVector = std::vector<ArrayData*>;
+
+    void AppendFrom(const ArrayDataVector& fields) {
+      for (const auto& field : fields) {
+        if (field->type->id() == Type::STRING_VIEW ||
+            field->type->id() == Type::BINARY_VIEW) {
+          view_arrays_.push_back(field.get());
+        }
+        AppendFrom(field->child_data);
+      }
+    }
+
+    DataPtrVector Get(const ArrayDataVector& fields) && {
+      AppendFrom(fields);
+      return std::move(view_arrays_);
+    }
+
+    DataPtrVector view_arrays_;
+  };
+
+  auto view_arrays = StringViewAccumulator{}.Get(*fields);
+
+  return ::arrow::internal::OptionalParallelFor(
+      options.use_threads, static_cast<int>(view_arrays.size()), [&](int i) {
+        ArrayData* data = view_arrays[i];
+
+        // the only thing we need to fix here is replacing offsets with headers
+        ARROW_ASSIGN_OR_RAISE(
+            auto header_buffer,
+            AllocateBuffer(data->length * sizeof(StringHeader), options.memory_pool));
+
+        auto* headers = header_buffer->mutable_data_as<StringHeader>();
+        VisitArraySpanInline<BinaryType>(
+            *data, [&](std::string_view v) { *headers++ = StringHeader{v}; },
+            [&] { *headers++ = StringHeader{}; });
+
+        data->buffers[1] = std::move(header_buffer);
+        return Status::OK();
+      });
+}
+
 Result<std::shared_ptr<RecordBatch>> LoadRecordBatchSubset(
     const flatbuf::RecordBatch* metadata, const std::shared_ptr<Schema>& schema,
     const std::vector<bool>* inclusion_mask, const IpcReadContext& context,
@@ -562,6 +606,7 @@ Result<std::shared_ptr<RecordBatch>> LoadRecordBatchSubset(
     RETURN_NOT_OK(
         DecompressBuffers(context.compression, context.options, &filtered_columns));
   }
+  RETURN_NOT_OK(ConvertViewArrays(context.options, &filtered_columns));
 
   // swap endian in a set of ArrayData if necessary (swap_endian == true)
   if (context.swap_endian) {
@@ -811,10 +856,11 @@ Status ReadDictionary(const Buffer& metadata, const IpcReadContext& context,
   const Field dummy_field("", value_type);
   RETURN_NOT_OK(loader.Load(&dummy_field, dict_data.get()));
 
+  ArrayDataVector dict_fields{dict_data};
   if (compression != Compression::UNCOMPRESSED) {
-    ArrayDataVector dict_fields{dict_data};
     RETURN_NOT_OK(DecompressBuffers(compression, context.options, &dict_fields));
   }
+  RETURN_NOT_OK(ConvertViewArrays(context.options, &dict_fields));
 
   // swap endian in dict_data if necessary (swap_endian == true)
   if (context.swap_endian) {
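
ConvertViewArrays runs after decompression and rewrites each loaded
STRING_VIEW/BINARY_VIEW column: the dense offsets buffer at index 1 is
replaced by a buffer of StringHeaders pointing into the unchanged character
buffer. A standalone sketch of that rewrite, assuming StringHeader lives in
namespace arrow and keeps the string_view constructor used above:

    #include <cstdint>
    #include <string_view>
    #include <vector>

    #include "arrow/util/string_header.h"

    // For each value, (offsets[i], offsets[i + 1]) delimits its bytes in
    // char_data; the replacement buffer holds one StringHeader per slot.
    // Null slots get a default (empty) StringHeader in the real code.
    std::vector<arrow::StringHeader> HeadersFromOffsets(const int32_t* offsets,
                                                        int64_t length,
                                                        const char* char_data) {
      std::vector<arrow::StringHeader> headers(static_cast<size_t>(length));
      for (int64_t i = 0; i < length; ++i) {
        std::string_view v{char_data + offsets[i],
                           static_cast<size_t>(offsets[i + 1] - offsets[i])};
        headers[static_cast<size_t>(i)] = arrow::StringHeader{v};
      }
      return headers;
    }
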
diff --git a/cpp/src/arrow/ipc/test_common.cc b/cpp/src/arrow/ipc/test_common.cc
index ed70de6c6a..370e72b4cc 100644
--- a/cpp/src/arrow/ipc/test_common.cc
+++ b/cpp/src/arrow/ipc/test_common.cc
@@ -351,37 +351,27 @@ static Status MakeBinaryArrayWithUniqueValues(int64_t length, bool include_nulls
 
 Status MakeStringTypesRecordBatch(std::shared_ptr<RecordBatch>* out, bool with_nulls) {
   const int64_t length = 500;
-  auto f0 = field("strings", utf8());
-  auto f1 = field("binaries", binary());
-  auto f2 = field("large_strings", large_utf8());
-  auto f3 = field("large_binaries", large_binary());
-  auto schema = ::arrow::schema({f0, f1, f2, f3});
-
-  std::shared_ptr<Array> a0, a1, a2, a3;
-  MemoryPool* pool = default_memory_pool();
 
-  // Quirk with RETURN_NOT_OK macro and templated functions
-  {
-    auto s =
-        MakeBinaryArrayWithUniqueValues<StringBuilder>(length, with_nulls, pool, &a0);
-    RETURN_NOT_OK(s);
+  ArrayVector arrays;
+  FieldVector fields;
+
+  using namespace std::string_literals;
+  for (auto MakeArray : {
+           &MakeBinaryArrayWithUniqueValues<StringBuilder>,
+           &MakeBinaryArrayWithUniqueValues<BinaryBuilder>,
+           &MakeBinaryArrayWithUniqueValues<LargeStringBuilder>,
+           &MakeBinaryArrayWithUniqueValues<LargeBinaryBuilder>,
+           &MakeBinaryArrayWithUniqueValues<StringViewBuilder>,
+           &MakeBinaryArrayWithUniqueValues<BinaryViewBuilder>,
+       }) {
+    arrays.emplace_back();
+    RETURN_NOT_OK(MakeArray(length, with_nulls, default_memory_pool(), &arrays.back()));
+
+    const auto& type = arrays.back()->type();
+    fields.push_back(field(type->ToString(), type));
   }
-  {
-    auto s =
-        MakeBinaryArrayWithUniqueValues<BinaryBuilder>(length, with_nulls, pool, &a1);
-    RETURN_NOT_OK(s);
-  }
-  {
-    auto s = MakeBinaryArrayWithUniqueValues<LargeStringBuilder>(length, with_nulls, pool,
-                                                                 &a2);
-    RETURN_NOT_OK(s);
-  }
-  {
-    auto s = MakeBinaryArrayWithUniqueValues<LargeBinaryBuilder>(length, with_nulls, pool,
-                                                                 &a3);
-    RETURN_NOT_OK(s);
-  }
-  *out = RecordBatch::Make(schema, length, {a0, a1, a2, a3});
+
+  *out = RecordBatch::Make(schema(std::move(fields)), length, std::move(arrays));
   return Status::OK();
 }
 
diff --git a/cpp/src/arrow/ipc/writer.cc b/cpp/src/arrow/ipc/writer.cc
index d68da651f3..c658d114da 100644
--- a/cpp/src/arrow/ipc/writer.cc
+++ b/cpp/src/arrow/ipc/writer.cc
@@ -51,10 +51,12 @@
 #include "arrow/util/checked_cast.h"
 #include "arrow/util/compression.h"
 #include "arrow/util/endian.h"
+#include "arrow/util/int_util_overflow.h"
 #include "arrow/util/key_value_metadata.h"
 #include "arrow/util/logging.h"
 #include "arrow/util/parallel.h"
 #include "arrow/visit_array_inline.h"
+#include "arrow/visit_data_inline.h"
 #include "arrow/visit_type_inline.h"
 
 namespace arrow {
@@ -361,6 +363,53 @@ class RecordBatchSerializer {
     return Status::OK();
   }
 
+  Status Visit(const BinaryViewArray& array) {
+    // a separate helper doesn't make sense here since we've already done the work
+    // to copy the bitmap
+    out_->body_buffers.emplace_back();
+    ARROW_ASSIGN_OR_RAISE(
+        out_->body_buffers.back(),
+        AllocateBuffer(sizeof(int32_t) * (array.length() + 1), options_.memory_pool));
+
+    auto* offsets = out_->body_buffers.back()->mutable_data_as<int32_t>();
+    offsets[0] = 0;
+    auto AppendOffset = [&](size_t size) mutable {
+      // ignore overflow for now
+      offsets[1] = arrow::internal::SafeSignedAdd(offsets[0], static_cast<int32_t>(size));
+      ++offsets;
+    };
+
+    int64_t size = 0;
+    VisitArraySpanInline<BinaryViewType>(
+        *array.data(),
+        [&](std::string_view v) {
+          size += static_cast<int64_t>(v.size());
+          AppendOffset(v.size());
+        },
+        [&] { AppendOffset(0); });
+
+    if (size > std::numeric_limits<int32_t>::max()) {
+      return Status::Invalid(
+          "Input view array viewed more characters than are representable with 32 bit "
+          "offsets, unable to serialize");
+    }
+
+    out_->body_buffers.emplace_back();
+    ARROW_ASSIGN_OR_RAISE(out_->body_buffers.back(),
+                          AllocateBuffer(size, options_.memory_pool));
+
+    VisitArraySpanInline<BinaryViewType>(
+        *array.data(),
+        [chars = out_->body_buffers.back()->mutable_data_as<char>()](
+            std::string_view v) mutable {
+          v.copy(chars, v.size());
+          chars += v.size();
+        },
+        [] {});
+
+    return Status::OK();
+  }
+
   template <typename T>
   enable_if_base_list<typename T::TypeClass, Status> Visit(const T& array) {
     using offset_type = typename T::offset_type;
@@ -388,10 +437,6 @@ class RecordBatchSerializer {
     return Status::OK();
   }
 
-  Status Visit(const BinaryViewArray& array) {
-    return Status::NotImplemented("Binary / string view type");
-  }
-
   Status Visit(const FixedSizeListArray& array) {
     --max_recursion_depth_;
     auto size = array.list_type()->list_size();
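
The writer-side Visit(BinaryViewArray) does the reverse of the reader rewrite:
it materializes a 32-bit offsets buffer and a flattened character buffer from
the views, failing if the total viewed character count exceeds int32 range. A
standalone sketch of that flattening (illustrative only; the real code writes
into preallocated Arrow buffers and relies on SafeSignedAdd plus the explicit
size check above):

    #include <cstdint>
    #include <string>
    #include <string_view>
    #include <vector>

    // Flatten views into the dense layout the serializer emits: offsets gets
    // views.size() + 1 entries and char_data concatenates every viewed string.
    void FlattenViews(const std::vector<std::string_view>& views,
                      std::vector<int32_t>* offsets, std::string* char_data) {
      offsets->assign(1, 0);
      char_data->clear();
      for (std::string_view v : views) {
        char_data->append(v);
        offsets->push_back(static_cast<int32_t>(char_data->size()));
      }
    }
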
diff --git a/cpp/src/arrow/util/string_header.h b/cpp/src/arrow/util/string_header.h
index e3e9d9d69c..182b749f21 100644
--- a/cpp/src/arrow/util/string_header.h
+++ b/cpp/src/arrow/util/string_header.h
@@ -158,8 +158,9 @@ struct StringHeader {
       return memcmp(prefix_.data(), other.prefix_.data(), kPrefixSize);
     }
     int32_t size = std::min(size_, other.size_) - kPrefixSize;
-    if (size <= 0) {
-      // One ends within the prefix.
+    assert(size >= 0);
+    if (size == 0) {
+      // One string is just the prefix.
       return size_ - other.size_;
     }
     if (static_cast<uint32_t>(size) <= kInlineSize && IsInline() && other.IsInline()) {