You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by uw...@apache.org on 2017/03/20 09:48:41 UTC

[2/3] arrow git commit: ARROW-661: [C++] Add LargeRecordBatch metadata type, IPC support, associated refactoring

http://git-wip-us.apache.org/repos/asf/arrow/blob/df2220f3/cpp/src/arrow/ipc/metadata-internal.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/metadata-internal.cc b/cpp/src/arrow/ipc/metadata-internal.cc
deleted file mode 100644
index be0d282..0000000
--- a/cpp/src/arrow/ipc/metadata-internal.cc
+++ /dev/null
@@ -1,597 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "arrow/ipc/metadata-internal.h"
-
-#include <cstdint>
-#include <cstring>
-#include <memory>
-#include <sstream>
-#include <string>
-
-#include "flatbuffers/flatbuffers.h"
-
-#include "arrow/array.h"
-#include "arrow/buffer.h"
-#include "arrow/ipc/Message_generated.h"
-#include "arrow/schema.h"
-#include "arrow/status.h"
-#include "arrow/type.h"
-
-namespace arrow {
-
-namespace flatbuf = org::apache::arrow::flatbuf;
-
-namespace ipc {
-
-static Status IntFromFlatbuffer(
-    const flatbuf::Int* int_data, std::shared_ptr<DataType>* out) {
-  if (int_data->bitWidth() > 64) {
-    return Status::NotImplemented("Integers with more than 64 bits not implemented");
-  }
-  if (int_data->bitWidth() < 8) {
-    return Status::NotImplemented("Integers with less than 8 bits not implemented");
-  }
-
-  switch (int_data->bitWidth()) {
-    case 8:
-      *out = int_data->is_signed() ? int8() : uint8();
-      break;
-    case 16:
-      *out = int_data->is_signed() ? int16() : uint16();
-      break;
-    case 32:
-      *out = int_data->is_signed() ? int32() : uint32();
-      break;
-    case 64:
-      *out = int_data->is_signed() ? int64() : uint64();
-      break;
-    default:
-      return Status::NotImplemented("Integers not in cstdint are not implemented");
-  }
-  return Status::OK();
-}
-
-static Status FloatFromFlatuffer(
-    const flatbuf::FloatingPoint* float_data, std::shared_ptr<DataType>* out) {
-  if (float_data->precision() == flatbuf::Precision_HALF) {
-    *out = float16();
-  } else if (float_data->precision() == flatbuf::Precision_SINGLE) {
-    *out = float32();
-  } else {
-    *out = float64();
-  }
-  return Status::OK();
-}
-
-// Forward declaration
-static Status FieldToFlatbuffer(FBB& fbb, const std::shared_ptr<Field>& field,
-    DictionaryMemo* dictionary_memo, FieldOffset* offset);
-
-static Offset IntToFlatbuffer(FBB& fbb, int bitWidth, bool is_signed) {
-  return flatbuf::CreateInt(fbb, bitWidth, is_signed).Union();
-}
-
-static Offset FloatToFlatbuffer(FBB& fbb, flatbuf::Precision precision) {
-  return flatbuf::CreateFloatingPoint(fbb, precision).Union();
-}
-
-static Status AppendChildFields(FBB& fbb, const std::shared_ptr<DataType>& type,
-    std::vector<FieldOffset>* out_children, DictionaryMemo* dictionary_memo) {
-  FieldOffset field;
-  for (int i = 0; i < type->num_children(); ++i) {
-    RETURN_NOT_OK(FieldToFlatbuffer(fbb, type->child(i), dictionary_memo, &field));
-    out_children->push_back(field);
-  }
-  return Status::OK();
-}
-
-static Status ListToFlatbuffer(FBB& fbb, const std::shared_ptr<DataType>& type,
-    std::vector<FieldOffset>* out_children, DictionaryMemo* dictionary_memo,
-    Offset* offset) {
-  RETURN_NOT_OK(AppendChildFields(fbb, type, out_children, dictionary_memo));
-  *offset = flatbuf::CreateList(fbb).Union();
-  return Status::OK();
-}
-
-static Status StructToFlatbuffer(FBB& fbb, const std::shared_ptr<DataType>& type,
-    std::vector<FieldOffset>* out_children, DictionaryMemo* dictionary_memo,
-    Offset* offset) {
-  RETURN_NOT_OK(AppendChildFields(fbb, type, out_children, dictionary_memo));
-  *offset = flatbuf::CreateStruct_(fbb).Union();
-  return Status::OK();
-}
-
-// ----------------------------------------------------------------------
-// Union implementation
-
-static Status UnionFromFlatbuffer(const flatbuf::Union* union_data,
-    const std::vector<std::shared_ptr<Field>>& children, std::shared_ptr<DataType>* out) {
-  UnionMode mode = union_data->mode() == flatbuf::UnionMode_Sparse ? UnionMode::SPARSE
-                                                                   : UnionMode::DENSE;
-
-  std::vector<uint8_t> type_codes;
-
-  const flatbuffers::Vector<int32_t>* fb_type_ids = union_data->typeIds();
-  if (fb_type_ids == nullptr) {
-    for (uint8_t i = 0; i < children.size(); ++i) {
-      type_codes.push_back(i);
-    }
-  } else {
-    for (int32_t id : (*fb_type_ids)) {
-      // TODO(wesm): can these values exceed 255?
-      type_codes.push_back(static_cast<uint8_t>(id));
-    }
-  }
-
-  *out = union_(children, type_codes, mode);
-  return Status::OK();
-}
-
-static Status UnionToFlatBuffer(FBB& fbb, const std::shared_ptr<DataType>& type,
-    std::vector<FieldOffset>* out_children, DictionaryMemo* dictionary_memo,
-    Offset* offset) {
-  RETURN_NOT_OK(AppendChildFields(fbb, type, out_children, dictionary_memo));
-
-  const auto& union_type = static_cast<const UnionType&>(*type);
-
-  flatbuf::UnionMode mode = union_type.mode == UnionMode::SPARSE
-                                ? flatbuf::UnionMode_Sparse
-                                : flatbuf::UnionMode_Dense;
-
-  std::vector<int32_t> type_ids;
-  type_ids.reserve(union_type.type_codes.size());
-  for (uint8_t code : union_type.type_codes) {
-    type_ids.push_back(code);
-  }
-
-  auto fb_type_ids = fbb.CreateVector(type_ids);
-
-  *offset = flatbuf::CreateUnion(fbb, mode, fb_type_ids).Union();
-  return Status::OK();
-}
-
-#define INT_TO_FB_CASE(BIT_WIDTH, IS_SIGNED)            \
-  *out_type = flatbuf::Type_Int;                        \
-  *offset = IntToFlatbuffer(fbb, BIT_WIDTH, IS_SIGNED); \
-  break;
-
-static inline flatbuf::TimeUnit ToFlatbufferUnit(TimeUnit unit) {
-  switch (unit) {
-    case TimeUnit::SECOND:
-      return flatbuf::TimeUnit_SECOND;
-    case TimeUnit::MILLI:
-      return flatbuf::TimeUnit_MILLISECOND;
-    case TimeUnit::MICRO:
-      return flatbuf::TimeUnit_MICROSECOND;
-    case TimeUnit::NANO:
-      return flatbuf::TimeUnit_NANOSECOND;
-    default:
-      break;
-  }
-  return flatbuf::TimeUnit_MIN;
-}
-
-static inline TimeUnit FromFlatbufferUnit(flatbuf::TimeUnit unit) {
-  switch (unit) {
-    case flatbuf::TimeUnit_SECOND:
-      return TimeUnit::SECOND;
-    case flatbuf::TimeUnit_MILLISECOND:
-      return TimeUnit::MILLI;
-    case flatbuf::TimeUnit_MICROSECOND:
-      return TimeUnit::MICRO;
-    case flatbuf::TimeUnit_NANOSECOND:
-      return TimeUnit::NANO;
-    default:
-      break;
-  }
-  // cannot reach
-  return TimeUnit::SECOND;
-}
-
-static Status TypeFromFlatbuffer(flatbuf::Type type, const void* type_data,
-    const std::vector<std::shared_ptr<Field>>& children, std::shared_ptr<DataType>* out) {
-  switch (type) {
-    case flatbuf::Type_NONE:
-      return Status::Invalid("Type metadata cannot be none");
-    case flatbuf::Type_Int:
-      return IntFromFlatbuffer(static_cast<const flatbuf::Int*>(type_data), out);
-    case flatbuf::Type_FloatingPoint:
-      return FloatFromFlatuffer(
-          static_cast<const flatbuf::FloatingPoint*>(type_data), out);
-    case flatbuf::Type_Binary:
-      *out = binary();
-      return Status::OK();
-    case flatbuf::Type_FixedWidthBinary: {
-      auto fw_binary = static_cast<const flatbuf::FixedWidthBinary*>(type_data);
-      *out = fixed_width_binary(fw_binary->byteWidth());
-      return Status::OK();
-    }
-    case flatbuf::Type_Utf8:
-      *out = utf8();
-      return Status::OK();
-    case flatbuf::Type_Bool:
-      *out = boolean();
-      return Status::OK();
-    case flatbuf::Type_Decimal:
-      return Status::NotImplemented("Decimal");
-    case flatbuf::Type_Date:
-      *out = date();
-      return Status::OK();
-    case flatbuf::Type_Time: {
-      auto time_type = static_cast<const flatbuf::Time*>(type_data);
-      *out = time(FromFlatbufferUnit(time_type->unit()));
-      return Status::OK();
-    }
-    case flatbuf::Type_Timestamp: {
-      auto ts_type = static_cast<const flatbuf::Timestamp*>(type_data);
-      *out = timestamp(FromFlatbufferUnit(ts_type->unit()));
-      return Status::OK();
-    }
-    case flatbuf::Type_Interval:
-      return Status::NotImplemented("Interval");
-    case flatbuf::Type_List:
-      if (children.size() != 1) {
-        return Status::Invalid("List must have exactly 1 child field");
-      }
-      *out = std::make_shared<ListType>(children[0]);
-      return Status::OK();
-    case flatbuf::Type_Struct_:
-      *out = std::make_shared<StructType>(children);
-      return Status::OK();
-    case flatbuf::Type_Union:
-      return UnionFromFlatbuffer(
-          static_cast<const flatbuf::Union*>(type_data), children, out);
-    default:
-      return Status::Invalid("Unrecognized type");
-  }
-}
-
-// TODO(wesm): Convert this to visitor pattern
-static Status TypeToFlatbuffer(FBB& fbb, const std::shared_ptr<DataType>& type,
-    std::vector<FieldOffset>* children, std::vector<VectorLayoutOffset>* layout,
-    flatbuf::Type* out_type, DictionaryMemo* dictionary_memo, Offset* offset) {
-  if (type->type == Type::DICTIONARY) {
-    // In this library, the dictionary "type" is a logical construct. Here we
-    // pass through to the value type, as we've already captured the index
-    // type in the DictionaryEncoding metadata in the parent field
-    const auto& dict_type = static_cast<const DictionaryType&>(*type);
-    return TypeToFlatbuffer(fbb, dict_type.dictionary()->type(), children, layout,
-        out_type, dictionary_memo, offset);
-  }
-
-  std::vector<BufferDescr> buffer_layout = type->GetBufferLayout();
-  for (const BufferDescr& descr : buffer_layout) {
-    flatbuf::VectorType vector_type;
-    switch (descr.type()) {
-      case BufferType::OFFSET:
-        vector_type = flatbuf::VectorType_OFFSET;
-        break;
-      case BufferType::DATA:
-        vector_type = flatbuf::VectorType_DATA;
-        break;
-      case BufferType::VALIDITY:
-        vector_type = flatbuf::VectorType_VALIDITY;
-        break;
-      case BufferType::TYPE:
-        vector_type = flatbuf::VectorType_TYPE;
-        break;
-      default:
-        vector_type = flatbuf::VectorType_DATA;
-        break;
-    }
-    auto offset = flatbuf::CreateVectorLayout(
-        fbb, static_cast<int16_t>(descr.bit_width()), vector_type);
-    layout->push_back(offset);
-  }
-
-  switch (type->type) {
-    case Type::BOOL:
-      *out_type = flatbuf::Type_Bool;
-      *offset = flatbuf::CreateBool(fbb).Union();
-      break;
-    case Type::UINT8:
-      INT_TO_FB_CASE(8, false);
-    case Type::INT8:
-      INT_TO_FB_CASE(8, true);
-    case Type::UINT16:
-      INT_TO_FB_CASE(16, false);
-    case Type::INT16:
-      INT_TO_FB_CASE(16, true);
-    case Type::UINT32:
-      INT_TO_FB_CASE(32, false);
-    case Type::INT32:
-      INT_TO_FB_CASE(32, true);
-    case Type::UINT64:
-      INT_TO_FB_CASE(64, false);
-    case Type::INT64:
-      INT_TO_FB_CASE(64, true);
-    case Type::FLOAT:
-      *out_type = flatbuf::Type_FloatingPoint;
-      *offset = FloatToFlatbuffer(fbb, flatbuf::Precision_SINGLE);
-      break;
-    case Type::DOUBLE:
-      *out_type = flatbuf::Type_FloatingPoint;
-      *offset = FloatToFlatbuffer(fbb, flatbuf::Precision_DOUBLE);
-      break;
-    case Type::FIXED_WIDTH_BINARY: {
-      const auto& fw_type = static_cast<const FixedWidthBinaryType&>(*type);
-      *out_type = flatbuf::Type_FixedWidthBinary;
-      *offset = flatbuf::CreateFixedWidthBinary(fbb, fw_type.byte_width()).Union();
-    } break;
-    case Type::BINARY:
-      *out_type = flatbuf::Type_Binary;
-      *offset = flatbuf::CreateBinary(fbb).Union();
-      break;
-    case Type::STRING:
-      *out_type = flatbuf::Type_Utf8;
-      *offset = flatbuf::CreateUtf8(fbb).Union();
-      break;
-    case Type::DATE:
-      *out_type = flatbuf::Type_Date;
-      *offset = flatbuf::CreateDate(fbb).Union();
-      break;
-    case Type::TIME: {
-      const auto& time_type = static_cast<const TimeType&>(*type);
-      *out_type = flatbuf::Type_Time;
-      *offset = flatbuf::CreateTime(fbb, ToFlatbufferUnit(time_type.unit)).Union();
-    } break;
-    case Type::TIMESTAMP: {
-      const auto& ts_type = static_cast<const TimestampType&>(*type);
-      *out_type = flatbuf::Type_Timestamp;
-      *offset = flatbuf::CreateTimestamp(fbb, ToFlatbufferUnit(ts_type.unit)).Union();
-    } break;
-    case Type::LIST:
-      *out_type = flatbuf::Type_List;
-      return ListToFlatbuffer(fbb, type, children, dictionary_memo, offset);
-    case Type::STRUCT:
-      *out_type = flatbuf::Type_Struct_;
-      return StructToFlatbuffer(fbb, type, children, dictionary_memo, offset);
-    case Type::UNION:
-      *out_type = flatbuf::Type_Union;
-      return UnionToFlatBuffer(fbb, type, children, dictionary_memo, offset);
-    default:
-      *out_type = flatbuf::Type_NONE;  // Make clang-tidy happy
-      std::stringstream ss;
-      ss << "Unable to convert type: " << type->ToString() << std::endl;
-      return Status::NotImplemented(ss.str());
-  }
-  return Status::OK();
-}
-
-using DictionaryOffset = flatbuffers::Offset<flatbuf::DictionaryEncoding>;
-
-static DictionaryOffset GetDictionaryEncoding(
-    FBB& fbb, const DictionaryType& type, DictionaryMemo* memo) {
-  int64_t dictionary_id = memo->GetId(type.dictionary());
-
-  // We assume that the dictionary index type (as an integer) has already been
-  // validated elsewhere, and can safely assume we are dealing with signed
-  // integers
-  const auto& fw_index_type = static_cast<const FixedWidthType&>(*type.index_type());
-
-  auto index_type_offset = flatbuf::CreateInt(fbb, fw_index_type.bit_width(), true);
-
-  // TODO(wesm): ordered dictionaries
-  return flatbuf::CreateDictionaryEncoding(fbb, dictionary_id, index_type_offset);
-}
-
-static Status FieldToFlatbuffer(FBB& fbb, const std::shared_ptr<Field>& field,
-    DictionaryMemo* dictionary_memo, FieldOffset* offset) {
-  auto fb_name = fbb.CreateString(field->name);
-
-  flatbuf::Type type_enum;
-  Offset type_offset;
-  Offset type_layout;
-  std::vector<FieldOffset> children;
-  std::vector<VectorLayoutOffset> layout;
-
-  RETURN_NOT_OK(TypeToFlatbuffer(
-      fbb, field->type, &children, &layout, &type_enum, dictionary_memo, &type_offset));
-  auto fb_children = fbb.CreateVector(children);
-  auto fb_layout = fbb.CreateVector(layout);
-
-  DictionaryOffset dictionary = 0;
-  if (field->type->type == Type::DICTIONARY) {
-    dictionary = GetDictionaryEncoding(
-        fbb, static_cast<const DictionaryType&>(*field->type), dictionary_memo);
-  }
-
-  // TODO: produce the list of VectorTypes
-  *offset = flatbuf::CreateField(fbb, fb_name, field->nullable, type_enum, type_offset,
-      dictionary, fb_children, fb_layout);
-
-  return Status::OK();
-}
-
-Status FieldFromFlatbufferDictionary(
-    const flatbuf::Field* field, std::shared_ptr<Field>* out) {
-  // Need an empty memo to pass down for constructing children
-  DictionaryMemo dummy_memo;
-
-  // Any DictionaryEncoding set is ignored here
-
-  std::shared_ptr<DataType> type;
-  auto children = field->children();
-  std::vector<std::shared_ptr<Field>> child_fields(children->size());
-  for (int i = 0; i < static_cast<int>(children->size()); ++i) {
-    RETURN_NOT_OK(FieldFromFlatbuffer(children->Get(i), dummy_memo, &child_fields[i]));
-  }
-
-  RETURN_NOT_OK(
-      TypeFromFlatbuffer(field->type_type(), field->type(), child_fields, &type));
-
-  *out = std::make_shared<Field>(field->name()->str(), type, field->nullable());
-  return Status::OK();
-}
-
-Status FieldFromFlatbuffer(const flatbuf::Field* field,
-    const DictionaryMemo& dictionary_memo, std::shared_ptr<Field>* out) {
-  std::shared_ptr<DataType> type;
-
-  const flatbuf::DictionaryEncoding* encoding = field->dictionary();
-
-  if (encoding == nullptr) {
-    // The field is not dictionary encoded. We must potentially visit its
-    // children to fully reconstruct the data type
-    auto children = field->children();
-    std::vector<std::shared_ptr<Field>> child_fields(children->size());
-    for (int i = 0; i < static_cast<int>(children->size()); ++i) {
-      RETURN_NOT_OK(
-          FieldFromFlatbuffer(children->Get(i), dictionary_memo, &child_fields[i]));
-    }
-    RETURN_NOT_OK(
-        TypeFromFlatbuffer(field->type_type(), field->type(), child_fields, &type));
-  } else {
-    // The field is dictionary encoded. The type of the dictionary values has
-    // been determined elsewhere, and is stored in the DictionaryMemo. Here we
-    // construct the logical DictionaryType object
-
-    std::shared_ptr<Array> dictionary;
-    RETURN_NOT_OK(dictionary_memo.GetDictionary(encoding->id(), &dictionary));
-
-    std::shared_ptr<DataType> index_type;
-    RETURN_NOT_OK(IntFromFlatbuffer(encoding->indexType(), &index_type));
-    type = std::make_shared<DictionaryType>(index_type, dictionary);
-  }
-  *out = std::make_shared<Field>(field->name()->str(), type, field->nullable());
-  return Status::OK();
-}
-
-// Implement MessageBuilder
-
-// will return the endianness of the system we are running on
-// based the NUMPY_API function. See NOTICE.txt
-flatbuf::Endianness endianness() {
-  union {
-    uint32_t i;
-    char c[4];
-  } bint = {0x01020304};
-
-  return bint.c[0] == 1 ? flatbuf::Endianness_Big : flatbuf::Endianness_Little;
-}
-
-Status SchemaToFlatbuffer(FBB& fbb, const Schema& schema, DictionaryMemo* dictionary_memo,
-    flatbuffers::Offset<flatbuf::Schema>* out) {
-  std::vector<FieldOffset> field_offsets;
-  for (int i = 0; i < schema.num_fields(); ++i) {
-    std::shared_ptr<Field> field = schema.field(i);
-    FieldOffset offset;
-    RETURN_NOT_OK(FieldToFlatbuffer(fbb, field, dictionary_memo, &offset));
-    field_offsets.push_back(offset);
-  }
-
-  *out = flatbuf::CreateSchema(fbb, endianness(), fbb.CreateVector(field_offsets));
-  return Status::OK();
-}
-
-class MessageBuilder {
- public:
-  Status SetSchema(const Schema& schema, DictionaryMemo* dictionary_memo) {
-    flatbuffers::Offset<flatbuf::Schema> fb_schema;
-    RETURN_NOT_OK(SchemaToFlatbuffer(fbb_, schema, dictionary_memo, &fb_schema));
-
-    header_type_ = flatbuf::MessageHeader_Schema;
-    header_ = fb_schema.Union();
-    body_length_ = 0;
-    return Status::OK();
-  }
-
-  Status SetRecordBatch(int32_t length, int64_t body_length,
-      const std::vector<flatbuf::FieldNode>& nodes,
-      const std::vector<flatbuf::Buffer>& buffers) {
-    header_type_ = flatbuf::MessageHeader_RecordBatch;
-    header_ = flatbuf::CreateRecordBatch(fbb_, length, fbb_.CreateVectorOfStructs(nodes),
-                  fbb_.CreateVectorOfStructs(buffers))
-                  .Union();
-    body_length_ = body_length;
-
-    return Status::OK();
-  }
-
-  Status SetDictionary(int64_t id, int32_t length, int64_t body_length,
-      const std::vector<flatbuf::FieldNode>& nodes,
-      const std::vector<flatbuf::Buffer>& buffers) {
-    header_type_ = flatbuf::MessageHeader_DictionaryBatch;
-
-    auto record_batch = flatbuf::CreateRecordBatch(fbb_, length,
-        fbb_.CreateVectorOfStructs(nodes), fbb_.CreateVectorOfStructs(buffers));
-
-    header_ = flatbuf::CreateDictionaryBatch(fbb_, id, record_batch).Union();
-    body_length_ = body_length;
-    return Status::OK();
-  }
-
-  Status Finish();
-
-  Status GetBuffer(std::shared_ptr<Buffer>* out);
-
- private:
-  flatbuf::MessageHeader header_type_;
-  flatbuffers::Offset<void> header_;
-  int64_t body_length_;
-  flatbuffers::FlatBufferBuilder fbb_;
-};
-
-Status WriteSchemaMessage(
-    const Schema& schema, DictionaryMemo* dictionary_memo, std::shared_ptr<Buffer>* out) {
-  MessageBuilder message;
-  RETURN_NOT_OK(message.SetSchema(schema, dictionary_memo));
-  RETURN_NOT_OK(message.Finish());
-  return message.GetBuffer(out);
-}
-
-Status WriteRecordBatchMessage(int32_t length, int64_t body_length,
-    const std::vector<flatbuf::FieldNode>& nodes,
-    const std::vector<flatbuf::Buffer>& buffers, std::shared_ptr<Buffer>* out) {
-  MessageBuilder builder;
-  RETURN_NOT_OK(builder.SetRecordBatch(length, body_length, nodes, buffers));
-  RETURN_NOT_OK(builder.Finish());
-  return builder.GetBuffer(out);
-}
-
-Status WriteDictionaryMessage(int64_t id, int32_t length, int64_t body_length,
-    const std::vector<flatbuf::FieldNode>& nodes,
-    const std::vector<flatbuf::Buffer>& buffers, std::shared_ptr<Buffer>* out) {
-  MessageBuilder builder;
-  RETURN_NOT_OK(builder.SetDictionary(id, length, body_length, nodes, buffers));
-  RETURN_NOT_OK(builder.Finish());
-  return builder.GetBuffer(out);
-}
-
-Status MessageBuilder::Finish() {
-  auto message =
-      flatbuf::CreateMessage(fbb_, kMetadataVersion, header_type_, header_, body_length_);
-  fbb_.Finish(message);
-  return Status::OK();
-}
-
-Status MessageBuilder::GetBuffer(std::shared_ptr<Buffer>* out) {
-  int32_t size = fbb_.GetSize();
-
-  auto result = std::make_shared<PoolBuffer>();
-  RETURN_NOT_OK(result->Resize(size));
-
-  uint8_t* dst = result->mutable_data();
-  memcpy(dst, fbb_.GetBufferPointer(), size);
-
-  *out = result;
-  return Status::OK();
-}
-
-}  // namespace ipc
-}  // namespace arrow

