You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ap...@apache.org on 2019/06/11 17:08:20 UTC

[arrow] branch master updated: ARROW-1207: [C++] Implement MapArray, MapBuilder, MapType classes, and IPC support

This is an automated email from the ASF dual-hosted git repository.

apitrou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new dede1e6  ARROW-1207: [C++] Implement MapArray, MapBuilder, MapType classes, and IPC support
dede1e6 is described below

commit dede1e695140246ddf7caac8db94b81ce093a727
Author: Benjamin Kietzman <be...@gmail.com>
AuthorDate: Tue Jun 11 19:08:11 2019 +0200

    ARROW-1207: [C++] Implement MapArray, MapBuilder, MapType classes, and IPC support
    
    Implements `MapArray` as a subclass of `ListArray`, where each `value` in the list is a `key: item` pair. (This naming is not the most natural, but `value` is taken.)
    
    `MapType::keys_sorted()` is currently stored but unused; for example, `MapBuilder` does not check inserted keys for correct ordering. `MapType` is printed as `map<utf8, int32>` for unsorted keys and as `map<int32, float64, keys_sorted>` for sorted keys.
    
    Map arrays are created with `ArrayFromJSON` by providing for each pair an array of length 2 containing the key and the mapped item [(example)](https://github.com/apache/arrow/compare/master...bkietz:1207-Implement-Map-logical-type?expand=1#diff-015ed4b6849ed6e64e25bba42aa1d29eR572).
    
    Author: Benjamin Kietzman <be...@gmail.com>
    
    Closes #4352 from bkietz/1207-Implement-Map-logical-type and squashes the following commits:
    
    9fb8700d7 <Benjamin Kietzman> explicitly disable map in flight test
    41b30161b <Benjamin Kietzman> more cleanup, disable JS ipc tests as well
    1b74aa128 <Benjamin Kietzman> disable map IPC tests for Java
    a0de5513f <Benjamin Kietzman> cleanup of code which assumes map has 2 children
    2aaab2914 <Benjamin Kietzman> ListType isa MapType
    9b455e76d <Benjamin Kietzman> Add IPC tests for Map
    62dade046 <Benjamin Kietzman> remove redundant null check
    a3be934e4 <Benjamin Kietzman> add tests using and validating MapBuilder
    c936ebd87 <Benjamin Kietzman> fix MapScalar typos
    1047a6d2e <Benjamin Kietzman> run clang-format
    31930ffe9 <Benjamin Kietzman> de-inline MapBuilder constructor
    eb6db030c <Benjamin Kietzman> set keys_, items_
    a5c88a116 <Benjamin Kietzman> fix: obj_ is not a pointer
    8049c515f <Benjamin Kietzman> MapArray isa ListArray
    4c11db99d <Benjamin Kietzman> adding some tests and filling out Map*
    f89da946b <Benjamin Kietzman> first pass at MapArray, MapBuilder, MapScalar
    7fbbe707d <Benjamin Kietzman> add checked_pointer_cast for unique_ptr
    5e727e575 <Benjamin Kietzman> add map() type factory
    e9b34d023 <Benjamin Kietzman> Add keysSorted field
    01214fb75 <Benjamin Kietzman> add MapType and test its ToString
    47d95efe1 <Benjamin Kietzman> add MapType to Layout.rst
---
 cpp/src/arrow/array-list-test.cc           | 146 +++++++++++++++++++++++++++++
 cpp/src/arrow/array.cc                     |  89 ++++++++++++++++--
 cpp/src/arrow/array.h                      |  39 +++++++-
 cpp/src/arrow/array/builder_nested.cc      |  77 ++++++++++++++-
 cpp/src/arrow/array/builder_nested.h       |  62 +++++++++++-
 cpp/src/arrow/builder.cc                   |  10 ++
 cpp/src/arrow/compare.cc                   |  16 ++++
 cpp/src/arrow/compute/kernels/take.cc      |   4 +
 cpp/src/arrow/ipc/json-integration-test.cc |   4 +-
 cpp/src/arrow/ipc/json-internal.cc         |  56 ++++++++++-
 cpp/src/arrow/ipc/json-simple-test.cc      | 131 +++++++++++++++++++++++++-
 cpp/src/arrow/ipc/json-simple.cc           |  55 +++++++++++
 cpp/src/arrow/ipc/json-test.cc             |   9 ++
 cpp/src/arrow/ipc/metadata-internal.cc     |  26 +++++
 cpp/src/arrow/ipc/test-common.cc           |  17 ++++
 cpp/src/arrow/ipc/test-common.h            |   5 +
 cpp/src/arrow/ipc/writer.cc                |  48 +++++-----
 cpp/src/arrow/pretty_print-test.cc         |  37 ++++++++
 cpp/src/arrow/pretty_print.cc              |  35 +++++++
 cpp/src/arrow/scalar.cc                    |   9 ++
 cpp/src/arrow/scalar.h                     |  11 +++
 cpp/src/arrow/type-test.cc                 |  21 +++++
 cpp/src/arrow/type.cc                      |  24 +++++
 cpp/src/arrow/type.h                       |  39 +++++++-
 cpp/src/arrow/type_fwd.h                   |   5 +
 cpp/src/arrow/type_traits.h                |   8 ++
 cpp/src/arrow/util/checked_cast.h          |   9 ++
 cpp/src/arrow/visitor.cc                   |   3 +
 cpp/src/arrow/visitor.h                    |   3 +
 cpp/src/arrow/visitor_inline.h             |   1 +
 cpp/src/parquet/arrow/writer.cc            |   1 +
 dev/archery/archery/lang/cpp.py            |   1 +
 docs/source/format/Layout.rst              |  89 +++++++++++++++++-
 integration/integration_test.py            |  91 +++++++++++++++++-
 34 files changed, 1134 insertions(+), 47 deletions(-)

diff --git a/cpp/src/arrow/array-list-test.cc b/cpp/src/arrow/array-list-test.cc
index f2909c7..3847b9e 100644
--- a/cpp/src/arrow/array-list-test.cc
+++ b/cpp/src/arrow/array-list-test.cc
@@ -35,6 +35,7 @@
 namespace arrow {
 
 using internal::checked_cast;
+using internal::checked_pointer_cast;
 
 // ----------------------------------------------------------------------
 // List tests
@@ -341,6 +342,151 @@ TEST_F(TestListArray, TestBuilderPreserveFieleName) {
 }
 
 // ----------------------------------------------------------------------
+// Map tests
+
+class TestMapArray : public TestBuilder {
+ public:
+  void SetUp() {
+    TestBuilder::SetUp();
+
+    key_type_ = utf8();
+    value_type_ = int32();
+    type_ = map(key_type_, value_type_);
+
+    std::unique_ptr<ArrayBuilder> tmp;
+    ASSERT_OK(MakeBuilder(pool_, type_, &tmp));
+    builder_ = checked_pointer_cast<MapBuilder>(std::move(tmp));
+  }
+
+  void Done() {
+    std::shared_ptr<Array> out;
+    FinishAndCheckPadding(builder_.get(), &out);
+    result_ = std::dynamic_pointer_cast<MapArray>(out);
+  }
+
+ protected:
+  std::shared_ptr<DataType> value_type_, key_type_;
+
+  std::shared_ptr<MapBuilder> builder_;
+  std::shared_ptr<MapArray> result_;
+};
+
+TEST_F(TestMapArray, Equality) {
+  auto& kb = checked_cast<StringBuilder&>(*builder_->key_builder());
+  auto& ib = checked_cast<Int32Builder&>(*builder_->item_builder());
+
+  std::shared_ptr<Array> array, equal_array, unequal_array;
+  std::vector<int32_t> equal_offsets = {0, 1, 2, 5, 6, 7, 8, 10};
+  std::vector<util::string_view> equal_keys = {"a", "a", "a", "b", "c",
+                                               "a", "a", "a", "a", "b"};
+  std::vector<int32_t> equal_values = {1, 2, 3, 4, 5, 2, 2, 2, 5, 6};
+  std::vector<int32_t> unequal_offsets = {0, 1, 4, 7};
+  std::vector<util::string_view> unequal_keys = {"a", "a", "b", "c", "a", "b", "c"};
+  std::vector<int32_t> unequal_values = {1, 2, 2, 2, 3, 4, 5};
+
+  // setup two equal arrays
+  for (auto out : {&array, &equal_array}) {
+    ASSERT_OK(builder_->AppendValues(equal_offsets.data(), equal_offsets.size()));
+    for (auto&& key : equal_keys) {
+      ASSERT_OK(kb.Append(key));
+    }
+    ASSERT_OK(ib.AppendValues(equal_values.data(), equal_values.size()));
+    ASSERT_OK(builder_->Finish(out));
+  }
+
+  // now an unequal one
+  ASSERT_OK(builder_->AppendValues(unequal_offsets.data(), unequal_offsets.size()));
+  for (auto&& key : unequal_keys) {
+    ASSERT_OK(kb.Append(key));
+  }
+  ASSERT_OK(ib.AppendValues(unequal_values.data(), unequal_values.size()));
+  ASSERT_OK(builder_->Finish(&unequal_array));
+
+  // Test array equality
+  EXPECT_TRUE(array->Equals(array));
+  EXPECT_TRUE(array->Equals(equal_array));
+  EXPECT_TRUE(equal_array->Equals(array));
+  EXPECT_FALSE(equal_array->Equals(unequal_array));
+  EXPECT_FALSE(unequal_array->Equals(equal_array));
+
+  // Test range equality
+  EXPECT_TRUE(array->RangeEquals(0, 1, 0, unequal_array));
+  EXPECT_FALSE(array->RangeEquals(0, 2, 0, unequal_array));
+  EXPECT_FALSE(array->RangeEquals(1, 2, 1, unequal_array));
+  EXPECT_TRUE(array->RangeEquals(2, 3, 2, unequal_array));
+}
+
+TEST_F(TestMapArray, BuildingIntToInt) {
+  auto type = map(int16(), int16());
+
+  auto expected_keys = ArrayFromJSON(int16(), R"([
+    0, 1, 2, 3, 4, 5,
+    0, 1, 2, 3, 4, 5
+  ])");
+  auto expected_items = ArrayFromJSON(int16(), R"([
+    1,    1,    2,  3,  5,    8,
+    null, null, 0,  1,  null, 2
+  ])");
+  auto expected_offsets = ArrayFromJSON(int32(), "[0, 6, 6, 12, 12]")->data()->buffers[1];
+  auto expected_null_bitmap =
+      ArrayFromJSON(boolean(), "[1, 0, 1, 1]")->data()->buffers[1];
+
+  MapArray expected(type, 4, expected_offsets, expected_keys, expected_items,
+                    expected_null_bitmap, 1, 0);
+
+  auto key_builder = std::make_shared<Int16Builder>();
+  auto item_builder = std::make_shared<Int16Builder>();
+  MapBuilder map_builder(default_memory_pool(), key_builder, item_builder);
+
+  std::shared_ptr<Array> actual;
+  ASSERT_OK(map_builder.Append());
+  ASSERT_OK(key_builder->AppendValues({0, 1, 2, 3, 4, 5}));
+  ASSERT_OK(item_builder->AppendValues({1, 1, 2, 3, 5, 8}));
+  ASSERT_OK(map_builder.AppendNull());
+  ASSERT_OK(map_builder.Append());
+  ASSERT_OK(key_builder->AppendValues({0, 1, 2, 3, 4, 5}));
+  ASSERT_OK(item_builder->AppendValues({-1, -1, 0, 1, -1, 2}, {0, 0, 1, 1, 0, 1}));
+  ASSERT_OK(map_builder.Append());
+  ASSERT_OK(map_builder.Finish(&actual));
+  ASSERT_OK(ValidateArray(*actual));
+
+  ASSERT_ARRAYS_EQUAL(*actual, expected);
+}
+
+TEST_F(TestMapArray, BuildingStringToInt) {
+  auto type = map(utf8(), int32());
+
+  std::vector<int32_t> offsets = {0, 2, 2, 3, 3};
+  auto expected_keys = ArrayFromJSON(utf8(), R"(["joe", "mark", "cap"])");
+  auto expected_values = ArrayFromJSON(int32(), "[0, null, 8]");
+  std::shared_ptr<Buffer> expected_null_bitmap;
+  ASSERT_OK(
+      BitUtil::BytesToBits({1, 0, 1, 1}, default_memory_pool(), &expected_null_bitmap));
+  MapArray expected(type, 4, Buffer::Wrap(offsets), expected_keys, expected_values,
+                    expected_null_bitmap, 1);
+
+  auto key_builder = std::make_shared<StringBuilder>();
+  auto item_builder = std::make_shared<Int32Builder>();
+  MapBuilder map_builder(default_memory_pool(), key_builder, item_builder);
+
+  std::shared_ptr<Array> actual;
+  ASSERT_OK(map_builder.Append());
+  ASSERT_OK(key_builder->Append("joe"));
+  ASSERT_OK(item_builder->Append(0));
+  ASSERT_OK(key_builder->Append("mark"));
+  ASSERT_OK(item_builder->AppendNull());
+  ASSERT_OK(map_builder.AppendNull());
+  ASSERT_OK(map_builder.Append());
+  ASSERT_OK(key_builder->Append("cap"));
+  ASSERT_OK(item_builder->Append(8));
+  ASSERT_OK(map_builder.Append());
+  ASSERT_OK(map_builder.Finish(&actual));
+  ASSERT_OK(ValidateArray(*actual));
+
+  ASSERT_ARRAYS_EQUAL(*actual, expected);
+}
+
+// ----------------------------------------------------------------------
 // FixedSizeList tests
 
 class TestFixedSizeListArray : public TestBuilder {
diff --git a/cpp/src/arrow/array.cc b/cpp/src/arrow/array.cc
index 467a43f..05cc520 100644
--- a/cpp/src/arrow/array.cc
+++ b/cpp/src/arrow/array.cc
@@ -201,10 +201,7 @@ BooleanArray::BooleanArray(int64_t length, const std::shared_ptr<Buffer>& data,
 // ----------------------------------------------------------------------
 // ListArray
 
-ListArray::ListArray(const std::shared_ptr<ArrayData>& data) {
-  DCHECK_EQ(data->type->id(), Type::LIST);
-  SetData(data);
-}
+ListArray::ListArray(const std::shared_ptr<ArrayData>& data) { SetData(data); }
 
 ListArray::ListArray(const std::shared_ptr<DataType>& type, int64_t length,
                      const std::shared_ptr<Buffer>& value_offsets,
@@ -275,6 +272,8 @@ Status ListArray::FromArrays(const Array& offsets, const Array& values, MemoryPo
 void ListArray::SetData(const std::shared_ptr<ArrayData>& data) {
   this->Array::SetData(data);
   DCHECK_EQ(data->buffers.size(), 2);
+  DCHECK(data->type->id() == Type::LIST);
+  list_type_ = checked_cast<const ListType*>(data->type.get());
 
   auto value_offsets = data->buffers[1];
   raw_value_offsets_ = value_offsets == nullptr
@@ -285,10 +284,6 @@ void ListArray::SetData(const std::shared_ptr<ArrayData>& data) {
   values_ = MakeArray(data_->child_data[0]);
 }
 
-const ListType* ListArray::list_type() const {
-  return checked_cast<const ListType*>(data_->type.get());
-}
-
 std::shared_ptr<DataType> ListArray::value_type() const {
   return list_type()->value_type();
 }
@@ -296,6 +291,41 @@ std::shared_ptr<DataType> ListArray::value_type() const {
 std::shared_ptr<Array> ListArray::values() const { return values_; }
 
 // ----------------------------------------------------------------------
+// MapArray
+
+MapArray::MapArray(const std::shared_ptr<ArrayData>& data) { SetData(data); }
+
+MapArray::MapArray(const std::shared_ptr<DataType>& type, int64_t length,
+                   const std::shared_ptr<Buffer>& offsets,
+                   const std::shared_ptr<Array>& keys,
+                   const std::shared_ptr<Array>& values,
+                   const std::shared_ptr<Buffer>& null_bitmap, int64_t null_count,
+                   int64_t offset) {
+  // The pair struct spans all keys/items; only the map itself is sliced by `offset`.
+  auto pair_data = ArrayData::Make(type->children()[0]->type(), keys->data()->length,
+                                   {nullptr}, {keys->data(), values->data()}, 0, 0);
+  SetData(ArrayData::Make(type, length, {null_bitmap, offsets}, {pair_data}, null_count,
+                          offset));
+}
+
+void MapArray::SetData(const std::shared_ptr<ArrayData>& data) {
+  DCHECK_EQ(data->type->id(), Type::MAP);
+  auto pair_data = data->child_data[0];
+  DCHECK_EQ(pair_data->type->id(), Type::STRUCT);
+  DCHECK_EQ(pair_data->null_count, 0);
+  DCHECK_EQ(pair_data->child_data.size(), 2);
+  DCHECK_EQ(pair_data->child_data[0]->null_count, 0);
+  // ListArray::SetData only accepts Type::LIST, so hand it a list-typed copy first.
+  auto pair_list_data = data->Copy();
+  pair_list_data->type = list(pair_data->type);
+  this->ListArray::SetData(pair_list_data);
+  data_->type = data->type;
+  map_type_ = checked_cast<const MapType*>(data->type.get());  // backs map_type()
+  keys_ = MakeArray(pair_data->child_data[0]);
+  items_ = MakeArray(pair_data->child_data[1]);
+}
+
+// ----------------------------------------------------------------------
 // FixedSizeListArray
 
 FixedSizeListArray::FixedSizeListArray(const std::shared_ptr<ArrayData>& data) {
@@ -904,6 +934,49 @@ struct ValidateVisitor {
     return ValidateOffsets(array);
   }
 
+  Status Visit(const MapArray& array) {
+    if (array.length() < 0) {
+      return Status::Invalid("Length was negative");
+    }
+
+    auto value_offsets = array.value_offsets();
+    if (array.length() && !value_offsets) {
+      return Status::Invalid("value_offsets_ was null");
+    }
+    if (value_offsets->size() / static_cast<int>(sizeof(int32_t)) < array.length()) {
+      return Status::Invalid("offset buffer size (bytes): ", value_offsets->size(),
+                             " isn't large enough for length: ", array.length());
+    }
+    // Validate the keys child itself (not the struct of pairs returned by values()).
+    if (!array.keys()) {
+      return Status::Invalid("keys was null");
+    }
+    const Status key_valid = ValidateArray(*array.keys());
+    if (!key_valid.ok()) {
+      return Status::Invalid("key array invalid: ", key_valid.ToString());
+    }
+    // values() is the struct array holding every key/item pair.
+    if (!array.values()) {
+      return Status::Invalid("values was null");
+    }
+    const Status values_valid = ValidateArray(*array.values());
+    if (!values_valid.ok()) {
+      return Status::Invalid("values array invalid: ", values_valid.ToString());
+    }
+    // The final offset must cover exactly the pairs (and hence the keys) stored.
+    const int32_t last_offset = array.value_offset(array.length());
+    if (array.values()->length() != last_offset) {
+      return Status::Invalid("Final offset invariant not equal to values length: ",
+                             last_offset, "!=", array.values()->length());
+    }
+    if (array.keys()->length() != last_offset) {
+      return Status::Invalid("Final offset invariant not equal to keys length: ",
+                             last_offset, "!=", array.keys()->length());
+    }
+
+    return ValidateOffsets(array);
+  }
+
   Status Visit(const FixedSizeListArray& array) {
     if (array.length() < 0) {
       return Status::Invalid("Length was negative");
diff --git a/cpp/src/arrow/array.h b/cpp/src/arrow/array.h
index de8df2b..0de3462 100644
--- a/cpp/src/arrow/array.h
+++ b/cpp/src/arrow/array.h
@@ -500,7 +500,7 @@ class ARROW_EXPORT ListArray : public Array {
   static Status FromArrays(const Array& offsets, const Array& values, MemoryPool* pool,
                            std::shared_ptr<Array>* out);
 
-  const ListType* list_type() const;
+  const ListType* list_type() const { return list_type_; }
 
   /// \brief Return array object containing the list's values
   std::shared_ptr<Array> values() const;
@@ -521,14 +521,51 @@ class ARROW_EXPORT ListArray : public Array {
   }
 
  protected:
+  // this constructor defers SetData to a derived array class
+  ListArray() = default;
   void SetData(const std::shared_ptr<ArrayData>& data);
   const int32_t* raw_value_offsets_;
 
  private:
+  const ListType* list_type_;
   std::shared_ptr<Array> values_;
 };
 
 // ----------------------------------------------------------------------
+// MapArray
+
+/// Concrete Array class for map data
+///
+/// NB: "value" in this context refers to a pair of a key and the corresponding item
+class ARROW_EXPORT MapArray : public ListArray {
+ public:
+  using TypeClass = MapType;
+
+  explicit MapArray(const std::shared_ptr<ArrayData>& data);
+
+  MapArray(const std::shared_ptr<DataType>& type, int64_t length,
+           const std::shared_ptr<Buffer>& value_offsets,
+           const std::shared_ptr<Array>& keys, const std::shared_ptr<Array>& values,
+           const std::shared_ptr<Buffer>& null_bitmap = NULLPTR,
+           int64_t null_count = kUnknownNullCount, int64_t offset = 0);
+
+  const MapType* map_type() const { return map_type_; }
+
+  /// \brief Return array object containing all map keys
+  std::shared_ptr<Array> keys() const { return keys_; }
+
+  /// \brief Return array object containing all mapped items
+  std::shared_ptr<Array> items() const { return items_; }
+
+ protected:
+  void SetData(const std::shared_ptr<ArrayData>& data);
+
+ private:
+  const MapType* map_type_;
+  std::shared_ptr<Array> keys_, items_;
+};
+
+// ----------------------------------------------------------------------
 // FixedSizeListArray
 
 /// Concrete Array class for fixed size list data
diff --git a/cpp/src/arrow/array/builder_nested.cc b/cpp/src/arrow/array/builder_nested.cc
index dd88a7a..309cd2a 100644
--- a/cpp/src/arrow/array/builder_nested.cc
+++ b/cpp/src/arrow/array/builder_nested.cc
@@ -140,9 +140,84 @@ ArrayBuilder* ListBuilder::value_builder() const {
   DCHECK(!values_) << "Using value builder is pointless when values_ is set";
   return value_builder_.get();
 }
+// ----------------------------------------------------------------------
+// MapBuilder
+
+MapBuilder::MapBuilder(MemoryPool* pool, const std::shared_ptr<ArrayBuilder>& key_builder,
+                       std::shared_ptr<ArrayBuilder> const& item_builder,
+                       const std::shared_ptr<DataType>& type)
+    : ArrayBuilder(type, pool), key_builder_(key_builder), item_builder_(item_builder) {
+  list_builder_ = std::make_shared<ListBuilder>(
+      pool, key_builder, list(field("key", key_builder->type(), false)));
+}
+
+MapBuilder::MapBuilder(MemoryPool* pool, const std::shared_ptr<ArrayBuilder>& key_builder,
+                       const std::shared_ptr<ArrayBuilder>& item_builder,
+                       bool keys_sorted)
+    : MapBuilder(pool, key_builder, item_builder,
+                 map(key_builder->type(), item_builder->type(), keys_sorted)) {}
+
+Status MapBuilder::Resize(int64_t capacity) {
+  RETURN_NOT_OK(list_builder_->Resize(capacity));
+  capacity_ = list_builder_->capacity();
+  return Status::OK();
+}
+
+void MapBuilder::Reset() {
+  list_builder_->Reset();
+  ArrayBuilder::Reset();
+}
+
+Status MapBuilder::FinishInternal(std::shared_ptr<ArrayData>* out) {
+  DCHECK_EQ(item_builder_->length(), key_builder_->length());
+  // finish list(keys) builder
+  RETURN_NOT_OK(list_builder_->FinishInternal(out));
+  // finish values builder
+  std::shared_ptr<ArrayData> items_data;
+  RETURN_NOT_OK(item_builder_->FinishInternal(&items_data));
+
+  auto keys_data = (*out)->child_data[0];
+  (*out)->type = type_;
+  (*out)->child_data[0] = ArrayData::Make(type_->child(0)->type(), keys_data->length,
+                                          {nullptr}, {keys_data, items_data}, 0, 0);
+  ArrayBuilder::Reset();
+  return Status::OK();
+}
+
+Status MapBuilder::AppendValues(const int32_t* offsets, int64_t length,
+                                const uint8_t* valid_bytes) {
+  DCHECK_EQ(item_builder_->length(), key_builder_->length());
+  RETURN_NOT_OK(list_builder_->AppendValues(offsets, length, valid_bytes));
+  length_ = list_builder_->length();
+  null_count_ = list_builder_->null_count();
+  return Status::OK();
+}
+
+Status MapBuilder::Append() {
+  DCHECK_EQ(item_builder_->length(), key_builder_->length());
+  RETURN_NOT_OK(list_builder_->Append());
+  length_ = list_builder_->length();
+  return Status::OK();
+}
+
+Status MapBuilder::AppendNull() {
+  DCHECK_EQ(item_builder_->length(), key_builder_->length());
+  RETURN_NOT_OK(list_builder_->AppendNull());
+  length_ = list_builder_->length();
+  null_count_ = list_builder_->null_count();
+  return Status::OK();
+}
+
+Status MapBuilder::AppendNulls(int64_t length) {
+  DCHECK_EQ(item_builder_->length(), key_builder_->length());
+  RETURN_NOT_OK(list_builder_->AppendNulls(length));
+  length_ = list_builder_->length();
+  null_count_ = list_builder_->null_count();
+  return Status::OK();
+}
 
 // ----------------------------------------------------------------------
-// ListBuilder
+// FixedSizeListBuilder
 
 FixedSizeListBuilder::FixedSizeListBuilder(
     MemoryPool* pool, std::shared_ptr<ArrayBuilder> const& value_builder,
diff --git a/cpp/src/arrow/array/builder_nested.h b/cpp/src/arrow/array/builder_nested.h
index d3695e5..de03145 100644
--- a/cpp/src/arrow/array/builder_nested.h
+++ b/cpp/src/arrow/array/builder_nested.h
@@ -45,7 +45,7 @@ class ARROW_EXPORT ListBuilder : public ArrayBuilder {
  public:
   /// Use this constructor to incrementally build the value array along with offsets and
   /// null bitmap.
-  ListBuilder(MemoryPool* pool, std::shared_ptr<ArrayBuilder> const& value_builder,
+  ListBuilder(MemoryPool* pool, const std::shared_ptr<ArrayBuilder>& value_builder,
               const std::shared_ptr<DataType>& type = NULLPTR);
 
   Status Resize(int64_t capacity) override;
@@ -88,6 +88,66 @@ class ARROW_EXPORT ListBuilder : public ArrayBuilder {
 };
 
 // ----------------------------------------------------------------------
+// Map builder
+
+/// \class MapBuilder
+/// \brief Builder class for arrays of variable-size maps
+///
+/// To use this class, you must append values to the key and item array builders
+/// and use the Append function to delimit each distinct map (once the keys and items
+/// have been appended) or use the bulk API to append a sequence of offsets and null
+/// maps.
+///
+/// Key uniqueness and ordering are not validated.
+class ARROW_EXPORT MapBuilder : public ArrayBuilder {
+ public:
+  /// Use this constructor to incrementally build the key and item arrays along with
+  /// offsets and null bitmap.
+  MapBuilder(MemoryPool* pool, const std::shared_ptr<ArrayBuilder>& key_builder,
+             const std::shared_ptr<ArrayBuilder>& item_builder,
+             const std::shared_ptr<DataType>& type);
+
+  /// Derive built type from key and item builders' types
+  MapBuilder(MemoryPool* pool, const std::shared_ptr<ArrayBuilder>& key_builder,
+             const std::shared_ptr<ArrayBuilder>& item_builder, bool keys_sorted = false);
+
+  Status Resize(int64_t capacity) override;
+  void Reset() override;
+  Status FinishInternal(std::shared_ptr<ArrayData>* out) override;
+
+  /// \cond FALSE
+  using ArrayBuilder::Finish;
+  /// \endcond
+
+  Status Finish(std::shared_ptr<MapArray>* out) { return FinishTyped(out); }
+
+  /// \brief Vector append
+  ///
+  /// If passed, valid_bytes is of equal length to values, and any zero byte
+  /// will be considered as a null for that slot
+  Status AppendValues(const int32_t* offsets, int64_t length,
+                      const uint8_t* valid_bytes = NULLPTR);
+
+  /// \brief Start a new variable-length map slot
+  ///
+  /// This function should be called before beginning to append elements to the
+  /// key and item builders
+  Status Append();
+
+  Status AppendNull() final;
+
+  Status AppendNulls(int64_t length) final;
+
+  ArrayBuilder* key_builder() const { return key_builder_.get(); }
+  ArrayBuilder* item_builder() const { return item_builder_.get(); }
+
+ protected:
+  std::shared_ptr<ListBuilder> list_builder_;
+  std::shared_ptr<ArrayBuilder> key_builder_;
+  std::shared_ptr<ArrayBuilder> item_builder_;
+};
+
+// ----------------------------------------------------------------------
 // FixedSizeList builder
 
 /// \class FixedSizeListBuilder
diff --git a/cpp/src/arrow/builder.cc b/cpp/src/arrow/builder.cc
index 2a3a1ad..f6f8042 100644
--- a/cpp/src/arrow/builder.cc
+++ b/cpp/src/arrow/builder.cc
@@ -134,6 +134,16 @@ Status MakeBuilder(MemoryPool* pool, const std::shared_ptr<DataType>& type,
       out->reset(new ListBuilder(pool, std::move(value_builder), type));
       return Status::OK();
     }
+    case Type::MAP: {
+      const auto& map_type = internal::checked_cast<const MapType&>(*type);
+      std::unique_ptr<ArrayBuilder> key_builder, item_builder;
+      RETURN_NOT_OK(MakeBuilder(pool, map_type.key_type(), &key_builder));
+      RETURN_NOT_OK(MakeBuilder(pool, map_type.item_type(), &item_builder));
+      out->reset(
+          new MapBuilder(pool, std::move(key_builder), std::move(item_builder), type));
+      return Status::OK();
+    }
+
     case Type::FIXED_SIZE_LIST: {
       std::unique_ptr<ArrayBuilder> value_builder;
       std::shared_ptr<DataType> value_type =
diff --git a/cpp/src/arrow/compare.cc b/cpp/src/arrow/compare.cc
index c82d4df..ca4dfee 100644
--- a/cpp/src/arrow/compare.cc
+++ b/cpp/src/arrow/compare.cc
@@ -760,6 +760,15 @@ class TypeEqualsVisitor {
 
   Status Visit(const ListType& left) { return VisitChildren(left); }
 
+  Status Visit(const MapType& left) {
+    const auto& right = checked_cast<const MapType&>(right_);
+    if (left.keys_sorted() != right.keys_sorted()) {
+      result_ = false;
+      return Status::OK();
+    }
+    return VisitChildren(left);
+  }
+
   Status Visit(const FixedSizeListType& left) { return VisitChildren(left); }
 
   Status Visit(const StructType& left) { return VisitChildren(left); }
@@ -854,6 +863,13 @@ class ScalarEqualsVisitor {
     return Status::OK();
   }
 
+  Status Visit(const MapScalar& left) {
+    const auto& right = checked_cast<const MapScalar&>(right_);
+    result_ = internal::SharedPtrEquals(left.keys, right.keys) &&
+              internal::SharedPtrEquals(left.items, right.items);
+    return Status::OK();
+  }
+
   Status Visit(const FixedSizeListScalar& left) {
     const auto& right = checked_cast<const FixedSizeListScalar&>(right_);
     result_ = internal::SharedPtrEquals(left.value, right.value);
diff --git a/cpp/src/arrow/compute/kernels/take.cc b/cpp/src/arrow/compute/kernels/take.cc
index f83139d..9af2c0c 100644
--- a/cpp/src/arrow/compute/kernels/take.cc
+++ b/cpp/src/arrow/compute/kernels/take.cc
@@ -176,6 +176,10 @@ struct UnpackValues {
     return Status::NotImplemented("gathering values of type ", t);
   }
 
+  Status Visit(const MapType& t) {
+    return Status::NotImplemented("gathering values of type ", t);
+  }
+
   Status Visit(const FixedSizeListType& t) {
     return Status::NotImplemented("gathering values of type ", t);
   }
diff --git a/cpp/src/arrow/ipc/json-integration-test.cc b/cpp/src/arrow/ipc/json-integration-test.cc
index a735b6c..0bce0fd 100644
--- a/cpp/src/arrow/ipc/json-integration-test.cc
+++ b/cpp/src/arrow/ipc/json-integration-test.cc
@@ -79,7 +79,7 @@ static Status ConvertJsonToArrow(const std::string& json_path,
   RETURN_NOT_OK(internal::json::JsonReader::Open(json_buffer, &reader));
 
   if (FLAGS_verbose) {
-    std::cout << "Found schema: " << reader->schema()->ToString() << std::endl;
+    std::cout << "Found schema:\n" << reader->schema()->ToString() << std::endl;
   }
 
   std::shared_ptr<RecordBatchWriter> writer;
@@ -106,7 +106,7 @@ static Status ConvertArrowToJson(const std::string& arrow_path,
   RETURN_NOT_OK(RecordBatchFileReader::Open(in_file.get(), &reader));
 
   if (FLAGS_verbose) {
-    std::cout << "Found schema: " << reader->schema()->ToString() << std::endl;
+    std::cout << "Found schema:\n" << reader->schema()->ToString() << std::endl;
   }
 
   std::unique_ptr<internal::json::JsonWriter> writer;
diff --git a/cpp/src/arrow/ipc/json-internal.cc b/cpp/src/arrow/ipc/json-internal.cc
index e7fd4a0..42663c0 100644
--- a/cpp/src/arrow/ipc/json-internal.cc
+++ b/cpp/src/arrow/ipc/json-internal.cc
@@ -170,6 +170,11 @@ class SchemaWriter {
                           void>::type
   WriteTypeMetadata(const T& type) {}
 
+  void WriteTypeMetadata(const MapType& type) {
+    writer_->Key("keysSorted");
+    writer_->Bool(type.keys_sorted());  // GetMap() reads this back as a JSON boolean
+  }
+
   void WriteTypeMetadata(const IntegerType& type) {
     writer_->Key("bitWidth");
     writer_->Int(type.bit_width());
@@ -325,6 +330,11 @@ class SchemaWriter {
     return Status::OK();
   }
 
+  Status Visit(const MapType& type) {
+    WriteName("map", type);
+    return Status::OK();
+  }
+
   Status Visit(const FixedSizeListType& type) {
     WriteName("fixedsizelist", type);
     return Status::OK();
@@ -682,6 +692,28 @@ static Status GetFloatingPoint(const RjObject& json_type,
   return Status::OK();
 }
 
+static Status GetMap(const RjObject& json_type,
+                     const std::vector<std::shared_ptr<Field>>& children,
+                     std::shared_ptr<DataType>* type) {
+  if (children.size() != 1) {
+    return Status::Invalid("Map must have exactly one child");
+  }
+
+  if (children[0]->type()->id() != Type::STRUCT ||
+      children[0]->type()->num_children() != 2) {
+    return Status::Invalid("Map's key-item pairs must be structs");
+  }
+
+  const auto& it_keys_sorted = json_type.FindMember("keysSorted");
+  RETURN_NOT_BOOL("keysSorted", it_keys_sorted, json_type);
+
+  auto pair_children = children[0]->type()->children();
+
+  bool keys_sorted = it_keys_sorted->value.GetBool();
+  *type = map(pair_children[0]->type(), pair_children[1]->type(), keys_sorted);
+  return Status::OK();
+}
+
 static Status GetFixedSizeBinary(const RjObject& json_type,
                                  std::shared_ptr<DataType>* type) {
   const auto& it_byte_width = json_type.FindMember("byteWidth");
@@ -900,6 +932,8 @@ static Status GetType(const RjObject& json_type,
       return Status::Invalid("List must have exactly one child");
     }
     *type = list(children[0]);
+  } else if (type_name == "map") {
+    return GetMap(json_type, children, type);
   } else if (type_name == "fixedsizelist") {
     return GetFixedSizeList(json_type, children, type);
   } else if (type_name == "struct") {
@@ -1216,7 +1250,7 @@ class ArrayReader {
     return Status::OK();
   }
 
-  Status Visit(const ListType& type) {
+  Status CreateList(const std::shared_ptr<DataType>& type, std::shared_ptr<Array>* out) {
     int32_t null_count = 0;
     std::shared_ptr<Buffer> validity_buffer;
     RETURN_NOT_OK(GetValidityBuffer(is_valid_, &null_count, &validity_buffer));
@@ -1228,12 +1262,26 @@ class ArrayReader {
                                        &offsets_buffer));
 
     std::vector<std::shared_ptr<Array>> children;
-    RETURN_NOT_OK(GetChildren(obj_, type, &children));
+    RETURN_NOT_OK(GetChildren(obj_, *type, &children));
     DCHECK_EQ(children.size(), 1);
 
-    result_ = std::make_shared<ListArray>(type_, length_, offsets_buffer, children[0],
-                                          validity_buffer, null_count);
+    out->reset(new ListArray(type, length_, offsets_buffer, children[0], validity_buffer,
+                             null_count));
+    return Status::OK();
+  }
+
+  Status Visit(const ListType& type) { return CreateList(type_, &result_); }
 
+  Status Visit(const MapType& type) {
+    auto list_type = std::make_shared<ListType>(field(
+        "item",
+        struct_({field("key", type.key_type(), false), field("item", type.item_type())}),
+        false));
+    std::shared_ptr<Array> list_array;
+    RETURN_NOT_OK(CreateList(list_type, &list_array));
+    auto map_data = list_array->data();
+    map_data->type = type_;
+    result_ = std::make_shared<MapArray>(map_data);
     return Status::OK();
   }
 
diff --git a/cpp/src/arrow/ipc/json-simple-test.cc b/cpp/src/arrow/ipc/json-simple-test.cc
index 0b46517..f1d487f 100644
--- a/cpp/src/arrow/ipc/json-simple-test.cc
+++ b/cpp/src/arrow/ipc/json-simple-test.cc
@@ -47,6 +47,7 @@ namespace internal {
 namespace json {
 
 using ::arrow::internal::checked_cast;
+using ::arrow::internal::checked_pointer_cast;
 
 // Avoid undefined behaviour on signed overflow
 template <typename Signed>
@@ -535,9 +536,137 @@ TEST(TestList, IntegerListList) {
   }
 }
 
+TEST(TestMap, IntegerToInteger) {
+  auto type = map(int16(), int16());
+  std::shared_ptr<Array> expected, actual;
+
+  ASSERT_OK(ArrayFromJSON(type, R"([
+    [[0, 1], [1, 1], [2, 2], [3, 3], [4, 5], [5, 8]],
+    null,
+    [[0, null], [1, null], [2, 0], [3, 1], [4, null], [5, 2]],
+    []
+  ])",
+                          &actual));
+
+  std::unique_ptr<ArrayBuilder> builder;
+  ASSERT_OK(MakeBuilder(default_memory_pool(), type, &builder));
+  auto& map_builder = checked_cast<MapBuilder&>(*builder);
+  auto& key_builder = checked_cast<Int16Builder&>(*map_builder.key_builder());
+  auto& item_builder = checked_cast<Int16Builder&>(*map_builder.item_builder());
+
+  ASSERT_OK(map_builder.Append());
+  ASSERT_OK(key_builder.AppendValues({0, 1, 2, 3, 4, 5}));
+  ASSERT_OK(item_builder.AppendValues({1, 1, 2, 3, 5, 8}));
+  ASSERT_OK(map_builder.AppendNull());
+  ASSERT_OK(map_builder.Append());
+  ASSERT_OK(key_builder.AppendValues({0, 1, 2, 3, 4, 5}));
+  ASSERT_OK(item_builder.AppendValues({-1, -1, 0, 1, -1, 2}, {0, 0, 1, 1, 0, 1}));
+  ASSERT_OK(map_builder.Append());
+  ASSERT_OK(map_builder.Finish(&expected));
+
+  ASSERT_ARRAYS_EQUAL(*actual, *expected);
+}
+
+TEST(TestMap, StringToInteger) {
+  auto type = map(utf8(), int32());
+  auto actual = ArrayFromJSON(type, R"([
+    [["joe", 0], ["mark", null]],
+    null,
+    [["cap", 8]],
+    []
+  ])");
+  std::vector<int32_t> offsets = {0, 2, 2, 3, 3};
+  auto expected_keys = ArrayFromJSON(utf8(), R"(["joe", "mark", "cap"])");
+  auto expected_values = ArrayFromJSON(int32(), "[0, null, 8]");
+  std::shared_ptr<Buffer> expected_null_bitmap;
+  ASSERT_OK(
+      BitUtil::BytesToBits({1, 0, 1, 1}, default_memory_pool(), &expected_null_bitmap));
+  auto expected =
+      std::make_shared<MapArray>(type, 4, Buffer::Wrap(offsets), expected_keys,
+                                 expected_values, expected_null_bitmap, 1);
+  ASSERT_ARRAYS_EQUAL(*actual, *expected);
+}
+
+TEST(TestMap, Errors) {
+  auto type = map(int16(), int16());
+  std::shared_ptr<Array> array;
+
+  // list of pairs isn't an array
+  ASSERT_RAISES(Invalid, ArrayFromJSON(type, "[0]", &array));
+  // pair isn't an array
+  ASSERT_RAISES(Invalid, ArrayFromJSON(type, "[[0]]", &array));
+  ASSERT_RAISES(Invalid, ArrayFromJSON(type, "[[null]]", &array));
+  // pair with length != 2
+  ASSERT_RAISES(Invalid, ArrayFromJSON(type, "[[[0]]]", &array));
+  ASSERT_RAISES(Invalid, ArrayFromJSON(type, "[[[0, 1, 2]]]", &array));
+  // null key
+  ASSERT_RAISES(Invalid, ArrayFromJSON(type, "[[[null, 0]]]", &array));
+  // key or value fails to convert
+  ASSERT_RAISES(Invalid, ArrayFromJSON(type, "[[[0.0, 0]]]", &array));
+  ASSERT_RAISES(Invalid, ArrayFromJSON(type, "[[[0, 0.0]]]", &array));
+}
+
+TEST(TestMap, IntegerMapToStringList) {
+  auto type = map(map(int16(), int16()), list(utf8()));
+  std::shared_ptr<Array> expected, actual;
+
+  ASSERT_OK(ArrayFromJSON(type, R"([
+    [
+      [
+        [],
+        [null, "empty"]
+      ],
+      [
+        [[0, 1]],
+        null
+      ],
+      [
+        [[0, 0], [1, 1]],
+        ["bootstrapping tautology?", "lispy", null, "i can see eternity"]
+      ]
+    ],
+    null
+  ])",
+                          &actual));
+
+  std::unique_ptr<ArrayBuilder> builder;
+  ASSERT_OK(MakeBuilder(default_memory_pool(), type, &builder));
+  auto& map_builder = checked_cast<MapBuilder&>(*builder);
+  auto& key_builder = checked_cast<MapBuilder&>(*map_builder.key_builder());
+  auto& key_key_builder = checked_cast<Int16Builder&>(*key_builder.key_builder());
+  auto& key_item_builder = checked_cast<Int16Builder&>(*key_builder.item_builder());
+  auto& item_builder = checked_cast<ListBuilder&>(*map_builder.item_builder());
+  auto& item_value_builder = checked_cast<StringBuilder&>(*item_builder.value_builder());
+
+  ASSERT_OK(map_builder.Append());
+  ASSERT_OK(key_builder.Append());
+  ASSERT_OK(item_builder.Append());
+  ASSERT_OK(item_value_builder.AppendNull());
+  ASSERT_OK(item_value_builder.Append("empty"));
+
+  ASSERT_OK(key_builder.Append());
+  ASSERT_OK(item_builder.AppendNull());
+  ASSERT_OK(key_key_builder.AppendValues({0}));
+  ASSERT_OK(key_item_builder.AppendValues({1}));
+
+  ASSERT_OK(key_builder.Append());
+  ASSERT_OK(item_builder.Append());
+  ASSERT_OK(key_key_builder.AppendValues({0, 1}));
+  ASSERT_OK(key_item_builder.AppendValues({0, 1}));
+  ASSERT_OK(item_value_builder.Append("bootstrapping tautology?"));
+  ASSERT_OK(item_value_builder.Append("lispy"));
+  ASSERT_OK(item_value_builder.AppendNull());
+  ASSERT_OK(item_value_builder.Append("i can see eternity"));
+
+  ASSERT_OK(map_builder.AppendNull());
+
+  ASSERT_OK(map_builder.Finish(&expected));
+  ASSERT_ARRAYS_EQUAL(*actual, *expected);
+}
+
 TEST(TestFixedSizeList, IntegerList) {
   auto pool = default_memory_pool();
-  std::shared_ptr<DataType> type = fixed_size_list(int64(), 2);
+  auto type = fixed_size_list(int64(), 2);
   std::shared_ptr<Array> values, expected, actual;
 
   ASSERT_OK(ArrayFromJSON(type, "[]", &actual));
diff --git a/cpp/src/arrow/ipc/json-simple.cc b/cpp/src/arrow/ipc/json-simple.cc
index c9d238d..f850f3d 100644
--- a/cpp/src/arrow/ipc/json-simple.cc
+++ b/cpp/src/arrow/ipc/json-simple.cc
@@ -442,6 +442,60 @@ class ListConverter final : public ConcreteConverter<ListConverter> {
 };
 
 // ------------------------------------------------------------------------
+// Converter for map arrays
+
+class MapConverter final : public ConcreteConverter<MapConverter> {
+ public:
+  explicit MapConverter(const std::shared_ptr<DataType>& type) { type_ = type; }
+
+  Status Init() override {
+    const auto& map_type = checked_cast<const MapType&>(*type_);
+    RETURN_NOT_OK(GetConverter(map_type.key_type(), &key_converter_));
+    RETURN_NOT_OK(GetConverter(map_type.item_type(), &item_converter_));
+    auto key_builder = key_converter_->builder();
+    auto item_builder = item_converter_->builder();
+    builder_ = std::make_shared<MapBuilder>(default_memory_pool(), key_builder,
+                                            item_builder, type_);
+    return Status::OK();
+  }
+
+  Status AppendNull() override { return builder_->AppendNull(); }
+
+  Status AppendValue(const rj::Value& json_obj) override {
+    if (json_obj.IsNull()) {
+      return AppendNull();
+    }
+    RETURN_NOT_OK(builder_->Append());
+    if (!json_obj.IsArray()) {
+      return JSONTypeError("array", json_obj.GetType());
+    }
+    auto size = json_obj.Size();
+    for (uint32_t i = 0; i < size; ++i) {
+      const auto& json_pair = json_obj[i];
+      if (!json_pair.IsArray()) {
+        return JSONTypeError("array", json_pair.GetType());
+      }
+      if (json_pair.Size() != 2) {
+        return Status::Invalid("key item pair must have exactly two elements, had ",
+                               json_pair.Size());
+      }
+      if (json_pair[0].IsNull()) {
+        return Status::Invalid("null key is invalid");
+      }
+      RETURN_NOT_OK(key_converter_->AppendValue(json_pair[0]));
+      RETURN_NOT_OK(item_converter_->AppendValue(json_pair[1]));
+    }
+    return Status::OK();
+  }
+
+  std::shared_ptr<ArrayBuilder> builder() override { return builder_; }
+
+ private:
+  std::shared_ptr<MapBuilder> builder_;
+  std::shared_ptr<Converter> key_converter_, item_converter_;
+};
+
+// ------------------------------------------------------------------------
 // Converter for fixed size list arrays
 
 class FixedSizeListConverter final : public ConcreteConverter<FixedSizeListConverter> {
@@ -587,6 +641,7 @@ Status GetConverter(const std::shared_ptr<DataType>& type,
     SIMPLE_CONVERTER_CASE(Type::FLOAT, FloatConverter<FloatType>)
     SIMPLE_CONVERTER_CASE(Type::DOUBLE, FloatConverter<DoubleType>)
     SIMPLE_CONVERTER_CASE(Type::LIST, ListConverter)
+    SIMPLE_CONVERTER_CASE(Type::MAP, MapConverter)
     SIMPLE_CONVERTER_CASE(Type::FIXED_SIZE_LIST, FixedSizeListConverter)
     SIMPLE_CONVERTER_CASE(Type::STRUCT, StructConverter)
     SIMPLE_CONVERTER_CASE(Type::STRING, StringConverter)
diff --git a/cpp/src/arrow/ipc/json-test.cc b/cpp/src/arrow/ipc/json-test.cc
index b21e430..fb57fa7 100644
--- a/cpp/src/arrow/ipc/json-test.cc
+++ b/cpp/src/arrow/ipc/json-test.cc
@@ -204,6 +204,15 @@ TEST(TestJsonArrayWriter, NestedTypes) {
 
   TestArrayRoundTrip(list_array);
 
+  // Map
+  auto map_type = map(utf8(), int32());
+  auto keys_array = ArrayFromJSON(utf8(), R"(["a", "b", "c", "d", "a", "b", "c"])");
+
+  MapArray map_array(map_type, 5, offsets_buffer, keys_array, values_array, list_bitmap,
+                     1);
+
+  TestArrayRoundTrip(map_array);
+
   // FixedSizeList
   FixedSizeListArray fixed_size_list_array(fixed_size_list(value_type, 2), 3,
                                            values_array->Slice(1), list_bitmap, 1);
diff --git a/cpp/src/arrow/ipc/metadata-internal.cc b/cpp/src/arrow/ipc/metadata-internal.cc
index 676a477..13eb334 100644
--- a/cpp/src/arrow/ipc/metadata-internal.cc
+++ b/cpp/src/arrow/ipc/metadata-internal.cc
@@ -316,6 +316,25 @@ Status ConcreteTypeFromFlatbuffer(flatbuf::Type type, const void* type_data,
       }
       *out = std::make_shared<ListType>(children[0]);
       return Status::OK();
+    case flatbuf::Type_Map:
+      if (children.size() != 1) {
+        return Status::Invalid("Map must have exactly 1 child field");
+      }
+      if (  // FIXME(bkietz) temporarily disabled: this field is sometimes read nullable
+            // children[0]->nullable() ||
+          children[0]->type()->id() != Type::STRUCT ||
+          children[0]->type()->num_children() != 2) {
+        return Status::Invalid("Map's key-item pairs must be non-nullable structs");
+      }
+      if (children[0]->type()->child(0)->nullable()) {
+        return Status::Invalid("Map's keys must be non-nullable");
+      } else {
+        auto map = static_cast<const flatbuf::Map*>(type_data);
+        *out = std::make_shared<MapType>(children[0]->type()->child(0)->type(),
+                                         children[0]->type()->child(1)->type(),
+                                         map->keysSorted());
+      }
+      return Status::OK();
     case flatbuf::Type_FixedSizeList:
       if (children.size() != 1) {
         return Status::Invalid("FixedSizeList must have exactly 1 child field");
@@ -601,6 +620,13 @@ class FieldToFlatbufferVisitor {
     return Status::OK();
   }
 
+  Status Visit(const MapType& type) {
+    fb_type_ = flatbuf::Type_Map;
+    RETURN_NOT_OK(AppendChildFields(fbb_, type, &children_, dictionary_memo_));
+    type_offset_ = flatbuf::CreateMap(fbb_, type.keys_sorted()).Union();
+    return Status::OK();
+  }
+
   Status Visit(const FixedSizeListType& type) {
     fb_type_ = flatbuf::Type_FixedSizeList;
     RETURN_NOT_OK(AppendChildFields(fbb_, type, &children_, dictionary_memo_));
diff --git a/cpp/src/arrow/ipc/test-common.cc b/cpp/src/arrow/ipc/test-common.cc
index abf27a1..12adebc 100644
--- a/cpp/src/arrow/ipc/test-common.cc
+++ b/cpp/src/arrow/ipc/test-common.cc
@@ -115,6 +115,23 @@ Status MakeRandomListArray(const std::shared_ptr<Array>& child_array, int num_li
   return ValidateArray(**out);
 }
 
+Status MakeRandomMapArray(const std::shared_ptr<Array>& key_array,
+                          const std::shared_ptr<Array>& item_array, int num_maps,
+                          bool include_nulls, MemoryPool* pool,
+                          std::shared_ptr<Array>* out) {
+  auto pair_type = struct_(
+      {field("key", key_array->type(), false), field("item", item_array->type())});
+
+  auto pair_array = std::make_shared<StructArray>(pair_type, num_maps,
+                                                  ArrayVector{key_array, item_array});
+
+  RETURN_NOT_OK(MakeRandomListArray(pair_array, num_maps, include_nulls, pool, out));
+  auto map_data = (*out)->data();
+  map_data->type = map(key_array->type(), item_array->type());
+  out->reset(new MapArray(map_data));
+  return Status::OK();
+}
+
 Status MakeRandomBooleanArray(const int length, bool include_nulls,
                               std::shared_ptr<Array>* out) {
   std::vector<uint8_t> values(length);
diff --git a/cpp/src/arrow/ipc/test-common.h b/cpp/src/arrow/ipc/test-common.h
index adbc57b..0ec9834 100644
--- a/cpp/src/arrow/ipc/test-common.h
+++ b/cpp/src/arrow/ipc/test-common.h
@@ -49,6 +49,11 @@ Status MakeRandomListArray(const std::shared_ptr<Array>& child_array, int num_li
                            std::shared_ptr<Array>* out);
 
 ARROW_EXPORT
+Status MakeRandomMapArray(  // parameters must match the definition in test-common.cc
+    const std::shared_ptr<Array>& key_array, const std::shared_ptr<Array>& item_array,
+    int num_maps, bool include_nulls, MemoryPool* pool, std::shared_ptr<Array>* out);
+
+ARROW_EXPORT
 Status MakeRandomBooleanArray(const int length, bool include_nulls,
                               std::shared_ptr<Array>* out);
 
diff --git a/cpp/src/arrow/ipc/writer.cc b/cpp/src/arrow/ipc/writer.cc
index d7d129e..8917410 100644
--- a/cpp/src/arrow/ipc/writer.cc
+++ b/cpp/src/arrow/ipc/writer.cc
@@ -274,6 +274,30 @@ class RecordBatchSerializer : public ArrayVisitor {
     return Status::OK();
   }
 
+  Status VisitList(const ListArray& array) {
+    std::shared_ptr<Buffer> value_offsets;
+    RETURN_NOT_OK(GetZeroBasedValueOffsets<ListArray>(array, &value_offsets));
+    out_->body_buffers.emplace_back(value_offsets);
+
+    --max_recursion_depth_;
+    std::shared_ptr<Array> values = array.values();
+
+    int32_t values_offset = 0;
+    int32_t values_length = 0;
+    if (value_offsets) {
+      values_offset = array.value_offset(0);
+      values_length = array.value_offset(array.length()) - values_offset;
+    }
+
+    if (array.offset() != 0 || values_length < values->length()) {
+      // Must also slice the values
+      values = values->Slice(values_offset, values_length);
+    }
+    RETURN_NOT_OK(VisitArray(*values));
+    ++max_recursion_depth_;
+    return Status::OK();
+  }
+
   Status Visit(const BooleanArray& array) override {
     std::shared_ptr<Buffer> data;
     RETURN_NOT_OK(
@@ -318,29 +342,9 @@ class RecordBatchSerializer : public ArrayVisitor {
 
   Status Visit(const BinaryArray& array) override { return VisitBinary(array); }
 
-  Status Visit(const ListArray& array) override {
-    std::shared_ptr<Buffer> value_offsets;
-    RETURN_NOT_OK(GetZeroBasedValueOffsets<ListArray>(array, &value_offsets));
-    out_->body_buffers.emplace_back(value_offsets);
-
-    --max_recursion_depth_;
-    std::shared_ptr<Array> values = array.values();
-
-    int32_t values_offset = 0;
-    int32_t values_length = 0;
-    if (value_offsets) {
-      values_offset = array.value_offset(0);
-      values_length = array.value_offset(array.length()) - values_offset;
-    }
+  Status Visit(const ListArray& array) override { return VisitList(array); }
 
-    if (array.offset() != 0 || values_length < values->length()) {
-      // Must also slice the values
-      values = values->Slice(values_offset, values_length);
-    }
-    RETURN_NOT_OK(VisitArray(*values));
-    ++max_recursion_depth_;
-    return Status::OK();
-  }
+  Status Visit(const MapArray& array) override { return VisitList(array); }
 
   Status Visit(const StructArray& array) override {
     --max_recursion_depth_;
diff --git a/cpp/src/arrow/pretty_print-test.cc b/cpp/src/arrow/pretty_print-test.cc
index fd8e093..8171798 100644
--- a/cpp/src/arrow/pretty_print-test.cc
+++ b/cpp/src/arrow/pretty_print-test.cc
@@ -355,6 +355,43 @@ TEST_F(TestPrettyPrint, ListType) {
   CheckStream(*array, {0, 1}, ex_3);
 }
 
+TEST_F(TestPrettyPrint, MapType) {
+  auto map_type = map(utf8(), int64());
+  auto array = ArrayFromJSON(map_type, R"([
+    [["joe", 0], ["mark", null]],
+    null,
+    [["cap", 8]],
+    []
+  ])");
+
+  static const char* ex = R"expected([
+  keys:
+  [
+    "joe",
+    "mark"
+  ]
+  values:
+  [
+    0,
+    null
+  ],
+  null,
+  keys:
+  [
+    "cap"
+  ]
+  values:
+  [
+    8
+  ],
+  keys:
+  []
+  values:
+  []
+])expected";
+  CheckArray(*array, {0, 10}, ex);
+}
+
 TEST_F(TestPrettyPrint, FixedSizeListType) {
   auto list_type = fixed_size_list(int32(), 3);
   auto array = ArrayFromJSON(list_type,
diff --git a/cpp/src/arrow/pretty_print.cc b/cpp/src/arrow/pretty_print.cc
index 5c6f870..695abc1 100644
--- a/cpp/src/arrow/pretty_print.cc
+++ b/cpp/src/arrow/pretty_print.cc
@@ -269,6 +269,40 @@ class ArrayPrinter : public PrettyPrinter {
     return Status::OK();
   }
 
+  Status WriteDataValues(const MapArray& array) {
+    bool skip_comma = true;
+    for (int64_t i = 0; i < array.length(); ++i) {
+      if (skip_comma) {
+        skip_comma = false;
+      } else {
+        (*sink_) << ",\n";
+      }
+      if ((i >= window_) && (i < (array.length() - window_))) {
+        Indent();
+        (*sink_) << "...\n";
+        i = array.length() - window_ - 1;
+        skip_comma = true;
+      } else if (array.IsNull(i)) {
+        Indent();
+        (*sink_) << null_rep_;
+      } else {
+        Indent();
+        (*sink_) << "keys:\n";
+        auto keys_slice =
+            array.keys()->Slice(array.value_offset(i), array.value_length(i));
+        RETURN_NOT_OK(PrettyPrint(*keys_slice, {indent_, window_}, sink_));
+        (*sink_) << "\n";
+        Indent();
+        (*sink_) << "values:\n";
+        auto values_slice =
+            array.items()->Slice(array.value_offset(i), array.value_length(i));
+        RETURN_NOT_OK(PrettyPrint(*values_slice, {indent_, window_}, sink_));
+      }
+    }
+    (*sink_) << "\n";
+    return Status::OK();
+  }
+
   Status Visit(const NullArray& array) {
     (*sink_) << array.length() << " nulls";
     return Status::OK();
@@ -279,6 +313,7 @@ class ArrayPrinter : public PrettyPrinter {
                               std::is_base_of<FixedSizeBinaryArray, T>::value ||
                               std::is_base_of<BinaryArray, T>::value ||
                               std::is_base_of<ListArray, T>::value ||
+                              std::is_base_of<MapArray, T>::value ||
                               std::is_base_of<FixedSizeListArray, T>::value,
                           Status>::type
   Visit(const T& array) {
diff --git a/cpp/src/arrow/scalar.cc b/cpp/src/arrow/scalar.cc
index 56c7a35..4bc9b92 100644
--- a/cpp/src/arrow/scalar.cc
+++ b/cpp/src/arrow/scalar.cc
@@ -69,6 +69,15 @@ ListScalar::ListScalar(const std::shared_ptr<Array>& value,
 ListScalar::ListScalar(const std::shared_ptr<Array>& value, bool is_valid)
     : ListScalar(value, value->type(), is_valid) {}
 
+MapScalar::MapScalar(const std::shared_ptr<Array>& keys,
+                     const std::shared_ptr<Array>& items,
+                     const std::shared_ptr<DataType>& type, bool is_valid)
+    : Scalar{type, is_valid}, keys(keys), items(items) {}
+
+MapScalar::MapScalar(const std::shared_ptr<Array>& keys,
+                     const std::shared_ptr<Array>& values, bool is_valid)
+    : MapScalar(keys, values, map(keys->type(), values->type()), is_valid) {}
+
 FixedSizeListScalar::FixedSizeListScalar(const std::shared_ptr<Array>& value,
                                          const std::shared_ptr<DataType>& type,
                                          bool is_valid)
diff --git a/cpp/src/arrow/scalar.h b/cpp/src/arrow/scalar.h
index 51b5e71..856660e 100644
--- a/cpp/src/arrow/scalar.h
+++ b/cpp/src/arrow/scalar.h
@@ -179,6 +179,17 @@ struct ARROW_EXPORT ListScalar : public Scalar {
   explicit ListScalar(const std::shared_ptr<Array>& value, bool is_valid = true);
 };
 
+struct ARROW_EXPORT MapScalar : public Scalar {
+  std::shared_ptr<Array> keys;
+  std::shared_ptr<Array> items;
+
+  MapScalar(const std::shared_ptr<Array>& keys, const std::shared_ptr<Array>& values,
+            const std::shared_ptr<DataType>& type, bool is_valid = true);
+
+  MapScalar(const std::shared_ptr<Array>& keys, const std::shared_ptr<Array>& values,
+            bool is_valid = true);
+};
+
 struct ARROW_EXPORT FixedSizeListScalar : public Scalar {
   std::shared_ptr<Array> value;
 
diff --git a/cpp/src/arrow/type-test.cc b/cpp/src/arrow/type-test.cc
index 94be608..91562ee 100644
--- a/cpp/src/arrow/type-test.cc
+++ b/cpp/src/arrow/type-test.cc
@@ -370,6 +370,27 @@ TEST(TestListType, Basics) {
   ASSERT_EQ("list<item: list<item: string>>", lt2.ToString());
 }
 
+TEST(TestMapType, Basics) {
+  std::shared_ptr<DataType> kt = std::make_shared<StringType>();
+  std::shared_ptr<DataType> it = std::make_shared<UInt8Type>();
+
+  MapType map_type(kt, it);
+  ASSERT_EQ(map_type.id(), Type::MAP);
+
+  ASSERT_EQ("map", map_type.name());
+  ASSERT_EQ("map<string, uint8>", map_type.ToString());
+
+  ASSERT_EQ(map_type.key_type()->id(), kt->id());
+  ASSERT_EQ(map_type.item_type()->id(), it->id());
+  ASSERT_EQ(map_type.value_type()->id(), Type::STRUCT);
+
+  std::shared_ptr<DataType> mt = std::make_shared<MapType>(it, kt);
+  ASSERT_EQ("map<uint8, string>", mt->ToString());
+
+  MapType mt2(kt, mt, true);
+  ASSERT_EQ("map<string, map<uint8, string>, keys_sorted>", mt2.ToString());
+}
+
 TEST(TestFixedSizeListType, Basics) {
   std::shared_ptr<DataType> vt = std::make_shared<UInt8Type>();
 
diff --git a/cpp/src/arrow/type.cc b/cpp/src/arrow/type.cc
index bc54873..d2105a6 100644
--- a/cpp/src/arrow/type.cc
+++ b/cpp/src/arrow/type.cc
@@ -147,6 +147,24 @@ std::string ListType::ToString() const {
   return s.str();
 }
 
+MapType::MapType(const std::shared_ptr<DataType>& key_type,
+                 const std::shared_ptr<DataType>& item_type, bool keys_sorted)
+    : ListType(struct_({std::make_shared<Field>("key", key_type, false),
+                        std::make_shared<Field>("item", item_type)})),
+      keys_sorted_(keys_sorted) {
+  id_ = type_id;
+}
+
+std::string MapType::ToString() const {
+  std::stringstream s;
+  s << "map<" << key_type()->ToString() << ", " << item_type()->ToString();
+  if (keys_sorted_) {
+    s << ", keys_sorted";
+  }
+  s << ">";
+  return s.str();
+}
+
 std::string FixedSizeListType::ToString() const {
   std::stringstream s;
   s << "fixed_size_list<" << value_field()->ToString() << ">[" << list_size_ << "]";
@@ -663,6 +681,12 @@ std::shared_ptr<DataType> list(const std::shared_ptr<Field>& value_field) {
   return std::make_shared<ListType>(value_field);
 }
 
+std::shared_ptr<DataType> map(const std::shared_ptr<DataType>& key_type,
+                              const std::shared_ptr<DataType>& value_type,
+                              bool keys_sorted) {
+  return std::make_shared<MapType>(key_type, value_type, keys_sorted);
+}
+
 std::shared_ptr<DataType> fixed_size_list(const std::shared_ptr<DataType>& value_type,
                                           int32_t list_size) {
   return std::make_shared<FixedSizeListType>(value_type, list_size);
diff --git a/cpp/src/arrow/type.h b/cpp/src/arrow/type.h
index b5eef6f..2ef1c06 100644
--- a/cpp/src/arrow/type.h
+++ b/cpp/src/arrow/type.h
@@ -461,18 +461,43 @@ class ARROW_EXPORT ListType : public NestedType {
   std::string name() const override { return "list"; }
 };
 
+/// \brief Concrete type class for map data
+///
+/// Map data is nested data where each value is a variable number of
+/// key-item pairs.  Maps can be recursively nested, for example
+/// map(utf8, map(utf8, int32)).
+class ARROW_EXPORT MapType : public ListType {
+ public:
+  static constexpr Type::type type_id = Type::MAP;
+
+  MapType(const std::shared_ptr<DataType>& key_type,
+          const std::shared_ptr<DataType>& item_type, bool keys_sorted = false);
+
+  std::shared_ptr<DataType> key_type() const { return value_type()->child(0)->type(); }
+
+  std::shared_ptr<DataType> item_type() const { return value_type()->child(1)->type(); }
+
+  std::string ToString() const override;
+
+  std::string name() const override { return "map"; }
+
+  bool keys_sorted() const { return keys_sorted_; }
+
+ private:
+  bool keys_sorted_;
+};
+
 /// \brief Concrete type class for fixed size list data
 class ARROW_EXPORT FixedSizeListType : public NestedType {
  public:
   static constexpr Type::type type_id = Type::FIXED_SIZE_LIST;
 
   // List can contain any other logical value type
-  explicit FixedSizeListType(const std::shared_ptr<DataType>& value_type,
-                             int32_t list_size)
+  FixedSizeListType(const std::shared_ptr<DataType>& value_type, int32_t list_size)
       : FixedSizeListType(std::make_shared<Field>("item", value_type), list_size) {}
 
-  explicit FixedSizeListType(const std::shared_ptr<Field>& value_field, int32_t list_size)
-      : NestedType(Type::FIXED_SIZE_LIST), list_size_(list_size) {
+  FixedSizeListType(const std::shared_ptr<Field>& value_field, int32_t list_size)
+      : NestedType(type_id), list_size_(list_size) {
     children_ = {value_field};
   }
 
@@ -982,6 +1007,12 @@ std::shared_ptr<DataType> list(const std::shared_ptr<Field>& value_type);
 ARROW_EXPORT
 std::shared_ptr<DataType> list(const std::shared_ptr<DataType>& value_type);
 
+/// \brief Create a MapType instance from its key and value DataTypes
+ARROW_EXPORT
+std::shared_ptr<DataType> map(const std::shared_ptr<DataType>& key_type,
+                              const std::shared_ptr<DataType>& value_type,
+                              bool keys_sorted = false);
+
 /// \brief Create a FixedSizeListType instance from its child Field type
 ARROW_EXPORT
 std::shared_ptr<DataType> fixed_size_list(const std::shared_ptr<Field>& value_type,
diff --git a/cpp/src/arrow/type_fwd.h b/cpp/src/arrow/type_fwd.h
index 040ccf2..918c25e 100644
--- a/cpp/src/arrow/type_fwd.h
+++ b/cpp/src/arrow/type_fwd.h
@@ -78,6 +78,11 @@ class ListArray;
 class ListBuilder;
 struct ListScalar;
 
+class MapType;
+class MapArray;
+class MapBuilder;
+struct MapScalar;
+
 class FixedSizeListType;
 class FixedSizeListArray;
 class FixedSizeListBuilder;
diff --git a/cpp/src/arrow/type_traits.h b/cpp/src/arrow/type_traits.h
index 49c8ff8..4902f5c 100644
--- a/cpp/src/arrow/type_traits.h
+++ b/cpp/src/arrow/type_traits.h
@@ -279,6 +279,14 @@ struct TypeTraits<ListType> {
 };
 
 template <>
+struct TypeTraits<MapType> {
+  using ArrayType = MapArray;
+  using BuilderType = MapBuilder;
+  using ScalarType = MapScalar;
+  constexpr static bool is_parameter_free = false;
+};
+
+template <>
 struct TypeTraits<FixedSizeListType> {
   using ArrayType = FixedSizeListArray;
   using BuilderType = FixedSizeListBuilder;
diff --git a/cpp/src/arrow/util/checked_cast.h b/cpp/src/arrow/util/checked_cast.h
index 718f105..d75a6a3 100644
--- a/cpp/src/arrow/util/checked_cast.h
+++ b/cpp/src/arrow/util/checked_cast.h
@@ -48,6 +48,15 @@ std::shared_ptr<T> checked_pointer_cast(const std::shared_ptr<U>& r) noexcept {
 #endif
 }
 
+template <class T, class U>
+std::unique_ptr<T> checked_pointer_cast(std::unique_ptr<U> r) noexcept {
+#ifdef NDEBUG  // release build: skip the runtime type check
+  return std::unique_ptr<T>(static_cast<T*>(r.release()));
+#else  // debug build: dynamic_cast yields nullptr when the object is not a T
+  return std::unique_ptr<T>(dynamic_cast<T*>(r.release()));
+#endif
+}
+
 }  // namespace internal
 }  // namespace arrow
 
diff --git a/cpp/src/arrow/visitor.cc b/cpp/src/arrow/visitor.cc
index 9f28b15..53b341b 100644
--- a/cpp/src/arrow/visitor.cc
+++ b/cpp/src/arrow/visitor.cc
@@ -57,6 +57,7 @@ ARRAY_VISITOR_DEFAULT(DayTimeIntervalArray)
 ARRAY_VISITOR_DEFAULT(MonthIntervalArray)
 ARRAY_VISITOR_DEFAULT(DurationArray)
 ARRAY_VISITOR_DEFAULT(ListArray)
+ARRAY_VISITOR_DEFAULT(MapArray)
 ARRAY_VISITOR_DEFAULT(FixedSizeListArray)
 ARRAY_VISITOR_DEFAULT(StructArray)
 ARRAY_VISITOR_DEFAULT(UnionArray)
@@ -100,6 +101,7 @@ TYPE_VISITOR_DEFAULT(MonthIntervalType)
 TYPE_VISITOR_DEFAULT(DurationType)
 TYPE_VISITOR_DEFAULT(Decimal128Type)
 TYPE_VISITOR_DEFAULT(ListType)
+TYPE_VISITOR_DEFAULT(MapType)
 TYPE_VISITOR_DEFAULT(FixedSizeListType)
 TYPE_VISITOR_DEFAULT(StructType)
 TYPE_VISITOR_DEFAULT(UnionType)
@@ -143,6 +145,7 @@ SCALAR_VISITOR_DEFAULT(MonthIntervalScalar)
 SCALAR_VISITOR_DEFAULT(DurationScalar)
 SCALAR_VISITOR_DEFAULT(Decimal128Scalar)
 SCALAR_VISITOR_DEFAULT(ListScalar)
+SCALAR_VISITOR_DEFAULT(MapScalar)
 SCALAR_VISITOR_DEFAULT(FixedSizeListScalar)
 SCALAR_VISITOR_DEFAULT(StructScalar)
 SCALAR_VISITOR_DEFAULT(DictionaryScalar)
diff --git a/cpp/src/arrow/visitor.h b/cpp/src/arrow/visitor.h
index 1b40ce4..a4979e9 100644
--- a/cpp/src/arrow/visitor.h
+++ b/cpp/src/arrow/visitor.h
@@ -54,6 +54,7 @@ class ARROW_EXPORT ArrayVisitor {
   virtual Status Visit(const DurationArray& array);
   virtual Status Visit(const Decimal128Array& array);
   virtual Status Visit(const ListArray& array);
+  virtual Status Visit(const MapArray& array);
   virtual Status Visit(const FixedSizeListArray& array);
   virtual Status Visit(const StructArray& array);
   virtual Status Visit(const UnionArray& array);
@@ -91,6 +92,7 @@ class ARROW_EXPORT TypeVisitor {
   virtual Status Visit(const DurationType& type);
   virtual Status Visit(const Decimal128Type& type);
   virtual Status Visit(const ListType& type);
+  virtual Status Visit(const MapType& type);
   virtual Status Visit(const FixedSizeListType& type);
   virtual Status Visit(const StructType& type);
   virtual Status Visit(const UnionType& type);
@@ -128,6 +130,7 @@ class ARROW_EXPORT ScalarVisitor {
   virtual Status Visit(const DurationScalar& scalar);
   virtual Status Visit(const Decimal128Scalar& scalar);
   virtual Status Visit(const ListScalar& scalar);
+  virtual Status Visit(const MapScalar& scalar);
   virtual Status Visit(const FixedSizeListScalar& scalar);
   virtual Status Visit(const StructScalar& scalar);
   virtual Status Visit(const DictionaryScalar& scalar);
diff --git a/cpp/src/arrow/visitor_inline.h b/cpp/src/arrow/visitor_inline.h
index 01bf442..b9ade98 100644
--- a/cpp/src/arrow/visitor_inline.h
+++ b/cpp/src/arrow/visitor_inline.h
@@ -56,6 +56,7 @@ namespace arrow {
   ACTION(Time64);                            \
   ACTION(Decimal128);                        \
   ACTION(List);                              \
+  ACTION(Map);                               \
   ACTION(FixedSizeList);                     \
   ACTION(Struct);                            \
   ACTION(Union);                             \
diff --git a/cpp/src/parquet/arrow/writer.cc b/cpp/src/parquet/arrow/writer.cc
index d6a9b44..96db68b 100644
--- a/cpp/src/parquet/arrow/writer.cc
+++ b/cpp/src/parquet/arrow/writer.cc
@@ -114,6 +114,7 @@ class LevelBuilder {
                                   " not supported yet");                   \
   }
 
+  NOT_IMPLEMENTED_VISIT(Map)
   NOT_IMPLEMENTED_VISIT(FixedSizeList)
   NOT_IMPLEMENTED_VISIT(Struct)
   NOT_IMPLEMENTED_VISIT(Union)
diff --git a/dev/archery/archery/lang/cpp.py b/dev/archery/archery/lang/cpp.py
index 84b6346..623438e 100644
--- a/dev/archery/archery/lang/cpp.py
+++ b/dev/archery/archery/lang/cpp.py
@@ -60,6 +60,7 @@ class CppConfiguration:
         if self.cxx_flags:
             yield ("ARROW_CXXFLAGS", self.cxx_flags)
 
+        yield ("CMAKE_EXPORT_COMPILE_COMMANDS", truthifier(True))
         yield ("CMAKE_BUILD_TYPE", or_else(self.build_type, "debug"))
         yield ("BUILD_WARNING_LEVEL", or_else(self.warn_level, "production"))
 
diff --git a/docs/source/format/Layout.rst b/docs/source/format/Layout.rst
index 9bc3a5b..c4efe48 100644
--- a/docs/source/format/Layout.rst
+++ b/docs/source/format/Layout.rst
@@ -393,8 +393,8 @@ will have the following representation: ::
       |--------------------------|-----------------------|
       | 00001101                 | 0 (padding)           |
 
-    * Values array (char array):
-      * Length: 7,  Null count: 0
+    * Values array (byte array):
+      * Length: 16,  Null count: 0
       * Null bitmap buffer: Not required
 
         | Bytes 0-3       | Bytes 4-7   | Bytes 8-15                      |
@@ -491,6 +491,91 @@ for the null struct but are 'hidden' from the consumer by the parent array's
 null bitmap.  However, when treated independently corresponding
 values of the children array will be non-null.
 
+
+Map type
+--------
+
+Map is a nested type in which each array slot contains a variable size sequence
+of key-item pairs.
+
+A map type is specified like ``Map<K, I>``, where ``K`` and ``I`` are
+any relative type (primitive or nested) and represent the key and item types
+respectively.
+
+A map array is represented by the combination of the following:
+
+* A child array (of type ``Struct<K, I>``) containing key-item pairs.
+  This struct array in turn has two child arrays:
+  a keys array of type ``K``, which may not contain nulls, and
+  an items array of type ``I``.
+* An offsets buffer containing 32-bit signed integers with length equal to the
+  length of the top-level array plus one. Note that this limits the size of the
+  child arrays to 2 :sup:`31` -1.
+
+The offsets array encodes a start position in the child arrays, and the length
+of the map in each slot is computed using the first difference with the next
+element in the offsets array (the same offsets layout as ``List<T>``).
+Each slice of the child arrays delimited by the offsets array represents a set
+of key-item pairs in the corresponding slot of the parent map array.
+
+Example Layout: ``Map<K, I>`` Array
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+Let's consider an example, the type ``Map<String, Int32>``.
+
+For an array of length 4 with respective values: ::
+
+    [{'joe': 0}, null, {'mark': null, 'cap': 8}, {}]
+
+will have the following representation: ::
+
+    * Length: 4, Null count: 1
+    * Null bitmap buffer:
+
+      | Byte 0 (validity bitmap) | Bytes 1-63            |
+      |--------------------------|-----------------------|
+      | 00001101                 | 0 (padding)           |
+
+    * Offsets buffer (int32):
+
+      | Bytes 0-19     |
+      |----------------|
+      | 0, 1, 1, 3, 3  |
+
+    * 'pairs' array (`Struct<String, Int32>`):
+      * Length: 3, Null count: 0
+      * Null bitmap buffer: Not required
+
+      * 'keys' array (`String`):
+        * Length: 3, Null count: 0
+        * Null bitmap buffer: Not required
+        * Offsets buffer (int32):
+
+          | Bytes 0-15   |
+          |--------------|
+          | 0, 3, 7, 10  |
+
+        * Values buffer:
+
+          | Bytes 0-9      |
+          |----------------|
+          | joemarkcap     |
+
+      * 'items' array (`Int32`):
+        * Length: 3, Null count: 1
+        * Null bitmap buffer:
+
+          | Byte 0 (validity bitmap) | Bytes 1-63            |
+          |--------------------------|-----------------------|
+          | 00000101                 | 0 (padding)           |
+
+        * Values buffer (int32):
+
+          | Bytes 0-3   | Bytes 4-7   | Bytes 8-11  |
+          |-------------|-------------|-------------|
+          |  0          | unspecified |  8          |
+
+
 Dense union type
 ----------------
 
diff --git a/integration/integration_test.py b/integration/integration_test.py
index 54e9487..cb0501d 100644
--- a/integration/integration_test.py
+++ b/integration/integration_test.py
@@ -705,6 +705,66 @@ class ListColumn(Column):
         return [self.values.get_json()]
 
 
+class MapType(DataType):
+    # Map field for generated test data: one child struct of key/item pairs.
+    def __init__(self, name, key_type, item_type, nullable=True,
+                 keysSorted=False):
+        super(MapType, self).__init__(name, nullable=nullable)
+        # Keys must be non-nullable; each pair is a non-nullable struct.
+        assert not key_type.nullable
+        self.key_type = key_type
+        self.item_type = item_type
+        self.pair_type = StructType('item', [key_type, item_type], False)
+        self.keysSorted = keysSorted
+
+    def _get_type(self):  # type metadata: name 'map' plus keysSorted flag
+        return OrderedDict([
+            ('name', 'map'),
+            ('keysSorted', self.keysSorted)
+        ])
+
+    def _get_children(self):  # single child: the pair struct's JSON
+        return [self.pair_type.get_json()]
+
+    def generate_column(self, size, name=None):
+        MAX_MAP_SIZE = 4  # upper bound (inclusive) on pairs per map slot
+
+        is_valid = self._make_is_valid(size)
+        map_sizes = np.random.randint(0, MAX_MAP_SIZE + 1, size=size)
+        offsets = [0]  # ends up with size + 1 entries
+        # Slots whose is_valid entry is false add no pairs: offset repeats.
+        offset = 0
+        for i in range(size):
+            if is_valid[i]:
+                offset += int(map_sizes[i])
+            offsets.append(offset)
+
+        # offset is now the total number of pairs, i.e. the child length
+        pairs = self.pair_type.generate_column(offset)
+        if name is None:
+            name = self.name
+
+        return MapColumn(name, size, is_valid, offsets, pairs)
+
+
+class MapColumn(Column):
+    # Column of map values: validity flags, offsets, and a pairs child.
+    def __init__(self, name, count, is_valid, offsets, pairs):
+        super(MapColumn, self).__init__(name, count)
+        self.is_valid = is_valid
+        self.offsets = offsets
+        self.pairs = pairs
+
+    def _get_buffers(self):  # validity as ints, offsets as a plain list
+        return [
+            ('VALIDITY', [int(v) for v in self.is_valid]),
+            ('OFFSET', list(self.offsets))
+        ]
+
+    def _get_children(self):  # the pairs column is the only child
+        return [self.pairs.get_json()]
+
+
 class StructType(DataType):
 
     def __init__(self, name, field_types, nullable=True):
@@ -957,6 +1017,18 @@ def generate_interval_case():
     return _generate_file("interval", fields, batch_sizes)
 
 
+def generate_map_case():
+    # TODO(bkietz): kept separate from generate_nested_case so it can be
+    # skipped independently; consolidate once Java supports map
+    fields = [
+        MapType('map_nullable', get_field('key', 'utf8', False),
+                get_field('item', 'int32')),
+    ]
+    # batch_sizes goes to _generate_file (presumably rows per batch - verify)
+    batch_sizes = [7, 10]
+    return _generate_file("map", fields, batch_sizes)
+
+
 def generate_nested_case():
     fields = [
         ListType('list_nullable', get_field('item', 'int32')),
@@ -1035,6 +1107,7 @@ def get_generated_json_files(tempdir=None, flight=False):
         generate_decimal_case(),
         generate_datetime_case(),
         generate_interval_case(),
+        generate_map_case(),
         generate_nested_case(),
         generate_dictionary_case().skip_category(SKIP_FLIGHT),
         generate_nested_dictionary_case().skip_category(SKIP_ARROW)
@@ -1104,10 +1177,18 @@ class IntegrationRunner(object):
 
             file_id = guid()[:8]
 
+            if (('JS' in (producer.name, consumer.name) or
+                    'Java' in (producer.name, consumer.name)) and
+                    "map" in test_case.name):
+                print('TODO(ARROW-1279): Enable map tests ' +
+                      ' for Java and JS once Java supports them and JS\'' +
+                      ' are unbroken')
+                continue
+
             if ('JS' in (producer.name, consumer.name) and
                     "interval" in test_case.name):
                 print('TODO(ARROW-5239): Enable interval tests ' +
-                      ' for JS once, JS supports them')
+                      ' for JS once JS supports them')
                 continue
 
             # Make the random access file
@@ -1166,6 +1247,13 @@ class IntegrationRunner(object):
             print('Testing file {0}'.format(json_path))
             print('=' * 58)
 
+            if ('Java' in (producer.name, consumer.name) and
+               "map" in test_case.name):
+                print('TODO(ARROW-1279): Enable map tests ' +
+                      ' for Java and JS once Java supports them and JS\'' +
+                      ' are unbroken')
+                continue
+
             if SKIP_FLIGHT in test_case.skip:
                 print('-- Skipping test')
                 continue
@@ -1503,6 +1591,7 @@ def run_all_tests(args):
 
 
 def write_js_test_json(directory):
+    generate_map_case().write(os.path.join(directory, 'map.json'))
     generate_nested_case().write(os.path.join(directory, 'nested.json'))
     generate_decimal_case().write(os.path.join(directory, 'decimal.json'))
     generate_datetime_case().write(os.path.join(directory, 'datetime.json'))