http://git-wip-us.apache.org/repos/asf/arrow/blob/df2220f3/cpp/src/arrow/ipc/metadata-internal.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/metadata-internal.h b/cpp/src/arrow/ipc/metadata-internal.h
deleted file mode 100644
index 59afecb..0000000
--- a/cpp/src/arrow/ipc/metadata-internal.h
+++ /dev/null
@@ -1,83 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#ifndef ARROW_IPC_METADATA_INTERNAL_H
-#define ARROW_IPC_METADATA_INTERNAL_H
-
-#include <cstdint>
-#include <memory>
-#include <vector>
-
-#include "flatbuffers/flatbuffers.h"
-
-#include "arrow/ipc/File_generated.h"
-#include "arrow/ipc/Message_generated.h"
-#include "arrow/ipc/metadata.h"
-
-namespace arrow {
-
-namespace flatbuf = org::apache::arrow::flatbuf;
-
-class Buffer;
-struct Field;
-class Schema;
-class Status;
-
-namespace ipc {
-
-using FBB = flatbuffers::FlatBufferBuilder;
-using FieldOffset = flatbuffers::Offset<arrow::flatbuf::Field>;
-using VectorLayoutOffset = flatbuffers::Offset<arrow::flatbuf::VectorLayout>;
-using Offset = flatbuffers::Offset<void>;
-
-static constexpr flatbuf::MetadataVersion kMetadataVersion = flatbuf::MetadataVersion_V2;
-
-// Construct a field with type for a dictionary-encoded field. None of its
-// children or children's descendents can be dictionary encoded
-Status FieldFromFlatbufferDictionary(
-    const flatbuf::Field* field, std::shared_ptr<Field>* out);
-
-// Construct a field for a non-dictionary-encoded field. Its children may be
-// dictionary encoded
-Status FieldFromFlatbuffer(const flatbuf::Field* field,
-    const DictionaryMemo& dictionary_memo, std::shared_ptr<Field>* out);
-
-Status SchemaToFlatbuffer(FBB& fbb, const Schema& schema, DictionaryMemo* dictionary_memo,
-    flatbuffers::Offset<flatbuf::Schema>* out);
-
-// Serialize arrow::Schema as a Flatbuffer
-//
-// \param[in] schema a Schema instance
-// \param[inout] dictionary_memo class for tracking dictionaries and assigning
-// dictionary ids
-// \param[out] out the serialized arrow::Buffer
-// \return Status outcome
-Status WriteSchemaMessage(
-    const Schema& schema, DictionaryMemo* dictionary_memo, std::shared_ptr<Buffer>* out);
-
-Status WriteRecordBatchMessage(int32_t length, int64_t body_length,
-    const std::vector<flatbuf::FieldNode>& nodes,
-    const std::vector<flatbuf::Buffer>& buffers, std::shared_ptr<Buffer>* out);
-
-Status WriteDictionaryMessage(int64_t id, int32_t length, int64_t body_length,
-    const std::vector<flatbuf::FieldNode>& nodes,
-    const std::vector<flatbuf::Buffer>& buffers, std::shared_ptr<Buffer>* out);
-
-}  // namespace ipc
-}  // namespace arrow
-
-#endif  // ARROW_IPC_METADATA_INTERNAL_H

http://git-wip-us.apache.org/repos/asf/arrow/blob/df2220f3/cpp/src/arrow/ipc/metadata.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/metadata.cc b/cpp/src/arrow/ipc/metadata.cc
index 71bc5c9..a418d48 100644
--- a/cpp/src/arrow/ipc/metadata.cc
+++ b/cpp/src/arrow/ipc/metadata.cc
@@ -24,14 +24,14 @@
 
 #include "flatbuffers/flatbuffers.h"
 
+#include "arrow/array.h"
+#include "arrow/buffer.h"
 #include "arrow/io/interfaces.h"
 #include "arrow/ipc/File_generated.h"
 #include "arrow/ipc/Message_generated.h"
-#include "arrow/ipc/metadata-internal.h"
-
-#include "arrow/buffer.h"
 #include "arrow/schema.h"
 #include "arrow/status.h"
+#include "arrow/type.h"
 
 namespace arrow {
 
@@ -39,6 +39,643 @@ namespace flatbuf = org::apache::arrow::flatbuf;
 
 namespace ipc {
 
+using FBB = flatbuffers::FlatBufferBuilder;
+using DictionaryOffset = flatbuffers::Offset<flatbuf::DictionaryEncoding>;
+using FieldOffset = flatbuffers::Offset<flatbuf::Field>;
+using LargeRecordBatchOffset = flatbuffers::Offset<flatbuf::LargeRecordBatch>;
+using RecordBatchOffset = flatbuffers::Offset<flatbuf::RecordBatch>;
+using VectorLayoutOffset = flatbuffers::Offset<arrow::flatbuf::VectorLayout>;
+using Offset = flatbuffers::Offset<void>;
+
+static constexpr flatbuf::MetadataVersion kMetadataVersion = flatbuf::MetadataVersion_V2;
+
+static Status IntFromFlatbuffer(
+    const flatbuf::Int* int_data, std::shared_ptr<DataType>* out) {
+  if (int_data->bitWidth() > 64) {
+    return Status::NotImplemented("Integers with more than 64 bits not implemented");
+  }
+  if (int_data->bitWidth() < 8) {
+    return Status::NotImplemented("Integers with less than 8 bits not implemented");
+  }
+
+  switch (int_data->bitWidth()) {
+    case 8:
+      *out = int_data->is_signed() ? int8() : uint8();
+      break;
+    case 16:
+      *out = int_data->is_signed() ? int16() : uint16();
+      break;
+    case 32:
+      *out = int_data->is_signed() ? int32() : uint32();
+      break;
+    case 64:
+      *out = int_data->is_signed() ? int64() : uint64();
+      break;
+    default:
+      return Status::NotImplemented("Integers not in cstdint are not implemented");
+  }
+  return Status::OK();
+}
+
+static Status FloatFromFlatuffer(
+    const flatbuf::FloatingPoint* float_data, std::shared_ptr<DataType>* out) {
+  if (float_data->precision() == flatbuf::Precision_HALF) {
+    *out = float16();
+  } else if (float_data->precision() == flatbuf::Precision_SINGLE) {
+    *out = float32();
+  } else {
+    *out = float64();
+  }
+  return Status::OK();
+}
+
+// Forward declaration
+static Status FieldToFlatbuffer(FBB& fbb, const std::shared_ptr<Field>& field,
+    DictionaryMemo* dictionary_memo, FieldOffset* offset);
+
+static Offset IntToFlatbuffer(FBB& fbb, int bitWidth, bool is_signed) {
+  return flatbuf::CreateInt(fbb, bitWidth, is_signed).Union();
+}
+
+static Offset FloatToFlatbuffer(FBB& fbb, flatbuf::Precision precision) {
+  return flatbuf::CreateFloatingPoint(fbb, precision).Union();
+}
+
+static Status AppendChildFields(FBB& fbb, const std::shared_ptr<DataType>& type,
+    std::vector<FieldOffset>* out_children, DictionaryMemo* dictionary_memo) {
+  FieldOffset field;
+  for (int i = 0; i < type->num_children(); ++i) {
+    RETURN_NOT_OK(FieldToFlatbuffer(fbb, type->child(i), dictionary_memo, &field));
+    out_children->push_back(field);
+  }
+  return Status::OK();
+}
+
+static Status ListToFlatbuffer(FBB& fbb, const std::shared_ptr<DataType>& type,
+    std::vector<FieldOffset>* out_children, DictionaryMemo* dictionary_memo,
+    Offset* offset) {
+  RETURN_NOT_OK(AppendChildFields(fbb, type, out_children, dictionary_memo));
+  *offset = flatbuf::CreateList(fbb).Union();
+  return Status::OK();
+}
+
+static Status StructToFlatbuffer(FBB& fbb, const std::shared_ptr<DataType>& type,
+    std::vector<FieldOffset>* out_children, DictionaryMemo* dictionary_memo,
+    Offset* offset) {
+  RETURN_NOT_OK(AppendChildFields(fbb, type, out_children, dictionary_memo));
+  *offset = flatbuf::CreateStruct_(fbb).Union();
+  return Status::OK();
+}
+
+// ----------------------------------------------------------------------
+// Union implementation
+
+static Status UnionFromFlatbuffer(const flatbuf::Union* union_data,
+    const std::vector<std::shared_ptr<Field>>& children, std::shared_ptr<DataType>* out) {
+  UnionMode mode = union_data->mode() == flatbuf::UnionMode_Sparse ? UnionMode::SPARSE
+                                                                   : UnionMode::DENSE;
+
+  std::vector<uint8_t> type_codes;
+
+  const flatbuffers::Vector<int32_t>* fb_type_ids = union_data->typeIds();
+  if (fb_type_ids == nullptr) {
+    for (uint8_t i = 0; i < children.size(); ++i) {
+      type_codes.push_back(i);
+    }
+  } else {
+    for (int32_t id : (*fb_type_ids)) {
+      // TODO(wesm): can these values exceed 255?
+      type_codes.push_back(static_cast<uint8_t>(id));
+    }
+  }
+
+  *out = union_(children, type_codes, mode);
+  return Status::OK();
+}
+
+static Status UnionToFlatBuffer(FBB& fbb, const std::shared_ptr<DataType>& type,
+    std::vector<FieldOffset>* out_children, DictionaryMemo* dictionary_memo,
+    Offset* offset) {
+  RETURN_NOT_OK(AppendChildFields(fbb, type, out_children, dictionary_memo));
+
+  const auto& union_type = static_cast<const UnionType&>(*type);
+
+  flatbuf::UnionMode mode = union_type.mode == UnionMode::SPARSE
+                                ? flatbuf::UnionMode_Sparse
+                                : flatbuf::UnionMode_Dense;
+
+  std::vector<int32_t> type_ids;
+  type_ids.reserve(union_type.type_codes.size());
+  for (uint8_t code : union_type.type_codes) {
+    type_ids.push_back(code);
+  }
+
+  auto fb_type_ids = fbb.CreateVector(type_ids);
+
+  *offset = flatbuf::CreateUnion(fbb, mode, fb_type_ids).Union();
+  return Status::OK();
+}
+
+#define INT_TO_FB_CASE(BIT_WIDTH, IS_SIGNED)            \
+  *out_type = flatbuf::Type_Int;                        \
+  *offset = IntToFlatbuffer(fbb, BIT_WIDTH, IS_SIGNED); \
+  break;
+
+static inline flatbuf::TimeUnit ToFlatbufferUnit(TimeUnit unit) {
+  switch (unit) {
+    case TimeUnit::SECOND:
+      return flatbuf::TimeUnit_SECOND;
+    case TimeUnit::MILLI:
+      return flatbuf::TimeUnit_MILLISECOND;
+    case TimeUnit::MICRO:
+      return flatbuf::TimeUnit_MICROSECOND;
+    case TimeUnit::NANO:
+      return flatbuf::TimeUnit_NANOSECOND;
+    default:
+      break;
+  }
+  return flatbuf::TimeUnit_MIN;
+}
+
+static inline TimeUnit FromFlatbufferUnit(flatbuf::TimeUnit unit) {
+  switch (unit) {
+    case flatbuf::TimeUnit_SECOND:
+      return TimeUnit::SECOND;
+    case flatbuf::TimeUnit_MILLISECOND:
+      return TimeUnit::MILLI;
+    case flatbuf::TimeUnit_MICROSECOND:
+      return TimeUnit::MICRO;
+    case flatbuf::TimeUnit_NANOSECOND:
+      return TimeUnit::NANO;
+    default:
+      break;
+  }
+  // cannot reach
+  return TimeUnit::SECOND;
+}
+
+static Status TypeFromFlatbuffer(flatbuf::Type type, const void* type_data,
+    const std::vector<std::shared_ptr<Field>>& children, std::shared_ptr<DataType>* out) {
+  switch (type) {
+    case flatbuf::Type_NONE:
+      return Status::Invalid("Type metadata cannot be none");
+    case flatbuf::Type_Int:
+      return IntFromFlatbuffer(static_cast<const flatbuf::Int*>(type_data), out);
+    case flatbuf::Type_FloatingPoint:
+      return FloatFromFlatuffer(
+          static_cast<const flatbuf::FloatingPoint*>(type_data), out);
+    case flatbuf::Type_Binary:
+      *out = binary();
+      return Status::OK();
+    case flatbuf::Type_FixedWidthBinary: {
+      auto fw_binary = static_cast<const flatbuf::FixedWidthBinary*>(type_data);
+      *out = fixed_width_binary(fw_binary->byteWidth());
+      return Status::OK();
+    }
+    case flatbuf::Type_Utf8:
+      *out = utf8();
+      return Status::OK();
+    case flatbuf::Type_Bool:
+      *out = boolean();
+      return Status::OK();
+    case flatbuf::Type_Decimal:
+      return Status::NotImplemented("Decimal");
+    case flatbuf::Type_Date:
+      *out = date();
+      return Status::OK();
+    case flatbuf::Type_Time: {
+      auto time_type = static_cast<const flatbuf::Time*>(type_data);
+      *out = time(FromFlatbufferUnit(time_type->unit()));
+      return Status::OK();
+    }
+    case flatbuf::Type_Timestamp: {
+      auto ts_type = static_cast<const flatbuf::Timestamp*>(type_data);
+      *out = timestamp(FromFlatbufferUnit(ts_type->unit()));
+      return Status::OK();
+    }
+    case flatbuf::Type_Interval:
+      return Status::NotImplemented("Interval");
+    case flatbuf::Type_List:
+      if (children.size() != 1) {
+        return Status::Invalid("List must have exactly 1 child field");
+      }
+      *out = std::make_shared<ListType>(children[0]);
+      return Status::OK();
+    case flatbuf::Type_Struct_:
+      *out = std::make_shared<StructType>(children);
+      return Status::OK();
+    case flatbuf::Type_Union:
+      return UnionFromFlatbuffer(
+          static_cast<const flatbuf::Union*>(type_data), children, out);
+    default:
+      return Status::Invalid("Unrecognized type");
+  }
+}
+
+// TODO(wesm): Convert this to visitor pattern
+static Status TypeToFlatbuffer(FBB& fbb, const std::shared_ptr<DataType>& type,
+    std::vector<FieldOffset>* children, std::vector<VectorLayoutOffset>* layout,
+    flatbuf::Type* out_type, DictionaryMemo* dictionary_memo, Offset* offset) {
+  if (type->type == Type::DICTIONARY) {
+    // In this library, the dictionary "type" is a logical construct. Here we
+    // pass through to the value type, as we've already captured the index
+    // type in the DictionaryEncoding metadata in the parent field
+    const auto& dict_type = static_cast<const DictionaryType&>(*type);
+    return TypeToFlatbuffer(fbb, dict_type.dictionary()->type(), children, layout,
+        out_type, dictionary_memo, offset);
+  }
+
+  std::vector<BufferDescr> buffer_layout = type->GetBufferLayout();
+  for (const BufferDescr& descr : buffer_layout) {
+    flatbuf::VectorType vector_type;
+    switch (descr.type()) {
+      case BufferType::OFFSET:
+        vector_type = flatbuf::VectorType_OFFSET;
+        break;
+      case BufferType::DATA:
+        vector_type = flatbuf::VectorType_DATA;
+        break;
+      case BufferType::VALIDITY:
+        vector_type = flatbuf::VectorType_VALIDITY;
+        break;
+      case BufferType::TYPE:
+        vector_type = flatbuf::VectorType_TYPE;
+        break;
+      default:
+        vector_type = flatbuf::VectorType_DATA;
+        break;
+    }
+    auto offset = flatbuf::CreateVectorLayout(
+        fbb, static_cast<int16_t>(descr.bit_width()), vector_type);
+    layout->push_back(offset);
+  }
+
+  switch (type->type) {
+    case Type::BOOL:
+      *out_type = flatbuf::Type_Bool;
+      *offset = flatbuf::CreateBool(fbb).Union();
+      break;
+    case Type::UINT8:
+      INT_TO_FB_CASE(8, false);
+    case Type::INT8:
+      INT_TO_FB_CASE(8, true);
+    case Type::UINT16:
+      INT_TO_FB_CASE(16, false);
+    case Type::INT16:
+      INT_TO_FB_CASE(16, true);
+    case Type::UINT32:
+      INT_TO_FB_CASE(32, false);
+    case Type::INT32:
+      INT_TO_FB_CASE(32, true);
+    case Type::UINT64:
+      INT_TO_FB_CASE(64, false);
+    case Type::INT64:
+      INT_TO_FB_CASE(64, true);
+    case Type::FLOAT:
+      *out_type = flatbuf::Type_FloatingPoint;
+      *offset = FloatToFlatbuffer(fbb, flatbuf::Precision_SINGLE);
+      break;
+    case Type::DOUBLE:
+      *out_type = flatbuf::Type_FloatingPoint;
+      *offset = FloatToFlatbuffer(fbb, flatbuf::Precision_DOUBLE);
+      break;
+    case Type::FIXED_WIDTH_BINARY: {
+      const auto& fw_type = static_cast<const FixedWidthBinaryType&>(*type);
+      *out_type = flatbuf::Type_FixedWidthBinary;
+      *offset = flatbuf::CreateFixedWidthBinary(fbb, fw_type.byte_width()).Union();
+    } break;
+    case Type::BINARY:
+      *out_type = flatbuf::Type_Binary;
+      *offset = flatbuf::CreateBinary(fbb).Union();
+      break;
+    case Type::STRING:
+      *out_type = flatbuf::Type_Utf8;
+      *offset = flatbuf::CreateUtf8(fbb).Union();
+      break;
+    case Type::DATE:
+      *out_type = flatbuf::Type_Date;
+      *offset = flatbuf::CreateDate(fbb).Union();
+      break;
+    case Type::TIME: {
+      const auto& time_type = static_cast<const TimeType&>(*type);
+      *out_type = flatbuf::Type_Time;
+      *offset = flatbuf::CreateTime(fbb, ToFlatbufferUnit(time_type.unit)).Union();
+    } break;
+    case Type::TIMESTAMP: {
+      const auto& ts_type = static_cast<const TimestampType&>(*type);
+      *out_type = flatbuf::Type_Timestamp;
+      *offset = flatbuf::CreateTimestamp(fbb, ToFlatbufferUnit(ts_type.unit)).Union();
+    } break;
+    case Type::LIST:
+      *out_type = flatbuf::Type_List;
+      return ListToFlatbuffer(fbb, type, children, dictionary_memo, offset);
+    case Type::STRUCT:
+      *out_type = flatbuf::Type_Struct_;
+      return StructToFlatbuffer(fbb, type, children, dictionary_memo, offset);
+    case Type::UNION:
+      *out_type = flatbuf::Type_Union;
+      return UnionToFlatBuffer(fbb, type, children, dictionary_memo, offset);
+    default:
+      *out_type = flatbuf::Type_NONE;  // Make clang-tidy happy
+      std::stringstream ss;
+      ss << "Unable to convert type: " << type->ToString() << std::endl;
+      return Status::NotImplemented(ss.str());
+  }
+  return Status::OK();
+}
+
+static DictionaryOffset GetDictionaryEncoding(
+    FBB& fbb, const DictionaryType& type, DictionaryMemo* memo) {
+  int64_t dictionary_id = memo->GetId(type.dictionary());
+
+  // We assume that the dictionary index type (as an integer) has already been
+  // validated elsewhere, and can safely assume we are dealing with signed
+  // integers
+  const auto& fw_index_type = static_cast<const FixedWidthType&>(*type.index_type());
+
+  auto index_type_offset = flatbuf::CreateInt(fbb, fw_index_type.bit_width(), true);
+
+  // TODO(wesm): ordered dictionaries
+  return flatbuf::CreateDictionaryEncoding(fbb, dictionary_id, index_type_offset);
+}
+
+static Status FieldToFlatbuffer(FBB& fbb, const std::shared_ptr<Field>& field,
+    DictionaryMemo* dictionary_memo, FieldOffset* offset) {
+  auto fb_name = fbb.CreateString(field->name);
+
+  flatbuf::Type type_enum;
+  Offset type_offset;
+  Offset type_layout;
+  std::vector<FieldOffset> children;
+  std::vector<VectorLayoutOffset> layout;
+
+  RETURN_NOT_OK(TypeToFlatbuffer(
+      fbb, field->type, &children, &layout, &type_enum, dictionary_memo, &type_offset));
+  auto fb_children = fbb.CreateVector(children);
+  auto fb_layout = fbb.CreateVector(layout);
+
+  DictionaryOffset dictionary = 0;
+  if (field->type->type == Type::DICTIONARY) {
+    dictionary = GetDictionaryEncoding(
+        fbb, static_cast<const DictionaryType&>(*field->type), dictionary_memo);
+  }
+
+  // TODO: produce the list of VectorTypes
+  *offset = flatbuf::CreateField(fbb, fb_name, field->nullable, type_enum, type_offset,
+      dictionary, fb_children, fb_layout);
+
+  return Status::OK();
+}
+
+static Status FieldFromFlatbuffer(const flatbuf::Field* field,
+    const DictionaryMemo& dictionary_memo, std::shared_ptr<Field>* out) {
+  std::shared_ptr<DataType> type;
+
+  const flatbuf::DictionaryEncoding* encoding = field->dictionary();
+
+  if (encoding == nullptr) {
+    // The field is not dictionary encoded. We must potentially visit its
+    // children to fully reconstruct the data type
+    auto children = field->children();
+    std::vector<std::shared_ptr<Field>> child_fields(children->size());
+    for (int i = 0; i < static_cast<int>(children->size()); ++i) {
+      RETURN_NOT_OK(
+          FieldFromFlatbuffer(children->Get(i), dictionary_memo, &child_fields[i]));
+    }
+    RETURN_NOT_OK(
+        TypeFromFlatbuffer(field->type_type(), field->type(), child_fields, &type));
+  } else {
+    // The field is dictionary encoded. The type of the dictionary values has
+    // been determined elsewhere, and is stored in the DictionaryMemo. Here we
+    // construct the logical DictionaryType object
+
+    std::shared_ptr<Array> dictionary;
+    RETURN_NOT_OK(dictionary_memo.GetDictionary(encoding->id(), &dictionary));
+
+    std::shared_ptr<DataType> index_type;
+    RETURN_NOT_OK(IntFromFlatbuffer(encoding->indexType(), &index_type));
+    type = std::make_shared<DictionaryType>(index_type, dictionary);
+  }
+  *out = std::make_shared<Field>(field->name()->str(), type, field->nullable());
+  return Status::OK();
+}
+
+static Status FieldFromFlatbufferDictionary(
+    const flatbuf::Field* field, std::shared_ptr<Field>* out) {
+  // Need an empty memo to pass down for constructing children
+  DictionaryMemo dummy_memo;
+
+  // Any DictionaryEncoding set is ignored here
+
+  std::shared_ptr<DataType> type;
+  auto children = field->children();
+  std::vector<std::shared_ptr<Field>> child_fields(children->size());
+  for (int i = 0; i < static_cast<int>(children->size()); ++i) {
+    RETURN_NOT_OK(FieldFromFlatbuffer(children->Get(i), dummy_memo, &child_fields[i]));
+  }
+
+  RETURN_NOT_OK(
+      TypeFromFlatbuffer(field->type_type(), field->type(), child_fields, &type));
+
+  *out = std::make_shared<Field>(field->name()->str(), type, field->nullable());
+  return Status::OK();
+}
+
+// will return the endianness of the system we are running on
+// based on the NUMPY_API function. See NOTICE.txt
+flatbuf::Endianness endianness() {
+  union {
+    uint32_t i;
+    char c[4];
+  } bint = {0x01020304};
+
+  return bint.c[0] == 1 ? flatbuf::Endianness_Big : flatbuf::Endianness_Little;
+}
+
+static Status SchemaToFlatbuffer(FBB& fbb, const Schema& schema,
+    DictionaryMemo* dictionary_memo, flatbuffers::Offset<flatbuf::Schema>* out) {
+  std::vector<FieldOffset> field_offsets;
+  for (int i = 0; i < schema.num_fields(); ++i) {
+    std::shared_ptr<Field> field = schema.field(i);
+    FieldOffset offset;
+    RETURN_NOT_OK(FieldToFlatbuffer(fbb, field, dictionary_memo, &offset));
+    field_offsets.push_back(offset);
+  }
+
+  *out = flatbuf::CreateSchema(fbb, endianness(), fbb.CreateVector(field_offsets));
+  return Status::OK();
+}
+
+static Status WriteFlatbufferBuilder(FBB& fbb, std::shared_ptr<Buffer>* out) {
+  int32_t size = fbb.GetSize();
+
+  auto result = std::make_shared<PoolBuffer>();
+  RETURN_NOT_OK(result->Resize(size));
+
+  uint8_t* dst = result->mutable_data();
+  memcpy(dst, fbb.GetBufferPointer(), size);
+  *out = result;
+  return Status::OK();
+}
+
+static Status WriteMessage(FBB& fbb, flatbuf::MessageHeader header_type,
+    flatbuffers::Offset<void> header, int64_t body_length, std::shared_ptr<Buffer>* out) {
+  auto message =
+      flatbuf::CreateMessage(fbb, kMetadataVersion, header_type, header, body_length);
+  fbb.Finish(message);
+  return WriteFlatbufferBuilder(fbb, out);
+}
+
+Status WriteSchemaMessage(
+    const Schema& schema, DictionaryMemo* dictionary_memo, std::shared_ptr<Buffer>* out) {
+  FBB fbb;
+  flatbuffers::Offset<flatbuf::Schema> fb_schema;
+  RETURN_NOT_OK(SchemaToFlatbuffer(fbb, schema, dictionary_memo, &fb_schema));
+  return WriteMessage(fbb, flatbuf::MessageHeader_Schema, fb_schema.Union(), 0, out);
+}
+
+using FieldNodeVector =
+    flatbuffers::Offset<flatbuffers::Vector<const flatbuf::FieldNode*>>;
+using LargeFieldNodeVector =
+    flatbuffers::Offset<flatbuffers::Vector<const flatbuf::LargeFieldNode*>>;
+using BufferVector = flatbuffers::Offset<flatbuffers::Vector<const flatbuf::Buffer*>>;
+
+static Status WriteFieldNodes(
+    FBB& fbb, const std::vector<FieldMetadata>& nodes, FieldNodeVector* out) {
+  std::vector<flatbuf::FieldNode> fb_nodes;
+  fb_nodes.reserve(nodes.size());
+
+  for (size_t i = 0; i < nodes.size(); ++i) {
+    const FieldMetadata& node = nodes[i];
+    if (node.offset != 0) {
+      return Status::Invalid("Field metadata for IPC must have offset 0");
+    }
+    fb_nodes.emplace_back(
+        static_cast<int32_t>(node.length), static_cast<int32_t>(node.null_count));
+  }
+  *out = fbb.CreateVectorOfStructs(fb_nodes);
+  return Status::OK();
+}
+
+static Status WriteLargeFieldNodes(
+    FBB& fbb, const std::vector<FieldMetadata>& nodes, LargeFieldNodeVector* out) {
+  std::vector<flatbuf::LargeFieldNode> fb_nodes;
+  fb_nodes.reserve(nodes.size());
+
+  for (size_t i = 0; i < nodes.size(); ++i) {
+    const FieldMetadata& node = nodes[i];
+    if (node.offset != 0) {
+      return Status::Invalid("Field metadata for IPC must have offset 0");
+    }
+    fb_nodes.emplace_back(node.length, node.null_count);
+  }
+  *out = fbb.CreateVectorOfStructs(fb_nodes);
+  return Status::OK();
+}
+
+static Status WriteBuffers(
+    FBB& fbb, const std::vector<BufferMetadata>& buffers, BufferVector* out) {
+  std::vector<flatbuf::Buffer> fb_buffers;
+  fb_buffers.reserve(buffers.size());
+
+  for (size_t i = 0; i < buffers.size(); ++i) {
+    const BufferMetadata& buffer = buffers[i];
+    fb_buffers.emplace_back(buffer.page, buffer.offset, buffer.length);
+  }
+  *out = fbb.CreateVectorOfStructs(fb_buffers);
+  return Status::OK();
+}
+
+static Status MakeRecordBatch(FBB& fbb, int32_t length, int64_t body_length,
+    const std::vector<FieldMetadata>& nodes, const std::vector<BufferMetadata>& buffers,
+    RecordBatchOffset* offset) {
+  FieldNodeVector fb_nodes;
+  BufferVector fb_buffers;
+
+  RETURN_NOT_OK(WriteFieldNodes(fbb, nodes, &fb_nodes));
+  RETURN_NOT_OK(WriteBuffers(fbb, buffers, &fb_buffers));
+
+  *offset = flatbuf::CreateRecordBatch(fbb, length, fb_nodes, fb_buffers);
+  return Status::OK();
+}
+
+static Status MakeLargeRecordBatch(FBB& fbb, int64_t length, int64_t body_length,
+    const std::vector<FieldMetadata>& nodes, const std::vector<BufferMetadata>& buffers,
+    LargeRecordBatchOffset* offset) {
+  LargeFieldNodeVector fb_nodes;
+  BufferVector fb_buffers;
+
+  RETURN_NOT_OK(WriteLargeFieldNodes(fbb, nodes, &fb_nodes));
+  RETURN_NOT_OK(WriteBuffers(fbb, buffers, &fb_buffers));
+
+  *offset = flatbuf::CreateLargeRecordBatch(fbb, length, fb_nodes, fb_buffers);
+  return Status::OK();
+}
+
+Status WriteRecordBatchMessage(int32_t length, int64_t body_length,
+    const std::vector<FieldMetadata>& nodes, const std::vector<BufferMetadata>& buffers,
+    std::shared_ptr<Buffer>* out) {
+  FBB fbb;
+  RecordBatchOffset record_batch;
+  RETURN_NOT_OK(MakeRecordBatch(fbb, length, body_length, nodes, buffers, &record_batch));
+  return WriteMessage(
+      fbb, flatbuf::MessageHeader_RecordBatch, record_batch.Union(), body_length, out);
+}
+
+Status WriteLargeRecordBatchMessage(int64_t length, int64_t body_length,
+    const std::vector<FieldMetadata>& nodes, const std::vector<BufferMetadata>& buffers,
+    std::shared_ptr<Buffer>* out) {
+  FBB fbb;
+  LargeRecordBatchOffset large_batch;
+  RETURN_NOT_OK(
+      MakeLargeRecordBatch(fbb, length, body_length, nodes, buffers, &large_batch));
+  return WriteMessage(fbb, flatbuf::MessageHeader_LargeRecordBatch, large_batch.Union(),
+      body_length, out);
+}
+
+Status WriteDictionaryMessage(int64_t id, int32_t length, int64_t body_length,
+    const std::vector<FieldMetadata>& nodes, const std::vector<BufferMetadata>& buffers,
+    std::shared_ptr<Buffer>* out) {
+  FBB fbb;
+  RecordBatchOffset record_batch;
+  RETURN_NOT_OK(MakeRecordBatch(fbb, length, body_length, nodes, buffers, &record_batch));
+  auto dictionary_batch = flatbuf::CreateDictionaryBatch(fbb, id, record_batch).Union();
+  return WriteMessage(
+      fbb, flatbuf::MessageHeader_DictionaryBatch, dictionary_batch, body_length, out);
+}
+
+static flatbuffers::Offset<flatbuffers::Vector<const flatbuf::Block*>>
+FileBlocksToFlatbuffer(FBB& fbb, const std::vector<FileBlock>& blocks) {
+  std::vector<flatbuf::Block> fb_blocks;
+
+  for (const FileBlock& block : blocks) {
+    fb_blocks.emplace_back(block.offset, block.metadata_length, block.body_length);
+  }
+
+  return fbb.CreateVectorOfStructs(fb_blocks);
+}
+
+Status WriteFileFooter(const Schema& schema, const std::vector<FileBlock>& dictionaries,
+    const std::vector<FileBlock>& record_batches, DictionaryMemo* dictionary_memo,
+    io::OutputStream* out) {
+  FBB fbb;
+
+  flatbuffers::Offset<flatbuf::Schema> fb_schema;
+  RETURN_NOT_OK(SchemaToFlatbuffer(fbb, schema, dictionary_memo, &fb_schema));
+
+  auto fb_dictionaries = FileBlocksToFlatbuffer(fbb, dictionaries);
+  auto fb_record_batches = FileBlocksToFlatbuffer(fbb, record_batches);
+
+  auto footer = flatbuf::CreateFooter(
+      fbb, kMetadataVersion, fb_schema, fb_dictionaries, fb_record_batches);
+
+  fbb.Finish(footer);
+
+  int32_t size = fbb.GetSize();
+
+  return out->Write(fbb.GetBufferPointer(), size);
+}
+
 // ----------------------------------------------------------------------
 // Memoization data structure for handling shared dictionaries
 
@@ -158,7 +795,18 @@ int64_t Message::body_length() const {
 // ----------------------------------------------------------------------
 // SchemaMetadata
 
-class SchemaMetadata::SchemaMetadataImpl {
+class MessageHolder {
+ public:
+  void set_message(const std::shared_ptr<Message>& message) { message_ = message; }
+  void set_buffer(const std::shared_ptr<Buffer>& buffer) { buffer_ = buffer; }
+
+ protected:
+  // Possible parents, owns the flatbuffer data
+  std::shared_ptr<Message> message_;
+  std::shared_ptr<Buffer> buffer_;
+};
+
+class SchemaMetadata::SchemaMetadataImpl : public MessageHolder {
  public:
   explicit SchemaMetadataImpl(const void* schema)
       : schema_(static_cast<const flatbuf::Schema*>(schema)) {}
@@ -196,15 +844,19 @@ class SchemaMetadata::SchemaMetadataImpl {
   const flatbuf::Schema* schema_;
 };
 
-SchemaMetadata::SchemaMetadata(
-    const std::shared_ptr<Message>& message, const void* flatbuf) {
-  message_ = message;
-  impl_.reset(new SchemaMetadataImpl(flatbuf));
+SchemaMetadata::SchemaMetadata(const std::shared_ptr<Message>& message)
+    : SchemaMetadata(message->impl_->header()) {
+  impl_->set_message(message);
 }
 
-SchemaMetadata::SchemaMetadata(const std::shared_ptr<Message>& message) {
-  message_ = message;
-  impl_.reset(new SchemaMetadataImpl(message->impl_->header()));
+SchemaMetadata::SchemaMetadata(const void* header) {
+  impl_.reset(new SchemaMetadataImpl(header));
+}
+
+SchemaMetadata::SchemaMetadata(const std::shared_ptr<Buffer>& buffer, int64_t offset)
+    : SchemaMetadata(buffer->data() + offset) {
+  // Preserve ownership
+  impl_->set_buffer(buffer);
 }
 
 SchemaMetadata::~SchemaMetadata() {}
@@ -231,7 +883,7 @@ Status SchemaMetadata::GetSchema(
 // ----------------------------------------------------------------------
 // RecordBatchMetadata
 
-class RecordBatchMetadata::RecordBatchMetadataImpl {
+class RecordBatchMetadata::RecordBatchMetadataImpl : public MessageHolder {
  public:
   explicit RecordBatchMetadataImpl(const void* batch)
       : batch_(static_cast<const flatbuf::RecordBatch*>(batch)) {
@@ -249,22 +901,14 @@ class RecordBatchMetadata::RecordBatchMetadataImpl {
 
   int num_fields() const { return batch_->nodes()->size(); }
 
-  void set_message(const std::shared_ptr<Message>& message) { message_ = message; }
-
-  void set_buffer(const std::shared_ptr<Buffer>& buffer) { buffer_ = buffer; }
-
  private:
   const flatbuf::RecordBatch* batch_;
   const flatbuffers::Vector<const flatbuf::FieldNode*>* nodes_;
   const flatbuffers::Vector<const flatbuf::Buffer*>* buffers_;
-
-  // Possible parents, owns the flatbuffer data
-  std::shared_ptr<Message> message_;
-  std::shared_ptr<Buffer> buffer_;
 };
 
-RecordBatchMetadata::RecordBatchMetadata(const std::shared_ptr<Message>& message) {
-  impl_.reset(new RecordBatchMetadataImpl(message->impl_->header()));
+RecordBatchMetadata::RecordBatchMetadata(const std::shared_ptr<Message>& message)
+    : RecordBatchMetadata(message->impl_->header()) {
   impl_->set_message(message);
 }
 
@@ -358,8 +1002,8 @@ const RecordBatchMetadata& DictionaryBatchMetadata::record_batch() const {
 // ----------------------------------------------------------------------
 // Conveniences
 
-Status ReadMessage(int64_t offset, int32_t metadata_length,
-    io::RandomAccessFile* file, std::shared_ptr<Message>* message) {
+Status ReadMessage(int64_t offset, int32_t metadata_length, io::RandomAccessFile* file,
+    std::shared_ptr<Message>* message) {
   std::shared_ptr<Buffer> buffer;
   RETURN_NOT_OK(file->ReadAt(offset, metadata_length, &buffer));
 

http://git-wip-us.apache.org/repos/asf/arrow/blob/df2220f3/cpp/src/arrow/ipc/metadata.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/metadata.h b/cpp/src/arrow/ipc/metadata.h
index 4eb0186..41e6c5e 100644
--- a/cpp/src/arrow/ipc/metadata.h
+++ b/cpp/src/arrow/ipc/metadata.h
@@ -107,10 +107,9 @@ class Message;
 // Container for serialized Schema metadata contained in an IPC message
 class ARROW_EXPORT SchemaMetadata {
  public:
+  explicit SchemaMetadata(const void* header);
   explicit SchemaMetadata(const std::shared_ptr<Message>& message);
-
-  // Accepts an opaque flatbuffer pointer
-  SchemaMetadata(const std::shared_ptr<Message>& message, const void* schema);
+  SchemaMetadata(const std::shared_ptr<Buffer>& message, int64_t offset);
 
   ~SchemaMetadata();
 
@@ -127,9 +126,6 @@ class ARROW_EXPORT SchemaMetadata {
       const DictionaryMemo& dictionary_memo, std::shared_ptr<Schema>* out) const;
 
  private:
-  // Parent, owns the flatbuffer data
-  std::shared_ptr<Message> message_;
-
   class SchemaMetadataImpl;
   std::unique_ptr<SchemaMetadataImpl> impl_;
 
@@ -145,8 +141,6 @@ struct ARROW_EXPORT BufferMetadata {
 // Container for serialized record batch metadata contained in an IPC message
 class ARROW_EXPORT RecordBatchMetadata {
  public:
-  // Instantiate from opaque pointer. Memory ownership must be preserved
-  // elsewhere (e.g. in a dictionary batch)
   explicit RecordBatchMetadata(const void* header);
   explicit RecordBatchMetadata(const std::shared_ptr<Message>& message);
   RecordBatchMetadata(const std::shared_ptr<Buffer>& message, int64_t offset);
@@ -218,8 +212,34 @@ class ARROW_EXPORT Message {
 /// \param[in] file the seekable file interface to read from
 /// \param[out] message the message read
 /// \return Status success or failure
-Status ReadMessage(int64_t offset, int32_t metadata_length,
-    io::RandomAccessFile* file, std::shared_ptr<Message>* message);
+Status ReadMessage(int64_t offset, int32_t metadata_length, io::RandomAccessFile* file,
+    std::shared_ptr<Message>* message);
+
+// Serialize arrow::Schema as a Flatbuffer
+//
+// \param[in] schema a Schema instance
+// \param[inout] dictionary_memo class for tracking dictionaries and assigning
+// dictionary ids
+// \param[out] out the serialized arrow::Buffer
+// \return Status outcome
+Status WriteSchemaMessage(
+    const Schema& schema, DictionaryMemo* dictionary_memo, std::shared_ptr<Buffer>* out);
+
+Status WriteRecordBatchMessage(int32_t length, int64_t body_length,
+    const std::vector<FieldMetadata>& nodes, const std::vector<BufferMetadata>& buffers,
+    std::shared_ptr<Buffer>* out);
+
+Status WriteLargeRecordBatchMessage(int64_t length, int64_t body_length,
+    const std::vector<FieldMetadata>& nodes, const std::vector<BufferMetadata>& buffers,
+    std::shared_ptr<Buffer>* out);
+
+Status WriteDictionaryMessage(int64_t id, int32_t length, int64_t body_length,
+    const std::vector<FieldMetadata>& nodes, const std::vector<BufferMetadata>& buffers,
+    std::shared_ptr<Buffer>* out);
+
+Status WriteFileFooter(const Schema& schema, const std::vector<FileBlock>& dictionaries,
+    const std::vector<FileBlock>& record_batches, DictionaryMemo* dictionary_memo,
+    io::OutputStream* out);
 
 }  // namespace ipc
 }  // namespace arrow

http://git-wip-us.apache.org/repos/asf/arrow/blob/df2220f3/cpp/src/arrow/ipc/reader.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/reader.cc b/cpp/src/arrow/ipc/reader.cc
index 9575364..a2b20a9 100644
--- a/cpp/src/arrow/ipc/reader.cc
+++ b/cpp/src/arrow/ipc/reader.cc
@@ -26,17 +26,115 @@
 #include "arrow/buffer.h"
 #include "arrow/io/interfaces.h"
 #include "arrow/io/memory.h"
-#include "arrow/ipc/adapter.h"
-#include "arrow/ipc/metadata-internal.h"
+#include "arrow/ipc/File_generated.h"
+#include "arrow/ipc/Message_generated.h"
 #include "arrow/ipc/metadata.h"
 #include "arrow/ipc/util.h"
+#include "arrow/schema.h"
 #include "arrow/status.h"
+#include "arrow/table.h"
 #include "arrow/util/logging.h"
 
 namespace arrow {
+
+namespace flatbuf = org::apache::arrow::flatbuf;
+
 namespace ipc {
 
 // ----------------------------------------------------------------------
+// Record batch read path
+
+// ArrayComponentSource that takes field and buffer metadata from a
+// RecordBatchMetadata and reads the buffer contents from a RandomAccessFile
+class IpcComponentSource : public ArrayComponentSource {
+ public:
+  IpcComponentSource(const RecordBatchMetadata& metadata, io::RandomAccessFile* file)
+      : metadata_(metadata), file_(file) {}
+
+  // Read the buffer described by entry buffer_index of the metadata. A
+  // zero-length buffer yields nullptr without touching the file.
+  Status GetBuffer(int buffer_index, std::shared_ptr<Buffer>* out) override {
+    const BufferMetadata meta = metadata_.buffer(buffer_index);
+    if (meta.length != 0) { return file_->ReadAt(meta.offset, meta.length, out); }
+
+    *out = nullptr;
+    return Status::OK();
+  }
+
+  // Copy out the metadata of field node field_index, failing if the record
+  // batch metadata has fewer field nodes than requested
+  Status GetFieldMetadata(int field_index, FieldMetadata* metadata) override {
+    if (field_index < metadata_.num_fields()) {
+      *metadata = metadata_.field(field_index);
+      return Status::OK();
+    }
+    return Status::Invalid("Ran out of field metadata, likely malformed");
+  }
+
+ private:
+  // Held by reference: the metadata object must outlive this source
+  const RecordBatchMetadata& metadata_;
+  io::RandomAccessFile* file_;
+};
+
+// Read a record batch using the default nesting limit (kMaxNestingDepth)
+Status ReadRecordBatch(const RecordBatchMetadata& metadata,
+    const std::shared_ptr<Schema>& schema, io::RandomAccessFile* file,
+    std::shared_ptr<RecordBatch>* out) {
+  const int max_recursion_depth = kMaxNestingDepth;
+  return ReadRecordBatch(metadata, schema, max_recursion_depth, file, out);
+}
+
+// Load one array per schema field from the component source and assemble
+// them into a RecordBatch with num_rows rows. All fields share a single
+// loader context, so field and buffer indices advance across columns.
+static Status LoadRecordBatchFromSource(const std::shared_ptr<Schema>& schema,
+    int64_t num_rows, int max_recursion_depth, ArrayComponentSource* source,
+    std::shared_ptr<RecordBatch>* out) {
+  const int num_fields = schema->num_fields();
+  std::vector<std::shared_ptr<Array>> columns(num_fields);
+
+  ArrayLoaderContext loader;
+  loader.source = source;
+  loader.field_index = 0;
+  loader.buffer_index = 0;
+  loader.max_recursion_depth = max_recursion_depth;
+
+  for (int col = 0; col < num_fields; ++col) {
+    RETURN_NOT_OK(LoadArray(schema->field(col)->type, &loader, &columns[col]));
+  }
+
+  *out = std::make_shared<RecordBatch>(schema, num_rows, columns);
+  return Status::OK();
+}
+
+// Read a record batch whose buffers live in file, limiting nested types to
+// max_recursion_depth levels
+Status ReadRecordBatch(const RecordBatchMetadata& metadata,
+    const std::shared_ptr<Schema>& schema, int max_recursion_depth,
+    io::RandomAccessFile* file, std::shared_ptr<RecordBatch>* out) {
+  const int64_t num_rows = metadata.length();
+  IpcComponentSource component_source(metadata, file);
+  return LoadRecordBatchFromSource(
+      schema, num_rows, max_recursion_depth, &component_source, out);
+}
+
+// Read a dictionary, which is stored as a record batch with a single column.
+// The column's field is looked up in dictionary_types by the dictionary id;
+// an unknown id is a KeyError.
+Status ReadDictionary(const DictionaryBatchMetadata& metadata,
+    const DictionaryTypeMap& dictionary_types, io::RandomAccessFile* file,
+    std::shared_ptr<Array>* out) {
+  const int64_t id = metadata.id();
+  const auto type_entry = dictionary_types.find(id);
+  if (type_entry == dictionary_types.end()) {
+    std::stringstream message;
+    message << "Do not have type metadata for dictionary with id: " << id;
+    return Status::KeyError(message.str());
+  }
+
+  // ReadRecordBatch needs a schema, so wrap the dictionary field in one
+  std::vector<std::shared_ptr<Field>> dictionary_fields = {type_entry->second};
+  auto dummy_schema = std::make_shared<Schema>(dictionary_fields);
+
+  std::shared_ptr<RecordBatch> batch;
+  RETURN_NOT_OK(ReadRecordBatch(metadata.record_batch(), dummy_schema, file, &batch));
+
+  if (batch->num_columns() == 1) {
+    *out = batch->column(0);
+    return Status::OK();
+  }
+  return Status::Invalid("Dictionary record batch must only contain one field");
+}
+
+// ----------------------------------------------------------------------
 // StreamReader implementation
 
 static inline FileBlock FileBlockFromFlatbuffer(const flatbuf::Block* block) {
@@ -228,7 +326,7 @@ class FileReader::FileReaderImpl {
 
     // TODO(wesm): Verify the footer
     footer_ = flatbuf::GetFooter(footer_buffer_->data());
-    schema_metadata_.reset(new SchemaMetadata(nullptr, footer_->schema()));
+    schema_metadata_.reset(new SchemaMetadata(footer_->schema()));
 
     return Status::OK();
   }
@@ -307,8 +405,7 @@ class FileReader::FileReaderImpl {
     return schema_metadata_->GetSchema(*dictionary_memo_, &schema_);
   }
 
-  Status Open(
-      const std::shared_ptr<io::RandomAccessFile>& file, int64_t footer_offset) {
+  Status Open(const std::shared_ptr<io::RandomAccessFile>& file, int64_t footer_offset) {
     file_ = file;
     footer_offset_ = footer_offset;
     RETURN_NOT_OK(ReadFooter());
@@ -371,5 +468,69 @@ Status FileReader::GetRecordBatch(int i, std::shared_ptr<RecordBatch>* batch) {
   return impl_->GetRecordBatch(i, batch);
 }
 
+// ----------------------------------------------------------------------
+// Read LargeRecordBatch
+
+// ArrayComponentSource backed directly by LargeRecordBatch Flatbuffer
+// metadata, with buffer contents read from a RandomAccessFile
+class LargeRecordBatchSource : public ArrayComponentSource {
+ public:
+  LargeRecordBatchSource(
+      const flatbuf::LargeRecordBatch* metadata, io::RandomAccessFile* file)
+      : metadata_(metadata), file_(file) {}
+
+  // Read buffer number buffer_index. A zero-length buffer yields nullptr
+  // without touching the file.
+  Status GetBuffer(int buffer_index, std::shared_ptr<Buffer>* out) override {
+    const int num_buffers = static_cast<int>(metadata_->buffers()->size());
+    if (buffer_index >= num_buffers) {
+      return Status::Invalid("Ran out of buffer metadata, likely malformed");
+    }
+
+    const flatbuf::Buffer* buffer_meta = metadata_->buffers()->Get(buffer_index);
+    if (buffer_meta->length() != 0) {
+      return file_->ReadAt(buffer_meta->offset(), buffer_meta->length(), out);
+    }
+
+    *out = nullptr;
+    return Status::OK();
+  }
+
+  // Fill in length and null count from field node number field_index; the
+  // offset is always reported as 0
+  Status GetFieldMetadata(int field_index, FieldMetadata* metadata) override {
+    const int num_nodes = static_cast<int>(metadata_->nodes()->size());
+    if (field_index >= num_nodes) {
+      return Status::Invalid("Ran out of field metadata, likely malformed");
+    }
+
+    const flatbuf::LargeFieldNode* field_node = metadata_->nodes()->Get(field_index);
+    metadata->length = field_node->length();
+    metadata->null_count = field_node->null_count();
+    metadata->offset = 0;
+    return Status::OK();
+  }
+
+ private:
+  const flatbuf::LargeRecordBatch* metadata_;
+  io::RandomAccessFile* file_;
+};
+
+// Read a length-prefixed LargeRecordBatch message starting at offset in file
+// and reconstruct the RecordBatch it describes.
+//
+// Layout consumed, in order: an int32 metadata size, that many bytes of
+// Flatbuffer message, then bodyLength() bytes of buffer data. Truncated input
+// is reported as Status::Invalid rather than read past.
+Status ReadLargeRecordBatch(const std::shared_ptr<Schema>& schema, int64_t offset,
+    io::RandomAccessFile* file, std::shared_ptr<RecordBatch>* out) {
+  std::shared_ptr<Buffer> buffer;
+  RETURN_NOT_OK(file->Seek(offset));
+
+  RETURN_NOT_OK(file->Read(sizeof(int32_t), &buffer));
+  if (buffer->size() < static_cast<int64_t>(sizeof(int32_t))) {
+    return Status::Invalid("Unexpected end of file reading metadata length");
+  }
+  int32_t flatbuffer_size = *reinterpret_cast<const int32_t*>(buffer->data());
+  if (flatbuffer_size <= 0) {
+    return Status::Invalid("Invalid metadata length, likely malformed");
+  }
+
+  RETURN_NOT_OK(file->Read(flatbuffer_size, &buffer));
+  if (buffer->size() < flatbuffer_size) {
+    return Status::Invalid("Unexpected end of file reading metadata");
+  }
+  auto message = flatbuf::GetMessage(buffer->data());
+
+  // NOTE(review): the header is assumed to be a LargeRecordBatch; the header
+  // type tag is not verified here
+  if (message->header() == nullptr) {
+    return Status::Invalid("Message has no header, likely malformed");
+  }
+  auto batch = reinterpret_cast<const flatbuf::LargeRecordBatch*>(message->header());
+
+  // TODO(ARROW-388): The buffer offsets start at 0, so we must construct a
+  // RandomAccessFile according to that frame of reference
+  std::shared_ptr<Buffer> buffer_payload;
+  RETURN_NOT_OK(file->Read(message->bodyLength(), &buffer_payload));
+  if (buffer_payload->size() < message->bodyLength()) {
+    return Status::Invalid("Unexpected end of file reading message body");
+  }
+  io::BufferReader buffer_reader(buffer_payload);
+
+  LargeRecordBatchSource source(batch, &buffer_reader);
+  return LoadRecordBatchFromSource(
+      schema, batch->length(), kMaxNestingDepth, &source, out);
+}
+
 }  // namespace ipc
 }  // namespace arrow

http://git-wip-us.apache.org/repos/asf/arrow/blob/df2220f3/cpp/src/arrow/ipc/reader.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/reader.h b/cpp/src/arrow/ipc/reader.h
index ca91765..1c1314a 100644
--- a/cpp/src/arrow/ipc/reader.h
+++ b/cpp/src/arrow/ipc/reader.h
@@ -43,6 +43,20 @@ class RandomAccessFile;
 
 namespace ipc {
 
+// Generic read functions; do not copy data if the input supports zero copy reads
+
+/// Read a record batch described by metadata, using the default maximum
+/// nesting depth
+///
+/// \param[in] metadata the record batch metadata
+/// \param[in] schema the schema of the record batch
+/// \param[in] file the file from which the buffers are read
+/// \param[out] out the resulting RecordBatch
+/// \return Status outcome
+Status ReadRecordBatch(const RecordBatchMetadata& metadata,
+    const std::shared_ptr<Schema>& schema, io::RandomAccessFile* file,
+    std::shared_ptr<RecordBatch>* out);
+
+/// Read a record batch described by metadata with an explicit limit on the
+/// nesting depth of the arrays being loaded
+///
+/// \param[in] metadata the record batch metadata
+/// \param[in] schema the schema of the record batch
+/// \param[in] max_recursion_depth the maximum permitted nesting depth
+/// \param[in] file the file from which the buffers are read
+/// \param[out] out the resulting RecordBatch
+/// \return Status outcome
+Status ReadRecordBatch(const RecordBatchMetadata& metadata,
+    const std::shared_ptr<Schema>& schema, int max_recursion_depth,
+    io::RandomAccessFile* file, std::shared_ptr<RecordBatch>* out);
+
+/// Read a dictionary, which is stored as a record batch with a single column
+///
+/// \param[in] metadata the dictionary batch metadata
+/// \param[in] dictionary_types lookup from dictionary id to the dictionary's
+/// field; an id missing from it is a KeyError
+/// \param[in] file the file from which the buffers are read
+/// \param[out] out the dictionary as an Array
+/// \return Status outcome
+Status ReadDictionary(const DictionaryBatchMetadata& metadata,
+    const DictionaryTypeMap& dictionary_types, io::RandomAccessFile* file,
+    std::shared_ptr<Array>* out);
+
 class ARROW_EXPORT StreamReader {
  public:
   ~StreamReader();
@@ -106,6 +120,14 @@ class ARROW_EXPORT FileReader {
   std::unique_ptr<FileReaderImpl> impl_;
 };
 
+// ----------------------------------------------------------------------
+// Reading LargeRecordBatch (64-bit array lengths)
+
+/// EXPERIMENTAL: Read length-prefixed LargeRecordBatch metadata (64-bit array
+/// lengths) at offset and reconstruct RecordBatch
+Status ARROW_EXPORT ReadLargeRecordBatch(const std::shared_ptr<Schema>& schema,
+    int64_t offset, io::RandomAccessFile* file, std::shared_ptr<RecordBatch>* out);
+
 }  // namespace ipc
 }  // namespace arrow
 

http://git-wip-us.apache.org/repos/asf/arrow/blob/df2220f3/cpp/src/arrow/ipc/test-common.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/test-common.h b/cpp/src/arrow/ipc/test-common.h
index 66a5e09..ba203b0 100644
--- a/cpp/src/arrow/ipc/test-common.h
+++ b/cpp/src/arrow/ipc/test-common.h
@@ -103,7 +103,7 @@ Status MakeRandomListArray(const std::shared_ptr<Array>& child_array, int num_li
 typedef Status MakeRecordBatch(std::shared_ptr<RecordBatch>* out);
 
 Status MakeIntRecordBatch(std::shared_ptr<RecordBatch>* out) {
-  const int length = 1000;
+  const int length = 10;
 
   // Make the schema
   auto f0 = field("f0", int32());

http://git-wip-us.apache.org/repos/asf/arrow/blob/df2220f3/cpp/src/arrow/ipc/writer.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/writer.cc b/cpp/src/arrow/ipc/writer.cc
index 58402b5..82c119e 100644
--- a/cpp/src/arrow/ipc/writer.cc
+++ b/cpp/src/arrow/ipc/writer.cc
@@ -17,28 +17,510 @@
 
 #include "arrow/ipc/writer.h"
 
+#include <algorithm>
 #include <cstdint>
 #include <cstring>
+#include <limits>
 #include <sstream>
 #include <vector>
 
+#include "arrow/array.h"
 #include "arrow/buffer.h"
 #include "arrow/io/interfaces.h"
 #include "arrow/io/memory.h"
-#include "arrow/ipc/adapter.h"
-#include "arrow/ipc/metadata-internal.h"
 #include "arrow/ipc/metadata.h"
 #include "arrow/ipc/util.h"
+#include "arrow/loader.h"
 #include "arrow/memory_pool.h"
 #include "arrow/schema.h"
 #include "arrow/status.h"
 #include "arrow/table.h"
+#include "arrow/type.h"
+#include "arrow/util/bit-util.h"
 #include "arrow/util/logging.h"
 
 namespace arrow {
 namespace ipc {
 
 // ----------------------------------------------------------------------
+// Record batch write path
+
+class RecordBatchWriter : public ArrayVisitor {
+ public:
+  // \param[in] pool memory pool used when intermediate buffers must be
+  //   allocated (e.g. for sliced arrays)
+  // \param[in] buffer_start_offset position assigned to the first buffer when
+  //   buffer metadata is computed in Assemble
+  // \param[in] max_recursion_depth remaining nesting budget; must be positive
+  RecordBatchWriter(
+      MemoryPool* pool, int64_t buffer_start_offset, int max_recursion_depth)
+      : pool_(pool),
+        max_recursion_depth_(max_recursion_depth),
+        buffer_start_offset_(buffer_start_offset) {
+    DCHECK_GT(max_recursion_depth, 0);
+  }
+
+  // Virtual because the class is subclassed and has virtual member functions
+  virtual ~RecordBatchWriter() = default;
+
+  // Reject arrays whose length does not fit in an int32_t. Virtual so that
+  // subclasses can relax the check.
+  virtual Status CheckArrayMetadata(const Array& arr) {
+    const bool fits_in_int32 = !(arr.length() > std::numeric_limits<int32_t>::max());
+    if (fits_in_int32) { return Status::OK(); }
+    return Status::Invalid("Cannot write arrays larger than 2^31 - 1 in length");
+  }
+
+  // Record the field node and validity bitmap for arr, then dispatch to the
+  // type-specific Visit overload to collect its remaining buffers
+  Status VisitArray(const Array& arr) {
+    if (max_recursion_depth_ <= 0) {
+      return Status::Invalid("Max recursion depth reached");
+    }
+
+    RETURN_NOT_OK(CheckArrayMetadata(arr));
+
+    // Elements common to every array type; the offset is always written as 0
+    field_nodes_.emplace_back(arr.length(), arr.null_count(), 0);
+
+    if (!(arr.null_count() > 0)) {
+      // No nulls: push a dummy zero-length buffer, not to be copied
+      buffers_.push_back(std::make_shared<Buffer>(nullptr, 0));
+      return arr.Accept(this);
+    }
+
+    std::shared_ptr<Buffer> validity = arr.null_bitmap();
+    if (arr.offset() != 0) {
+      // With a sliced array / non-zero offset, we must copy the bitmap
+      RETURN_NOT_OK(
+          CopyBitmap(pool_, validity->data(), arr.offset(), arr.length(), &validity));
+    }
+    buffers_.push_back(validity);
+
+    return arr.Accept(this);
+  }
+
+  // Visit every column of batch to collect field nodes and buffers, then lay
+  // the buffers out back to back, each rounded up to a multiple of 64 bytes,
+  // starting at buffer_start_offset_. On success *body_length holds the total
+  // padded size of all buffers.
+  Status Assemble(const RecordBatch& batch, int64_t* body_length) {
+    // Discard state left over from a previous call
+    if (!field_nodes_.empty()) {
+      field_nodes_.clear();
+      buffer_meta_.clear();
+      buffers_.clear();
+    }
+
+    // Perform depth-first traversal of the row-batch
+    const int num_columns = batch.num_columns();
+    for (int col = 0; col < num_columns; ++col) {
+      RETURN_NOT_OK(VisitArray(*batch.column(col)));
+    }
+
+    // TODO(wesm): We currently have no notion of shared memory page id's,
+    // but we've included it in the metadata IDL for when we have it in the
+    // future. Use page = -1 for now
+    //
+    // Note that page ids are a bespoke notion for Arrow and not a feature we
+    // are using from any OS-level shared memory. The thought is that systems
+    // may (in the future) associate integer page id's with physical memory
+    // pages (according to whatever is the desired shared memory mechanism)
+    const int32_t kNoPageId = -1;
+
+    buffer_meta_.reserve(buffers_.size());
+
+    // Running start position of the next buffer, relative to the passed
+    // frame of reference. May begin at 0 or some other position in an
+    // address space
+    int64_t position = buffer_start_offset_;
+
+    // Construct the buffer metadata for the record batch header
+    for (const std::shared_ptr<Buffer>& buffer : buffers_) {
+      int64_t size = 0;
+      int64_t padding = 0;
+
+      // The buffer might be null if we are handling zero row lengths.
+      if (buffer) {
+        size = buffer->size();
+        padding = BitUtil::RoundUpToMultipleOf64(size) - size;
+      }
+
+      const int64_t padded_size = size + padding;
+      buffer_meta_.push_back({kNoPageId, position, padded_size});
+      position += padded_size;
+    }
+
+    *body_length = position - buffer_start_offset_;
+    DCHECK(BitUtil::IsMultipleOf64(*body_length));
+
+    return Status::OK();
+  }
+
+  // Build the metadata message for the assembled batch. Override this for
+  // writing dictionary metadata.
+  virtual Status WriteMetadataMessage(
+      int64_t num_rows, int64_t body_length, std::shared_ptr<Buffer>* out) {
+    // The record batch message takes its length as 32 bits
+    const int32_t length = static_cast<int32_t>(num_rows);
+    return WriteRecordBatchMessage(length, body_length, field_nodes_, buffer_meta_, out);
+  }
+
+  // Write the length-prefixed metadata message to dst, padded so that the
+  // stream position after the write is a multiple of 8.
+  //
+  // \param[in] num_rows number of rows, forwarded to WriteMetadataMessage
+  // \param[in] body_length body length, forwarded to WriteMetadataMessage
+  // \param[in] dst the stream to write to
+  // \param[out] metadata_length total bytes written: the 4-byte length
+  //   prefix, the flatbuffer, plus padding
+  Status WriteMetadata(int64_t num_rows, int64_t body_length, io::OutputStream* dst,
+      int32_t* metadata_length) {
+    // Now that we have computed the locations of all of the buffers in shared
+    // memory, the data header can be converted to a flatbuffer and written out
+    //
+    // Note: The memory written here is prefixed by the size of the flatbuffer
+    // itself as an int32_t.
+    std::shared_ptr<Buffer> metadata_fb;
+    RETURN_NOT_OK(WriteMetadataMessage(num_rows, body_length, &metadata_fb));
+
+    // Need to write 4 bytes (metadata size), the metadata, plus padding to
+    // end on an 8-byte offset
+    int64_t start_offset;
+    RETURN_NOT_OK(dst->Tell(&start_offset));
+
+    int32_t padded_metadata_length = static_cast<int32_t>(metadata_fb->size()) + 4;
+
+    // Compute the alignment remainder in 64 bits. Narrowing start_offset to
+    // int32_t first can produce a negative value for stream positions past
+    // 2 GB, which would yield a negative remainder and excess padding.
+    const int32_t remainder =
+        static_cast<int32_t>((start_offset + padded_metadata_length) % 8);
+    if (remainder != 0) { padded_metadata_length += 8 - remainder; }
+
+    // The returned metadata size includes the length prefix, the flatbuffer,
+    // plus padding
+    *metadata_length = padded_metadata_length;
+
+    // Write the flatbuffer size prefix including padding
+    int32_t flatbuffer_size = padded_metadata_length - 4;
+    RETURN_NOT_OK(
+        dst->Write(reinterpret_cast<const uint8_t*>(&flatbuffer_size), sizeof(int32_t)));
+
+    // Write the flatbuffer
+    RETURN_NOT_OK(dst->Write(metadata_fb->data(), metadata_fb->size()));
+
+    // Write any padding
+    int32_t padding =
+        padded_metadata_length - static_cast<int32_t>(metadata_fb->size()) - 4;
+    if (padding > 0) { RETURN_NOT_OK(dst->Write(kPaddingBytes, padding)); }
+
+    return Status::OK();
+  }
+
+  // Write batch to dst: first the length-prefixed metadata, then each buffer
+  // followed by padding up to a multiple of 64 bytes.
+  //
+  // \param[in] batch the record batch to write
+  // \param[in] dst the stream to write to
+  // \param[out] metadata_length size of the metadata section, as reported by
+  //   WriteMetadata
+  // \param[out] body_length total padded size of the buffers, as computed by
+  //   Assemble
+  Status Write(const RecordBatch& batch, io::OutputStream* dst, int32_t* metadata_length,
+      int64_t* body_length) {
+    RETURN_NOT_OK(Assemble(batch, body_length));
+
+    // Debug builds track the stream position to assert alignment below
+#ifndef NDEBUG
+    int64_t start_position, current_position;
+    RETURN_NOT_OK(dst->Tell(&start_position));
+#endif
+
+    RETURN_NOT_OK(WriteMetadata(batch.num_rows(), *body_length, dst, metadata_length));
+
+    // The metadata section must end on an 8-byte boundary
+#ifndef NDEBUG
+    RETURN_NOT_OK(dst->Tell(&current_position));
+    DCHECK(BitUtil::IsMultipleOf8(current_position));
+#endif
+
+    // Now write the buffers
+    for (size_t i = 0; i < buffers_.size(); ++i) {
+      const Buffer* buffer = buffers_[i].get();
+      int64_t size = 0;
+      int64_t padding = 0;
+
+      // The buffer might be null if we are handling zero row lengths.
+      if (buffer) {
+        size = buffer->size();
+        padding = BitUtil::RoundUpToMultipleOf64(size) - size;
+      }
+
+      if (size > 0) { RETURN_NOT_OK(dst->Write(buffer->data(), size)); }
+
+      if (padding > 0) { RETURN_NOT_OK(dst->Write(kPaddingBytes, padding)); }
+    }
+
+    // The body must also end on an 8-byte boundary
+#ifndef NDEBUG
+    RETURN_NOT_OK(dst->Tell(&current_position));
+    DCHECK(BitUtil::IsMultipleOf8(current_position));
+#endif
+
+    return Status::OK();
+  }
+
+  // Compute how many bytes Write emits for batch by running it against a
+  // MockOutputStream, i.e. without actually writing anything
+  Status GetTotalSize(const RecordBatch& batch, int64_t* size) {
+    MockOutputStream mock_sink;
+    int32_t metadata_size = 0;
+    int64_t body_size = 0;
+    RETURN_NOT_OK(Write(batch, &mock_sink, &metadata_size, &body_size));
+
+    *size = mock_sink.GetExtentBytesWritten();
+    return Status::OK();
+  }
+
+ protected:
+  // Collect the data buffer of a fixed-width array. For a sliced array the
+  // buffer is sliced to start at the array's first value.
+  template <typename ArrayType>
+  Status VisitFixedWidth(const ArrayType& array) {
+    std::shared_ptr<Buffer> values = array.data();
+
+    if (array.offset() == 0) {
+      buffers_.push_back(values);
+      return Status::OK();
+    }
+
+    // Non-zero offset, slice the buffer
+    const auto& fw_type = static_cast<const FixedWidthType&>(*array.type());
+    const int type_width = fw_type.bit_width() / 8;
+    const int64_t byte_offset = array.offset() * type_width;
+
+    // Send padding if it's available: take the 64-byte-rounded length unless
+    // the underlying buffer ends sooner
+    const int64_t slice_length =
+        std::min(BitUtil::RoundUpToMultipleOf64(array.length() * type_width),
+            values->size() - byte_offset);
+
+    buffers_.push_back(SliceBuffer(values, byte_offset, slice_length));
+    return Status::OK();
+  }
+
+  // Produce a value offsets buffer for array whose first offset is zero.
+  // Shared between ListArray and BinaryArray.
+  //
+  // For an unsliced array the existing offsets buffer is returned as is. For
+  // a sliced array a new buffer of length() + 1 offsets is allocated from
+  // pool_, with every offset shifted down by the first offset of the slice.
+  //
+  // \param[in] array the array whose offsets are needed
+  // \param[out] value_offsets the zero-based offsets buffer
+  template <typename ArrayType>
+  Status GetZeroBasedValueOffsets(
+      const ArrayType& array, std::shared_ptr<Buffer>* value_offsets) {
+    auto offsets = array.value_offsets();
+
+    if (array.offset() != 0) {
+      // If we have a non-zero offset, then the value offsets do not start at
+      // zero. We must a) create a new offsets array with shifted offsets and
+      // b) slice the values array accordingly
+
+      std::shared_ptr<MutableBuffer> shifted_offsets;
+      RETURN_NOT_OK(AllocateBuffer(
+          pool_, sizeof(int32_t) * (array.length() + 1), &shifted_offsets));
+
+      int32_t* dest_offsets = reinterpret_cast<int32_t*>(shifted_offsets->mutable_data());
+      const int32_t start_offset = array.value_offset(0);
+
+      // 64-bit index: array lengths may exceed the range of int. The bound
+      // is inclusive so that the final (end) offset is shifted as well.
+      for (int64_t i = 0; i <= array.length(); ++i) {
+        dest_offsets[i] = array.value_offset(i) - start_offset;
+      }
+      offsets = shifted_offsets;
+    }
+
+    *value_offsets = offsets;
+    return Status::OK();
+  }
+
+  // Collect the offsets and data buffers of a binary (or string) array. For
+  // a sliced array the offsets are rebased to zero and the data buffer is
+  // sliced to the byte range the slice refers to.
+  Status VisitBinary(const BinaryArray& array) {
+    std::shared_ptr<Buffer> value_offsets;
+    RETURN_NOT_OK(GetZeroBasedValueOffsets<BinaryArray>(array, &value_offsets));
+    auto data = array.data();
+
+    if (array.offset() != 0) {
+      // Slice the data buffer to include only the range we need now.
+      // SliceBuffer takes (offset, length), so the end offset has to be
+      // converted to a length.
+      const int32_t start_offset = array.value_offset(0);
+      const int32_t end_offset = array.value_offset(array.length());
+      data = SliceBuffer(data, start_offset, end_offset - start_offset);
+    }
+
+    buffers_.push_back(value_offsets);
+    buffers_.push_back(data);
+    return Status::OK();
+  }
+
+  // Collect the data buffer of a fixed-width binary array, sliced to the
+  // array's byte range when the array has a non-zero offset
+  Status Visit(const FixedWidthBinaryArray& array) override {
+    const int32_t width = array.byte_width();
+    auto values = array.data();
+
+    const bool is_sliced = array.offset() != 0;
+    if (is_sliced) {
+      values = SliceBuffer(values, array.offset() * width, width * array.length());
+    }
+
+    buffers_.push_back(values);
+    return Status::OK();
+  }
+
+  // Collect the bit-packed data buffer of a boolean array. Field nodes are
+  // always written with offset 0, so for a sliced array the bits must be
+  // copied to start at bit 0, exactly as VisitArray does for the validity
+  // bitmap.
+  Status Visit(const BooleanArray& array) override {
+    std::shared_ptr<Buffer> bits = array.data();
+    if (array.offset() != 0) {
+      RETURN_NOT_OK(
+          CopyBitmap(pool_, bits->data(), array.offset(), array.length(), &bits));
+    }
+    buffers_.push_back(bits);
+    return Status::OK();
+  }
+
+// Generate a Visit override for TYPE that forwards to VisitFixedWidth<TYPE>
+#define VISIT_FIXED_WIDTH(TYPE) \
+  Status Visit(const TYPE& array) override { return VisitFixedWidth<TYPE>(array); }
+
+  // Array types whose single data buffer is handled by VisitFixedWidth
+  VISIT_FIXED_WIDTH(Int8Array);
+  VISIT_FIXED_WIDTH(Int16Array);
+  VISIT_FIXED_WIDTH(Int32Array);
+  VISIT_FIXED_WIDTH(Int64Array);
+  VISIT_FIXED_WIDTH(UInt8Array);
+  VISIT_FIXED_WIDTH(UInt16Array);
+  VISIT_FIXED_WIDTH(UInt32Array);
+  VISIT_FIXED_WIDTH(UInt64Array);
+  VISIT_FIXED_WIDTH(HalfFloatArray);
+  VISIT_FIXED_WIDTH(FloatArray);
+  VISIT_FIXED_WIDTH(DoubleArray);
+  VISIT_FIXED_WIDTH(DateArray);
+  VISIT_FIXED_WIDTH(Date32Array);
+  VISIT_FIXED_WIDTH(TimeArray);
+  VISIT_FIXED_WIDTH(TimestampArray);
+
+// The macro is only needed for the overloads above
+#undef VISIT_FIXED_WIDTH
+
+  // String arrays are written through the same path as binary arrays
+  Status Visit(const StringArray& array) override { return VisitBinary(array); }
+
+  // Binary arrays contribute an offsets buffer and a data buffer
+  Status Visit(const BinaryArray& array) override { return VisitBinary(array); }
+
+  // Collect the zero-based offsets buffer of a list array, then visit its
+  // values one nesting level deeper. For a sliced list only the range of
+  // values referenced by the slice is visited.
+  Status Visit(const ListArray& array) override {
+    std::shared_ptr<Buffer> offsets;
+    RETURN_NOT_OK(GetZeroBasedValueOffsets<ListArray>(array, &offsets));
+    buffers_.push_back(offsets);
+
+    std::shared_ptr<Array> child = array.values();
+    if (array.offset() != 0) {
+      // For non-zero offset, we slice the values array accordingly
+      const int32_t first = array.value_offset(0);
+      const int32_t count = array.value_offset(array.length()) - first;
+      child = child->Slice(first, count);
+    }
+
+    --max_recursion_depth_;
+    RETURN_NOT_OK(VisitArray(*child));
+    ++max_recursion_depth_;
+    return Status::OK();
+  }
+
+  // Visit each child of a struct array one nesting level deeper. Children of
+  // a sliced struct are sliced to the struct's offset and length first.
+  Status Visit(const StructArray& array) override {
+    const bool is_sliced = array.offset() != 0;
+
+    --max_recursion_depth_;
+    for (std::shared_ptr<Array> child : array.fields()) {
+      if (is_sliced) { child = child->Slice(array.offset(), array.length()); }
+      RETURN_NOT_OK(VisitArray(*child));
+    }
+    ++max_recursion_depth_;
+    return Status::OK();
+  }
+
+  // Collect the buffers of a union array and visit its children one nesting
+  // level deeper.
+  //
+  // The type ids buffer is always written. A dense union additionally writes
+  // a value offsets buffer; when the union is sliced those offsets are
+  // rebased per child so that each child can be sliced down to just the
+  // values the slice refers to. Children of a sliced sparse union are sliced
+  // with the union's own offset and length.
+  Status Visit(const UnionArray& array) override {
+    auto type_ids = array.type_ids();
+    if (array.offset() != 0) {
+      type_ids = SliceBuffer(type_ids, array.offset() * sizeof(UnionArray::type_id_t),
+          array.length() * sizeof(UnionArray::type_id_t));
+    }
+
+    buffers_.push_back(type_ids);
+
+    --max_recursion_depth_;
+    if (array.mode() == UnionMode::DENSE) {
+      const auto& type = static_cast<const UnionType&>(*array.type());
+      auto value_offsets = array.value_offsets();
+
+      // The Union type codes are not necessarily 0-indexed
+      uint8_t max_code = 0;
+      for (uint8_t code : type.type_codes) {
+        if (code > max_code) { max_code = code; }
+      }
+
+      // Allocate an array of child offsets. Set all to -1 to indicate that we
+      // haven't observed a first occurrence of a particular child yet
+      std::vector<int32_t> child_offsets(max_code + 1, -1);
+      std::vector<int32_t> child_lengths(max_code + 1, 0);
+
+      if (array.offset() != 0) {
+        // This is an unpleasant case. Because the offsets are different for
+        // each child array, when we have a sliced array, we need to "rebase"
+        // the value_offsets for each array
+
+        const int32_t* unshifted_offsets = array.raw_value_offsets();
+        const uint8_t* raw_type_ids = array.raw_type_ids();
+
+        // Allocate the shifted offsets
+        std::shared_ptr<MutableBuffer> shifted_offsets_buffer;
+        RETURN_NOT_OK(AllocateBuffer(
+            pool_, array.length() * sizeof(int32_t), &shifted_offsets_buffer));
+        int32_t* shifted_offsets =
+            reinterpret_cast<int32_t*>(shifted_offsets_buffer->mutable_data());
+
+        for (int64_t i = 0; i < array.length(); ++i) {
+          const uint8_t code = raw_type_ids[i];
+          int32_t shift = child_offsets[code];
+          // First occurrence of this child: its offset becomes the base
+          if (shift == -1) { child_offsets[code] = shift = unshifted_offsets[i]; }
+          shifted_offsets[i] = unshifted_offsets[i] - shift;
+
+          // Update the child length to account for observed value
+          ++child_lengths[code];
+        }
+
+        value_offsets = shifted_offsets_buffer;
+      }
+      buffers_.push_back(value_offsets);
+
+      // Visit children and slice accordingly
+      for (int i = 0; i < type.num_children(); ++i) {
+        std::shared_ptr<Array> child = array.child(i);
+        if (array.offset() != 0) {
+          const uint8_t code = type.type_codes[i];
+          // A child that never occurs in the slice still has offset -1 and
+          // length 0; clamp so that it becomes an empty slice starting at 0
+          const int32_t child_offset = std::max<int32_t>(child_offsets[code], 0);
+          child = child->Slice(child_offset, child_lengths[code]);
+        }
+        RETURN_NOT_OK(VisitArray(*child));
+      }
+    } else {
+      for (std::shared_ptr<Array> child : array.children()) {
+        // Sparse union, slicing is simpler
+        if (array.offset() != 0) {
+          // If offset is non-zero, slice the child array
+          child = child->Slice(array.offset(), array.length());
+        }
+        RETURN_NOT_OK(VisitArray(*child));
+      }
+    }
+    ++max_recursion_depth_;
+    return Status::OK();
+  }
+
+  // Only the indices of a dictionary array are written here, by dispatching
+  // to the Visit overload for the indices' own array type
+  Status Visit(const DictionaryArray& array) override {
+    // Dictionary written out separately. Slice offset contained in the indices
+    return array.indices()->Accept(this);
+  }
+
+  // In some cases, intermediate buffers may need to be allocated (with sliced arrays)
+  MemoryPool* pool_;
+
+  // One entry per visited array, appended by VisitArray
+  std::vector<FieldMetadata> field_nodes_;
+  // One entry per element of buffers_ (page id, offset, padded length),
+  // filled in by Assemble
+  std::vector<BufferMetadata> buffer_meta_;
+  // Buffers to write, in the order they were collected
+  std::vector<std::shared_ptr<Buffer>> buffers_;
+
+  // Remaining nesting budget; decremented while the children of a nested
+  // array are being visited
+  int64_t max_recursion_depth_;
+  // Position assigned to the first buffer, in the caller's frame of reference
+  int64_t buffer_start_offset_;
+};
+
+// Writes a dictionary as a single-column record batch whose metadata message
+// carries the dictionary id
+class DictionaryWriter : public RecordBatchWriter {
+ public:
+  using RecordBatchWriter::RecordBatchWriter;
+
+  // Emit a dictionary message instead of a plain record batch message
+  Status WriteMetadataMessage(
+      int64_t num_rows, int64_t body_length, std::shared_ptr<Buffer>* out) override {
+    return WriteDictionaryMessage(dictionary_id_, static_cast<int32_t>(num_rows),
+        body_length, field_nodes_, buffer_meta_, out);
+  }
+
+  // Write dictionary to dst, tagged with dictionary_id
+  //
+  // \param[in] dictionary_id the id recorded in the metadata message
+  // \param[in] dictionary the dictionary values
+  // \param[in] dst the stream to write to
+  // \param[out] metadata_length size of the metadata section
+  // \param[out] body_length total padded size of the buffers
+  Status Write(int64_t dictionary_id, const std::shared_ptr<Array>& dictionary,
+      io::OutputStream* dst, int32_t* metadata_length, int64_t* body_length) {
+    dictionary_id_ = dictionary_id;
+
+    // Make a dummy record batch. A bit tedious as we have to make a schema
+    std::vector<std::shared_ptr<Field>> fields = {
+        arrow::field("dictionary", dictionary->type())};
+    auto schema = std::make_shared<Schema>(fields);
+    RecordBatch batch(schema, dictionary->length(), {dictionary});
+
+    return RecordBatchWriter::Write(batch, dst, metadata_length, body_length);
+  }
+
+ private:
+  // TODO(wesm): Setting this in Write is a bit unclean, but it works
+  //
+  // Initialized so that WriteMetadataMessage never reads an indeterminate
+  // value if it is reached before Write has assigned a real id
+  int64_t dictionary_id_ = -1;
+};
+
+// Write batch to dst with a RecordBatchWriter configured from the arguments
+Status WriteRecordBatch(const RecordBatch& batch, int64_t buffer_start_offset,
+    io::OutputStream* dst, int32_t* metadata_length, int64_t* body_length,
+    MemoryPool* pool, int max_recursion_depth) {
+  RecordBatchWriter batch_writer(pool, buffer_start_offset, max_recursion_depth);
+  const Status status = batch_writer.Write(batch, dst, metadata_length, body_length);
+  return status;
+}
+
+// Write dictionary to dst, tagged with dictionary_id, using the default
+// nesting limit (kMaxNestingDepth)
+Status WriteDictionary(int64_t dictionary_id, const std::shared_ptr<Array>& dictionary,
+    int64_t buffer_start_offset, io::OutputStream* dst, int32_t* metadata_length,
+    int64_t* body_length, MemoryPool* pool) {
+  DictionaryWriter dictionary_writer(pool, buffer_start_offset, kMaxNestingDepth);
+  const Status status = dictionary_writer.Write(
+      dictionary_id, dictionary, dst, metadata_length, body_length);
+  return status;
+}
+
+// Compute the number of bytes needed to write batch, using the default
+// memory pool, a buffer start offset of 0 and the default nesting limit
+Status GetRecordBatchSize(const RecordBatch& batch, int64_t* size) {
+  RecordBatchWriter size_probe(default_memory_pool(), 0, kMaxNestingDepth);
+  return size_probe.GetTotalSize(batch, size);
+}
+
+// ----------------------------------------------------------------------
 // Stream writer implementation
 
 class StreamWriter::StreamWriterImpl {
@@ -199,38 +681,6 @@ Status StreamWriter::Close() {
 // ----------------------------------------------------------------------
 // File writer implementation
 
-static flatbuffers::Offset<flatbuffers::Vector<const flatbuf::Block*>>
-FileBlocksToFlatbuffer(FBB& fbb, const std::vector<FileBlock>& blocks) {
-  std::vector<flatbuf::Block> fb_blocks;
-
-  for (const FileBlock& block : blocks) {
-    fb_blocks.emplace_back(block.offset, block.metadata_length, block.body_length);
-  }
-
-  return fbb.CreateVectorOfStructs(fb_blocks);
-}
-
-Status WriteFileFooter(const Schema& schema, const std::vector<FileBlock>& dictionaries,
-    const std::vector<FileBlock>& record_batches, DictionaryMemo* dictionary_memo,
-    io::OutputStream* out) {
-  FBB fbb;
-
-  flatbuffers::Offset<flatbuf::Schema> fb_schema;
-  RETURN_NOT_OK(SchemaToFlatbuffer(fbb, schema, dictionary_memo, &fb_schema));
-
-  auto fb_dictionaries = FileBlocksToFlatbuffer(fbb, dictionaries);
-  auto fb_record_batches = FileBlocksToFlatbuffer(fbb, record_batches);
-
-  auto footer = flatbuf::CreateFooter(
-      fbb, kMetadataVersion, fb_schema, fb_dictionaries, fb_record_batches);
-
-  fbb.Finish(footer);
-
-  int32_t size = fbb.GetSize();
-
-  return out->Write(fbb.GetBufferPointer(), size);
-}
-
 class FileWriter::FileWriterImpl : public StreamWriter::StreamWriterImpl {
  public:
   using BASE = StreamWriter::StreamWriterImpl;
@@ -283,5 +733,31 @@ Status FileWriter::Close() {
   return impl_->Close();
 }
 
+// ----------------------------------------------------------------------
+// Write record batches with 64-bit size metadata
+
+// RecordBatchWriter variant that emits LargeRecordBatch metadata and
+// therefore accepts arrays of any length
+class LargeRecordBatchWriter : public RecordBatchWriter {
+ public:
+  using RecordBatchWriter::RecordBatchWriter;
+
+  // No < INT32_MAX length check: every array length is accepted
+  Status CheckArrayMetadata(const Array& arr) override { return Status::OK(); }
+
+  // Emit a LargeRecordBatch message; num_rows is passed through as 64 bits
+  Status WriteMetadataMessage(
+      int64_t num_rows, int64_t body_length, std::shared_ptr<Buffer>* out) override {
+    const std::vector<FieldMetadata>& nodes = field_nodes_;
+    const std::vector<BufferMetadata>& buffer_meta = buffer_meta_;
+    return WriteLargeRecordBatchMessage(num_rows, body_length, nodes, buffer_meta, out);
+  }
+};
+
+// Write batch to dst using LargeRecordBatch metadata (64-bit lengths)
+Status WriteLargeRecordBatch(const RecordBatch& batch, int64_t buffer_start_offset,
+    io::OutputStream* dst, int32_t* metadata_length, int64_t* body_length,
+    MemoryPool* pool, int max_recursion_depth) {
+  LargeRecordBatchWriter large_writer(pool, buffer_start_offset, max_recursion_depth);
+  const Status status = large_writer.Write(batch, dst, metadata_length, body_length);
+  return status;
+}
+
 }  // namespace ipc
 }  // namespace arrow