Posted to commits@arrow.apache.org by ap...@apache.org on 2019/08/12 12:06:15 UTC

[arrow] branch master updated: ARROW-5977: [C++] [Python] Allow specifying which columns to include

This is an automated email from the ASF dual-hosted git repository.

apitrou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 93688e8  ARROW-5977: [C++] [Python] Allow specifying which columns to include
93688e8 is described below

commit 93688e8c1fa2f22d46394c548a9edbd3d2d7c62d
Author: Antoine Pitrou <an...@python.org>
AuthorDate: Mon Aug 12 14:05:54 2019 +0200

    ARROW-5977: [C++] [Python] Allow specifying which columns to include
    
    If the `include_columns` option is not empty (a usage sketch follows below):
    - columns are included in the resulting table in that order
    - CSV columns not in `include_columns` are ignored
    - columns in `include_columns` but not in the CSV file produce a column of
      nulls if `include_missing_columns` is true, and error out otherwise
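    
    A minimal usage sketch from Python (the file name `example.csv` is
    hypothetical; assumes a pyarrow build that includes this change):
    
        from pyarrow import csv
    
        opts = csv.ConvertOptions(include_columns=['cd', 'ab', 'xx'],
                                  include_missing_columns=True)
        # 'xx' is not in the file, so it comes back as a column of nulls
        table = csv.read_csv('example.csv', convert_options=opts)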
    
    Closes #5026 from pitrou/ARROW-5977-csv-include-columns and squashes the following commits:
    
    0f198ef0b <Antoine Pitrou> ARROW-5977:   Allow specifying which columns to include
    
    Authored-by: Antoine Pitrou <an...@python.org>
    Signed-off-by: Antoine Pitrou <an...@python.org>
---
 cpp/src/arrow/csv/column-builder-test.cc | 135 ++++++++++++++++++++++++++-----
 cpp/src/arrow/csv/column-builder.cc      |  92 +++++++++++++++++++--
 cpp/src/arrow/csv/column-builder.h       |  12 ++-
 cpp/src/arrow/csv/options.h              |  13 +++
 cpp/src/arrow/csv/parser.cc              |   6 +-
 cpp/src/arrow/csv/reader.cc              | 118 ++++++++++++++++++++-------
 cpp/src/arrow/csv/reader.h               |   1 -
 cpp/src/arrow/table.cc                   |  15 +++-
 python/pyarrow/_csv.pyx                  |  54 ++++++++++++-
 python/pyarrow/error.pxi                 |   4 +-
 python/pyarrow/includes/libarrow.pxd     |   2 +
 python/pyarrow/table.pxi                 |   7 ++
 python/pyarrow/tests/test_csv.py         | 104 +++++++++++++++++++++++-
 python/pyarrow/tests/test_table.py       |   4 +
 14 files changed, 498 insertions(+), 69 deletions(-)

diff --git a/cpp/src/arrow/csv/column-builder-test.cc b/cpp/src/arrow/csv/column-builder-test.cc
index f2c39aa..62e8d06 100644
--- a/cpp/src/arrow/csv/column-builder-test.cc
+++ b/cpp/src/arrow/csv/column-builder-test.cc
@@ -24,6 +24,7 @@
 #include "arrow/csv/column-builder.h"
 #include "arrow/csv/options.h"
 #include "arrow/csv/test-common.h"
+#include "arrow/memory_pool.h"
 #include "arrow/table.h"
 #include "arrow/testing/util.h"
 #include "arrow/type.h"
@@ -48,6 +49,74 @@ void AssertBuilding(const std::shared_ptr<ColumnBuilder>& builder,
   }
   ASSERT_OK(builder->task_group()->Finish());
   ASSERT_OK(builder->Finish(out));
+  ASSERT_OK((*out)->Validate());
+}
+
+//////////////////////////////////////////////////////////////////////////
+// Tests for null column builder
+
+TEST(NullColumnBuilder, Empty) {
+  std::shared_ptr<DataType> type = null();
+  auto tg = TaskGroup::MakeSerial();
+
+  std::shared_ptr<ColumnBuilder> builder;
+  ASSERT_OK(ColumnBuilder::MakeNull(default_memory_pool(), type, tg, &builder));
+
+  std::shared_ptr<ChunkedArray> actual;
+  AssertBuilding(builder, {}, &actual);
+
+  ChunkedArray expected({}, type);
+  AssertChunkedEqual(*actual, expected);
+}
+
+TEST(NullColumnBuilder, InsertNull) {
+  // Building a column of nulls with type null()
+  std::shared_ptr<DataType> type = null();
+  auto tg = TaskGroup::MakeSerial();
+
+  std::shared_ptr<ColumnBuilder> builder;
+  ASSERT_OK(ColumnBuilder::MakeNull(default_memory_pool(), type, tg, &builder));
+
+  std::shared_ptr<BlockParser> parser;
+  std::shared_ptr<ChunkedArray> actual, expected;
+  // The values themselves don't matter; only the number of rows is used
+  MakeColumnParser({"456", "789"}, &parser);
+  builder->Insert(1, parser);
+  MakeColumnParser({"123"}, &parser);
+  builder->Insert(0, parser);
+  ASSERT_OK(builder->task_group()->Finish());
+  ASSERT_OK(builder->Finish(&actual));
+  ASSERT_OK(actual->Validate());
+
+  auto chunks =
+      ArrayVector{std::make_shared<NullArray>(1), std::make_shared<NullArray>(2)};
+  expected = std::make_shared<ChunkedArray>(chunks);
+  AssertChunkedEqual(*actual, *expected);
+}
+
+TEST(NullColumnBuilder, InsertTyped) {
+  // Building a column of nulls with another type
+  std::shared_ptr<DataType> type = int16();
+  auto tg = TaskGroup::MakeSerial();
+
+  std::shared_ptr<ColumnBuilder> builder;
+  ASSERT_OK(ColumnBuilder::MakeNull(default_memory_pool(), type, tg, &builder));
+
+  std::shared_ptr<BlockParser> parser;
+  std::shared_ptr<ChunkedArray> actual, expected;
+  // The values themselves don't matter; only the number of rows is used
+  MakeColumnParser({"abc", "def", "ghi"}, &parser);
+  builder->Insert(1, parser);
+  MakeColumnParser({"jkl"}, &parser);
+  builder->Insert(0, parser);
+  ASSERT_OK(builder->task_group()->Finish());
+  ASSERT_OK(builder->Finish(&actual));
+  ASSERT_OK(actual->Validate());
+
+  auto chunks = ArrayVector{ArrayFromJSON(type, "[null]"),
+                            ArrayFromJSON(type, "[null, null, null]")};
+  expected = std::make_shared<ChunkedArray>(chunks);
+  AssertChunkedEqual(*actual, *expected);
 }
 
 //////////////////////////////////////////////////////////////////////////
@@ -56,7 +125,8 @@ void AssertBuilding(const std::shared_ptr<ColumnBuilder>& builder,
 TEST(ColumnBuilder, Empty) {
   auto tg = TaskGroup::MakeSerial();
   std::shared_ptr<ColumnBuilder> builder;
-  ASSERT_OK(ColumnBuilder::Make(int32(), 0, ConvertOptions::Defaults(), tg, &builder));
+  ASSERT_OK(ColumnBuilder::Make(default_memory_pool(), int32(), 0,
+                                ConvertOptions::Defaults(), tg, &builder));
 
   std::shared_ptr<ChunkedArray> actual;
   AssertBuilding(builder, {}, &actual);
@@ -68,7 +138,8 @@ TEST(ColumnBuilder, Empty) {
 TEST(ColumnBuilder, Basics) {
   auto tg = TaskGroup::MakeSerial();
   std::shared_ptr<ColumnBuilder> builder;
-  ASSERT_OK(ColumnBuilder::Make(int32(), 0, ConvertOptions::Defaults(), tg, &builder));
+  ASSERT_OK(ColumnBuilder::Make(default_memory_pool(), int32(), 0,
+                                ConvertOptions::Defaults(), tg, &builder));
 
   std::shared_ptr<ChunkedArray> actual;
   AssertBuilding(builder, {{"123", "-456"}}, &actual);
@@ -82,7 +153,8 @@ TEST(ColumnBuilder, Insert) {
   // Test ColumnBuilder::Insert()
   auto tg = TaskGroup::MakeSerial();
   std::shared_ptr<ColumnBuilder> builder;
-  ASSERT_OK(ColumnBuilder::Make(int32(), 0, ConvertOptions::Defaults(), tg, &builder));
+  ASSERT_OK(ColumnBuilder::Make(default_memory_pool(), int32(), 0,
+                                ConvertOptions::Defaults(), tg, &builder));
 
   std::shared_ptr<BlockParser> parser;
   std::shared_ptr<ChunkedArray> actual, expected;
@@ -92,6 +164,7 @@ TEST(ColumnBuilder, Insert) {
   builder->Insert(0, parser);
   ASSERT_OK(builder->task_group()->Finish());
   ASSERT_OK(builder->Finish(&actual));
+  ASSERT_OK(actual->Validate());
 
   ChunkedArrayFromVector<Int32Type>({{123}, {456}}, &expected);
   AssertChunkedEqual(*actual, *expected);
@@ -100,7 +173,8 @@ TEST(ColumnBuilder, Insert) {
 TEST(ColumnBuilder, MultipleChunks) {
   auto tg = TaskGroup::MakeSerial();
   std::shared_ptr<ColumnBuilder> builder;
-  ASSERT_OK(ColumnBuilder::Make(int32(), 0, ConvertOptions::Defaults(), tg, &builder));
+  ASSERT_OK(ColumnBuilder::Make(default_memory_pool(), int32(), 0,
+                                ConvertOptions::Defaults(), tg, &builder));
 
   std::shared_ptr<ChunkedArray> actual;
   AssertBuilding(builder, {{"1", "2", "3"}, {"4", "5"}}, &actual);
@@ -113,7 +187,8 @@ TEST(ColumnBuilder, MultipleChunks) {
 TEST(ColumnBuilder, MultipleChunksParallel) {
   auto tg = TaskGroup::MakeThreaded(GetCpuThreadPool());
   std::shared_ptr<ColumnBuilder> builder;
-  ASSERT_OK(ColumnBuilder::Make(int32(), 0, ConvertOptions::Defaults(), tg, &builder));
+  ASSERT_OK(ColumnBuilder::Make(default_memory_pool(), int32(), 0,
+                                ConvertOptions::Defaults(), tg, &builder));
 
   std::shared_ptr<ChunkedArray> actual;
   AssertBuilding(builder, {{"1", "2"}, {"3"}, {"4", "5"}, {"6", "7"}}, &actual);
@@ -129,7 +204,8 @@ TEST(ColumnBuilder, MultipleChunksParallel) {
 TEST(InferringColumnBuilder, Empty) {
   auto tg = TaskGroup::MakeSerial();
   std::shared_ptr<ColumnBuilder> builder;
-  ASSERT_OK(ColumnBuilder::Make(0, ConvertOptions::Defaults(), tg, &builder));
+  ASSERT_OK(ColumnBuilder::Make(default_memory_pool(), 0, ConvertOptions::Defaults(), tg,
+                                &builder));
 
   std::shared_ptr<ChunkedArray> actual;
   AssertBuilding(builder, {}, &actual);
@@ -141,7 +217,8 @@ TEST(InferringColumnBuilder, Empty) {
 TEST(InferringColumnBuilder, SingleChunkNull) {
   auto tg = TaskGroup::MakeSerial();
   std::shared_ptr<ColumnBuilder> builder;
-  ASSERT_OK(ColumnBuilder::Make(0, ConvertOptions::Defaults(), tg, &builder));
+  ASSERT_OK(ColumnBuilder::Make(default_memory_pool(), 0, ConvertOptions::Defaults(), tg,
+                                &builder));
 
   std::shared_ptr<ChunkedArray> actual;
   AssertBuilding(builder, {{"", "NA"}}, &actual);
@@ -153,7 +230,8 @@ TEST(InferringColumnBuilder, SingleChunkNull) {
 TEST(InferringColumnBuilder, MultipleChunkNull) {
   auto tg = TaskGroup::MakeSerial();
   std::shared_ptr<ColumnBuilder> builder;
-  ASSERT_OK(ColumnBuilder::Make(0, ConvertOptions::Defaults(), tg, &builder));
+  ASSERT_OK(ColumnBuilder::Make(default_memory_pool(), 0, ConvertOptions::Defaults(), tg,
+                                &builder));
 
   std::shared_ptr<ChunkedArray> actual;
   AssertBuilding(builder, {{"", "NA"}, {""}, {"NaN"}}, &actual);
@@ -165,7 +243,8 @@ TEST(InferringColumnBuilder, MultipleChunkNull) {
 TEST(InferringColumnBuilder, SingleChunkInteger) {
   auto tg = TaskGroup::MakeSerial();
   std::shared_ptr<ColumnBuilder> builder;
-  ASSERT_OK(ColumnBuilder::Make(0, ConvertOptions::Defaults(), tg, &builder));
+  ASSERT_OK(ColumnBuilder::Make(default_memory_pool(), 0, ConvertOptions::Defaults(), tg,
+                                &builder));
 
   std::shared_ptr<ChunkedArray> actual;
   AssertBuilding(builder, {{"", "123", "456"}}, &actual);
@@ -178,7 +257,8 @@ TEST(InferringColumnBuilder, SingleChunkInteger) {
 TEST(InferringColumnBuilder, MultipleChunkInteger) {
   auto tg = TaskGroup::MakeSerial();
   std::shared_ptr<ColumnBuilder> builder;
-  ASSERT_OK(ColumnBuilder::Make(0, ConvertOptions::Defaults(), tg, &builder));
+  ASSERT_OK(ColumnBuilder::Make(default_memory_pool(), 0, ConvertOptions::Defaults(), tg,
+                                &builder));
 
   std::shared_ptr<ChunkedArray> actual;
   AssertBuilding(builder, {{""}, {"NA", "123", "456"}}, &actual);
@@ -192,7 +272,8 @@ TEST(InferringColumnBuilder, MultipleChunkInteger) {
 TEST(InferringColumnBuilder, SingleChunkBoolean) {
   auto tg = TaskGroup::MakeSerial();
   std::shared_ptr<ColumnBuilder> builder;
-  ASSERT_OK(ColumnBuilder::Make(0, ConvertOptions::Defaults(), tg, &builder));
+  ASSERT_OK(ColumnBuilder::Make(default_memory_pool(), 0, ConvertOptions::Defaults(), tg,
+                                &builder));
 
   std::shared_ptr<ChunkedArray> actual;
   AssertBuilding(builder, {{"", "0", "FALSE"}}, &actual);
@@ -206,7 +287,8 @@ TEST(InferringColumnBuilder, SingleChunkBoolean) {
 TEST(InferringColumnBuilder, MultipleChunkBoolean) {
   auto tg = TaskGroup::MakeSerial();
   std::shared_ptr<ColumnBuilder> builder;
-  ASSERT_OK(ColumnBuilder::Make(0, ConvertOptions::Defaults(), tg, &builder));
+  ASSERT_OK(ColumnBuilder::Make(default_memory_pool(), 0, ConvertOptions::Defaults(), tg,
+                                &builder));
 
   std::shared_ptr<ChunkedArray> actual;
   AssertBuilding(builder, {{""}, {"1", "True", "0"}}, &actual);
@@ -220,7 +302,8 @@ TEST(InferringColumnBuilder, MultipleChunkBoolean) {
 TEST(InferringColumnBuilder, SingleChunkReal) {
   auto tg = TaskGroup::MakeSerial();
   std::shared_ptr<ColumnBuilder> builder;
-  ASSERT_OK(ColumnBuilder::Make(0, ConvertOptions::Defaults(), tg, &builder));
+  ASSERT_OK(ColumnBuilder::Make(default_memory_pool(), 0, ConvertOptions::Defaults(), tg,
+                                &builder));
 
   std::shared_ptr<ChunkedArray> actual;
   AssertBuilding(builder, {{"", "0.0", "12.5"}}, &actual);
@@ -234,7 +317,8 @@ TEST(InferringColumnBuilder, SingleChunkReal) {
 TEST(InferringColumnBuilder, MultipleChunkReal) {
   auto tg = TaskGroup::MakeSerial();
   std::shared_ptr<ColumnBuilder> builder;
-  ASSERT_OK(ColumnBuilder::Make(0, ConvertOptions::Defaults(), tg, &builder));
+  ASSERT_OK(ColumnBuilder::Make(default_memory_pool(), 0, ConvertOptions::Defaults(), tg,
+                                &builder));
 
   std::shared_ptr<ChunkedArray> actual;
   AssertBuilding(builder, {{""}, {"008"}, {"NaN", "12.5"}}, &actual);
@@ -248,7 +332,8 @@ TEST(InferringColumnBuilder, MultipleChunkReal) {
 TEST(InferringColumnBuilder, SingleChunkTimestamp) {
   auto tg = TaskGroup::MakeSerial();
   std::shared_ptr<ColumnBuilder> builder;
-  ASSERT_OK(ColumnBuilder::Make(0, ConvertOptions::Defaults(), tg, &builder));
+  ASSERT_OK(ColumnBuilder::Make(default_memory_pool(), 0, ConvertOptions::Defaults(), tg,
+                                &builder));
 
   std::shared_ptr<ChunkedArray> actual;
   AssertBuilding(builder, {{"", "1970-01-01", "2018-11-13 17:11:10"}}, &actual);
@@ -263,7 +348,8 @@ TEST(InferringColumnBuilder, SingleChunkTimestamp) {
 TEST(InferringColumnBuilder, MultipleChunkTimestamp) {
   auto tg = TaskGroup::MakeSerial();
   std::shared_ptr<ColumnBuilder> builder;
-  ASSERT_OK(ColumnBuilder::Make(0, ConvertOptions::Defaults(), tg, &builder));
+  ASSERT_OK(ColumnBuilder::Make(default_memory_pool(), 0, ConvertOptions::Defaults(), tg,
+                                &builder));
 
   std::shared_ptr<ChunkedArray> actual;
   AssertBuilding(builder, {{""}, {"1970-01-01"}, {"2018-11-13 17:11:10"}}, &actual);
@@ -282,7 +368,8 @@ TEST(InferringColumnBuilder, SingleChunkString) {
   std::shared_ptr<ChunkedArray> expected;
 
   // With valid UTF8
-  ASSERT_OK(ColumnBuilder::Make(0, ConvertOptions::Defaults(), tg, &builder));
+  ASSERT_OK(ColumnBuilder::Make(default_memory_pool(), 0, ConvertOptions::Defaults(), tg,
+                                &builder));
   AssertBuilding(builder, {{"", "foo", "baré"}}, &actual);
 
   ChunkedArrayFromVector<StringType, std::string>({{true, true, true}},
@@ -293,7 +380,7 @@ TEST(InferringColumnBuilder, SingleChunkString) {
   auto options = ConvertOptions::Defaults();
   options.check_utf8 = false;
   tg = TaskGroup::MakeSerial();
-  ASSERT_OK(ColumnBuilder::Make(0, options, tg, &builder));
+  ASSERT_OK(ColumnBuilder::Make(default_memory_pool(), 0, options, tg, &builder));
   AssertBuilding(builder, {{"", "foo\xff", "baré"}}, &actual);
 
   ChunkedArrayFromVector<StringType, std::string>({{true, true, true}},
@@ -308,7 +395,8 @@ TEST(InferringColumnBuilder, SingleChunkBinary) {
   std::shared_ptr<ChunkedArray> expected;
 
   // With invalid UTF8, checking
-  ASSERT_OK(ColumnBuilder::Make(0, ConvertOptions::Defaults(), tg, &builder));
+  ASSERT_OK(ColumnBuilder::Make(default_memory_pool(), 0, ConvertOptions::Defaults(), tg,
+                                &builder));
   AssertBuilding(builder, {{"", "foo\xff", "baré"}}, &actual);
 
   ChunkedArrayFromVector<BinaryType, std::string>({{true, true, true}},
@@ -319,7 +407,8 @@ TEST(InferringColumnBuilder, SingleChunkBinary) {
 TEST(InferringColumnBuilder, MultipleChunkString) {
   auto tg = TaskGroup::MakeSerial();
   std::shared_ptr<ColumnBuilder> builder;
-  ASSERT_OK(ColumnBuilder::Make(0, ConvertOptions::Defaults(), tg, &builder));
+  ASSERT_OK(ColumnBuilder::Make(default_memory_pool(), 0, ConvertOptions::Defaults(), tg,
+                                &builder));
 
   std::shared_ptr<ChunkedArray> actual;
   AssertBuilding(builder, {{""}, {"008"}, {"NaN", "baré"}}, &actual);
@@ -333,7 +422,8 @@ TEST(InferringColumnBuilder, MultipleChunkString) {
 TEST(InferringColumnBuilder, MultipleChunkBinary) {
   auto tg = TaskGroup::MakeSerial();
   std::shared_ptr<ColumnBuilder> builder;
-  ASSERT_OK(ColumnBuilder::Make(0, ConvertOptions::Defaults(), tg, &builder));
+  ASSERT_OK(ColumnBuilder::Make(default_memory_pool(), 0, ConvertOptions::Defaults(), tg,
+                                &builder));
 
   std::shared_ptr<ChunkedArray> actual;
   AssertBuilding(builder, {{""}, {"008"}, {"NaN", "baré\xff"}}, &actual);
@@ -350,7 +440,8 @@ TEST(InferringColumnBuilder, MultipleChunkBinary) {
 TEST(InferringColumnBuilder, MultipleChunkIntegerParallel) {
   auto tg = TaskGroup::MakeThreaded(GetCpuThreadPool());
   std::shared_ptr<ColumnBuilder> builder;
-  ASSERT_OK(ColumnBuilder::Make(0, ConvertOptions::Defaults(), tg, &builder));
+  ASSERT_OK(ColumnBuilder::Make(default_memory_pool(), 0, ConvertOptions::Defaults(), tg,
+                                &builder));
 
   std::shared_ptr<ChunkedArray> actual;
   AssertBuilding(builder, {{"1", "2"}, {"3"}, {"4", "5"}, {"6", "7"}}, &actual);
diff --git a/cpp/src/arrow/csv/column-builder.cc b/cpp/src/arrow/csv/column-builder.cc
index cfc36fe..eff0088 100644
--- a/cpp/src/arrow/csv/column-builder.cc
+++ b/cpp/src/arrow/csv/column-builder.cc
@@ -25,9 +25,11 @@
 #include <vector>
 
 #include "arrow/array.h"
+#include "arrow/builder.h"
 #include "arrow/csv/column-builder.h"
 #include "arrow/csv/converter.h"
 #include "arrow/csv/options.h"
+#include "arrow/csv/parser.h"
 #include "arrow/memory_pool.h"
 #include "arrow/status.h"
 #include "arrow/table.h"
@@ -51,6 +53,73 @@ void ColumnBuilder::Append(const std::shared_ptr<BlockParser>& parser) {
 }
 
 //////////////////////////////////////////////////////////////////////////
+// Null column builder implementation (for a column not in the CSV file)
+
+class NullColumnBuilder : public ColumnBuilder {
+ public:
+  explicit NullColumnBuilder(const std::shared_ptr<DataType>& type, MemoryPool* pool,
+                             const std::shared_ptr<internal::TaskGroup>& task_group)
+      : ColumnBuilder(task_group), type_(type), pool_(pool) {}
+
+  Status Init();
+
+  void Insert(int64_t block_index, const std::shared_ptr<BlockParser>& parser) override;
+  Status Finish(std::shared_ptr<ChunkedArray>* out) override;
+
+  // While NullColumnBuilder is so cheap that it doesn't need parallelization
+  // in itself, the CSV reader doesn't know this and can still call it from
+  // multiple threads, so use a mutex anyway.
+  std::mutex mutex_;
+
+  std::shared_ptr<DataType> type_;
+  MemoryPool* pool_;
+  std::unique_ptr<ArrayBuilder> builder_;
+};
+
+Status NullColumnBuilder::Init() { return MakeBuilder(pool_, type_, &builder_); }
+
+void NullColumnBuilder::Insert(int64_t block_index,
+                               const std::shared_ptr<BlockParser>& parser) {
+  // Create a null Array pointer at the back of the list.
+  size_t chunk_index = static_cast<size_t>(block_index);
+  {
+    std::lock_guard<std::mutex> lock(mutex_);
+    if (chunks_.size() <= chunk_index) {
+      chunks_.resize(chunk_index + 1);
+    }
+  }
+
+  // Spawn a task that will build an array of nulls with the right DataType
+  const int32_t num_rows = parser->num_rows();
+  DCHECK_GE(num_rows, 0);
+
+  task_group_->Append([=]() -> Status {
+    std::shared_ptr<Array> res;
+    RETURN_NOT_OK(builder_->AppendNulls(num_rows));
+    RETURN_NOT_OK(builder_->Finish(&res));
+
+    std::lock_guard<std::mutex> lock(mutex_);
+    // Should not insert an already built chunk
+    DCHECK_EQ(chunks_[chunk_index], nullptr);
+    chunks_[chunk_index] = std::move(res);
+    return Status::OK();
+  });
+}
+
+Status NullColumnBuilder::Finish(std::shared_ptr<ChunkedArray>* out) {
+  // The lock is unnecessary if all tasks have finished
+  std::lock_guard<std::mutex> lock(mutex_);
+
+  for (const auto& chunk : chunks_) {
+    if (chunk == nullptr) {
+      return Status::Invalid("a chunk failed to allocate for an unknown reason");
+    }
+  }
+  *out = std::make_shared<ChunkedArray>(chunks_, type_);
+  return Status::OK();
+}
+
+//////////////////////////////////////////////////////////////////////////
 // Pre-typed column builder implementation
 
 class TypedColumnBuilder : public ColumnBuilder {
@@ -355,28 +424,37 @@ Status InferringColumnBuilder::Finish(std::shared_ptr<ChunkedArray>* out) {
 //////////////////////////////////////////////////////////////////////////
 // Factory functions
 
-Status ColumnBuilder::Make(const std::shared_ptr<DataType>& type, int32_t col_index,
-                           const ConvertOptions& options,
+Status ColumnBuilder::Make(MemoryPool* pool, const std::shared_ptr<DataType>& type,
+                           int32_t col_index, const ConvertOptions& options,
                            const std::shared_ptr<TaskGroup>& task_group,
                            std::shared_ptr<ColumnBuilder>* out) {
-  auto ptr =
-      new TypedColumnBuilder(type, col_index, options, default_memory_pool(), task_group);
+  auto ptr = new TypedColumnBuilder(type, col_index, options, pool, task_group);
   auto res = std::shared_ptr<ColumnBuilder>(ptr);
   RETURN_NOT_OK(ptr->Init());
   *out = res;
   return Status::OK();
 }
 
-Status ColumnBuilder::Make(int32_t col_index, const ConvertOptions& options,
+Status ColumnBuilder::Make(MemoryPool* pool, int32_t col_index,
+                           const ConvertOptions& options,
                            const std::shared_ptr<TaskGroup>& task_group,
                            std::shared_ptr<ColumnBuilder>* out) {
-  auto ptr =
-      new InferringColumnBuilder(col_index, options, default_memory_pool(), task_group);
+  // XXX
+  auto ptr = new InferringColumnBuilder(col_index, options, pool, task_group);
   auto res = std::shared_ptr<ColumnBuilder>(ptr);
   RETURN_NOT_OK(ptr->Init());
   *out = res;
   return Status::OK();
 }
 
+Status ColumnBuilder::MakeNull(MemoryPool* pool, const std::shared_ptr<DataType>& type,
+                               const std::shared_ptr<internal::TaskGroup>& task_group,
+                               std::shared_ptr<ColumnBuilder>* out) {
+  auto res = std::make_shared<NullColumnBuilder>(type, pool, task_group);
+  RETURN_NOT_OK(res->Init());
+  *out = std::move(res);
+  return Status::OK();
+}
+
 }  // namespace csv
 }  // namespace arrow
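
The chunk bookkeeping in NullColumnBuilder::Insert() above follows a simple
slot-reservation pattern: each block reserves the slot for its index under a
lock, then a task fills that slot, so blocks may finish out of order. A
minimal Python sketch of the same pattern (illustrative only, not part of
this commit):

    import threading

    class NullChunksSketch:
        """Illustrative stand-in for NullColumnBuilder's chunk bookkeeping."""

        def __init__(self):
            self._lock = threading.Lock()
            self._chunks = []

        def insert(self, block_index, num_rows):
            # Reserve a slot for this block under the lock
            with self._lock:
                if len(self._chunks) <= block_index:
                    self._chunks.extend(
                        [None] * (block_index + 1 - len(self._chunks)))
            # In Arrow, the rest runs as a task on the builder's task group
            chunk = [None] * num_rows
            with self._lock:
                assert self._chunks[block_index] is None
                self._chunks[block_index] = chunk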
diff --git a/cpp/src/arrow/csv/column-builder.h b/cpp/src/arrow/csv/column-builder.h
index 054a642..789e7b9 100644
--- a/cpp/src/arrow/csv/column-builder.h
+++ b/cpp/src/arrow/csv/column-builder.h
@@ -63,16 +63,22 @@ class ARROW_EXPORT ColumnBuilder {
   std::shared_ptr<internal::TaskGroup> task_group() { return task_group_; }
 
   /// Construct a strictly-typed ColumnBuilder.
-  static Status Make(const std::shared_ptr<DataType>& type, int32_t col_index,
-                     const ConvertOptions& options,
+  static Status Make(MemoryPool* pool, const std::shared_ptr<DataType>& type,
+                     int32_t col_index, const ConvertOptions& options,
                      const std::shared_ptr<internal::TaskGroup>& task_group,
                      std::shared_ptr<ColumnBuilder>* out);
 
   /// Construct a type-inferring ColumnBuilder.
-  static Status Make(int32_t col_index, const ConvertOptions& options,
+  static Status Make(MemoryPool* pool, int32_t col_index, const ConvertOptions& options,
                      const std::shared_ptr<internal::TaskGroup>& task_group,
                      std::shared_ptr<ColumnBuilder>* out);
 
+  /// Construct a ColumnBuilder for a column of nulls
+  /// (i.e. not present in the CSV file).
+  static Status MakeNull(MemoryPool* pool, const std::shared_ptr<DataType>& type,
+                         const std::shared_ptr<internal::TaskGroup>& task_group,
+                         std::shared_ptr<ColumnBuilder>* out);
+
  protected:
   explicit ColumnBuilder(const std::shared_ptr<internal::TaskGroup>& task_group)
       : task_group_(task_group) {}
diff --git a/cpp/src/arrow/csv/options.h b/cpp/src/arrow/csv/options.h
index 21d0ab2..846c2b8 100644
--- a/cpp/src/arrow/csv/options.h
+++ b/cpp/src/arrow/csv/options.h
@@ -73,6 +73,19 @@ struct ARROW_EXPORT ConvertOptions {
   // If false, then all strings are valid string values.
   bool strings_can_be_null = false;
 
+  // XXX Should we have a separate FilterOptions?
+
+  // If non-empty, indicates the names of columns from the CSV file that
+  // should actually be read and converted (in this vector's order).
+  // Columns not in this vector will be ignored.
+  std::vector<std::string> include_columns;
+  // If false, a column in `include_columns` but not in the CSV file errors out.
+  // If true, a column in `include_columns` but not in the CSV file produces
+  // a column of nulls (whose type is selected using `column_types`,
+  // or null by default).
+  // This option is ignored if `include_columns` is empty.
+  bool include_missing_columns = false;
+
   static ConvertOptions Defaults();
 };
 
diff --git a/cpp/src/arrow/csv/parser.cc b/cpp/src/arrow/csv/parser.cc
index d6454ed..8d085dc 100644
--- a/cpp/src/arrow/csv/parser.cc
+++ b/cpp/src/arrow/csv/parser.cc
@@ -513,7 +513,11 @@ Status BlockParser::ParseFinal(const char* data, uint32_t size, uint32_t* out_si
 
 BlockParser::BlockParser(MemoryPool* pool, ParseOptions options, int32_t num_cols,
                          int32_t max_num_rows)
-    : pool_(pool), options_(options), num_cols_(num_cols), max_num_rows_(max_num_rows) {}
+    : pool_(pool),
+      options_(options),
+      num_rows_(-1),
+      num_cols_(num_cols),
+      max_num_rows_(max_num_rows) {}
 
 BlockParser::BlockParser(ParseOptions options, int32_t num_cols, int32_t max_num_rows)
     : BlockParser(default_memory_pool(), options, num_cols, max_num_rows) {}
diff --git a/cpp/src/arrow/csv/reader.cc b/cpp/src/arrow/csv/reader.cc
index 1ef2d39..1bccc9b 100644
--- a/cpp/src/arrow/csv/reader.cc
+++ b/cpp/src/arrow/csv/reader.cc
@@ -163,7 +163,7 @@ class BaseTableReader : public csv::TableReader {
 
     if (read_options_.column_names.empty()) {
       // Read one row with column names
-      BlockParser parser(pool_, parse_options_, num_cols_, 1);
+      BlockParser parser(pool_, parse_options_, num_csv_cols_, 1);
       uint32_t parsed_size = 0;
       RETURN_NOT_OK(parser.Parse(reinterpret_cast<const char*>(cur_data_),
                                  static_cast<uint32_t>(cur_size_), &parsed_size));
@@ -189,27 +189,88 @@ class BaseTableReader : public csv::TableReader {
       column_names_ = read_options_.column_names;
     }
 
-    num_cols_ = static_cast<int32_t>(column_names_.size());
-    DCHECK_GT(num_cols_, 0);
+    num_csv_cols_ = static_cast<int32_t>(column_names_.size());
+    DCHECK_GT(num_csv_cols_, 0);
 
-    // Construct column builders
-    for (int32_t col_index = 0; col_index < num_cols_; ++col_index) {
+    if (convert_options_.include_columns.empty()) {
+      return MakeColumnBuilders();
+    } else {
+      return MakeColumnBuilders(convert_options_.include_columns);
+    }
+  }
+
+  // Make column builders, assuming inclusion of all columns in CSV file order
+  Status MakeColumnBuilders() {
+    for (int32_t col_index = 0; col_index < num_csv_cols_; ++col_index) {
       std::shared_ptr<ColumnBuilder> builder;
-      // Does the named column have a fixed type?
-      auto it = convert_options_.column_types.find(column_names_[col_index]);
-      if (it == convert_options_.column_types.end()) {
-        RETURN_NOT_OK(
-            ColumnBuilder::Make(col_index, convert_options_, task_group_, &builder));
+      const auto& col_name = column_names_[col_index];
+
+      RETURN_NOT_OK(MakeCSVColumnBuilder(col_name, col_index, &builder));
+      column_builders_.push_back(builder);
+      builder_names_.push_back(col_name);
+    }
+    return Status::OK();
+  }
+
+  // Make column builders, assuming inclusion of columns in `include_columns` order
+  Status MakeColumnBuilders(const std::vector<std::string>& include_columns) {
+    // Compute indices of columns in the CSV file
+    std::unordered_map<std::string, int32_t> col_indices;
+    col_indices.reserve(column_names_.size());
+    for (int32_t i = 0; i < static_cast<int32_t>(column_names_.size()); ++i) {
+      col_indices.emplace(column_names_[i], i);
+    }
+
+    // For each column name in include_columns, build the corresponding ColumnBuilder
+    for (const auto& col_name : include_columns) {
+      std::shared_ptr<ColumnBuilder> builder;
+      auto it = col_indices.find(col_name);
+      if (it != col_indices.end()) {
+        auto col_index = it->second;
+        RETURN_NOT_OK(MakeCSVColumnBuilder(col_name, col_index, &builder));
       } else {
-        RETURN_NOT_OK(ColumnBuilder::Make(it->second, col_index, convert_options_,
-                                          task_group_, &builder));
+        // Column not in the CSV file
+        if (convert_options_.include_missing_columns) {
+          RETURN_NOT_OK(MakeNullColumnBuilder(col_name, &builder));
+        } else {
+          return Status::KeyError("Column '", col_name,
+                                  "' in include_columns "
+                                  "does not exist in CSV file");
+        }
       }
       column_builders_.push_back(builder);
+      builder_names_.push_back(col_name);
     }
-
     return Status::OK();
   }
 
+  // Make a column builder for the given CSV column name and index
+  Status MakeCSVColumnBuilder(const std::string& col_name, int32_t col_index,
+                              std::shared_ptr<ColumnBuilder>* out) {
+    // Does the named column have a fixed type?
+    auto it = convert_options_.column_types.find(col_name);
+    if (it == convert_options_.column_types.end()) {
+      return ColumnBuilder::Make(pool_, col_index, convert_options_, task_group_, out);
+    } else {
+      return ColumnBuilder::Make(pool_, it->second, col_index, convert_options_,
+                                 task_group_, out);
+    }
+  }
+
+  // Make a column builder for a column of nulls
+  Status MakeNullColumnBuilder(const std::string& col_name,
+                               std::shared_ptr<ColumnBuilder>* out) {
+    std::shared_ptr<DataType> type;
+    // If the named column has a fixed type, use it; otherwise use null()
+    auto it = convert_options_.column_types.find(col_name);
+    if (it != convert_options_.column_types.end()) {
+      type = it->second;
+    } else {
+      type = null();
+    }
+    return ColumnBuilder::MakeNull(pool_, type, task_group_, out);
+  }
+
   // Trigger conversion of parsed block data
   Status ProcessData(const std::shared_ptr<BlockParser>& parser, int64_t block_index) {
     for (auto& builder : column_builders_) {
@@ -219,17 +280,15 @@ class BaseTableReader : public csv::TableReader {
   }
 
   Status MakeTable(std::shared_ptr<Table>* out) {
-    DCHECK_GT(num_cols_, 0);
-    DCHECK_EQ(column_names_.size(), static_cast<uint32_t>(num_cols_));
-    DCHECK_EQ(column_builders_.size(), static_cast<uint32_t>(num_cols_));
+    DCHECK_EQ(column_builders_.size(), builder_names_.size());
 
     std::vector<std::shared_ptr<Field>> fields;
     std::vector<std::shared_ptr<ChunkedArray>> columns;
 
-    for (int32_t i = 0; i < num_cols_; ++i) {
+    for (int32_t i = 0; i < static_cast<int32_t>(builder_names_.size()); ++i) {
       std::shared_ptr<ChunkedArray> array;
       RETURN_NOT_OK(column_builders_[i]->Finish(&array));
-      fields.push_back(::arrow::field(column_names_[i], array->type()));
+      fields.push_back(::arrow::field(builder_names_[i], array->type()));
       columns.emplace_back(std::move(array));
     }
     *out = Table::Make(schema(fields), columns);
@@ -241,12 +300,17 @@ class BaseTableReader : public csv::TableReader {
   ParseOptions parse_options_;
   ConvertOptions convert_options_;
 
-  int32_t num_cols_ = -1;
-  std::shared_ptr<ReadaheadSpooler> readahead_;
-  // Column names
+  // Number of columns in the CSV file
+  int32_t num_csv_cols_ = -1;
+  // Column names in the CSV file
   std::vector<std::string> column_names_;
-  std::shared_ptr<internal::TaskGroup> task_group_;
+  // Column builders for target Table (not necessarily in CSV file order)
   std::vector<std::shared_ptr<ColumnBuilder>> column_builders_;
+  // Names of columns, in same order as column_builders_
+  std::vector<std::string> builder_names_;
+
+  std::shared_ptr<ReadaheadSpooler> readahead_;
+  std::shared_ptr<internal::TaskGroup> task_group_;
 
   // Current block and data pointer
   std::shared_ptr<Buffer> cur_block_;
@@ -289,7 +353,7 @@ class SerialTableReader : public BaseTableReader {
 
     static constexpr int32_t max_num_rows = std::numeric_limits<int32_t>::max();
     auto parser =
-        std::make_shared<BlockParser>(pool_, parse_options_, num_cols_, max_num_rows);
+        std::make_shared<BlockParser>(pool_, parse_options_, num_csv_cols_, max_num_rows);
     while (!eof_) {
       // Consume current block
       uint32_t parsed_size = 0;
@@ -376,8 +440,8 @@ class ThreadedTableReader : public BaseTableReader {
 
         // "mutable" allows to modify captured by-copy chunk_buffer
         task_group_->Append([=]() mutable -> Status {
-          auto parser = std::make_shared<BlockParser>(pool_, parse_options_, num_cols_,
-                                                      max_num_rows);
+          auto parser = std::make_shared<BlockParser>(pool_, parse_options_,
+                                                      num_csv_cols_, max_num_rows);
           uint32_t parsed_size = 0;
           RETURN_NOT_OK(parser->Parse(reinterpret_cast<const char*>(chunk_data),
                                       chunk_size, &parsed_size));
@@ -409,8 +473,8 @@ class ThreadedTableReader : public BaseTableReader {
       for (auto& builder : column_builders_) {
         builder->SetTaskGroup(task_group_);
       }
-      auto parser =
-          std::make_shared<BlockParser>(pool_, parse_options_, num_cols_, max_num_rows);
+      auto parser = std::make_shared<BlockParser>(pool_, parse_options_, num_csv_cols_,
+                                                  max_num_rows);
       uint32_t parsed_size = 0;
       RETURN_NOT_OK(parser->ParseFinal(reinterpret_cast<const char*>(cur_data_),
                                        static_cast<uint32_t>(cur_size_), &parsed_size));
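
The column resolution in MakeColumnBuilders() above boils down to a
name-to-index lookup over the CSV header. A rough Python equivalent
(illustrative only; the function name is made up):

    def resolve_columns(csv_names, include_columns, include_missing_columns):
        indices = {name: i for i, name in enumerate(csv_names)}
        resolved = []
        for name in include_columns:
            if name in indices:
                resolved.append((name, indices[name]))  # convert from CSV data
            elif include_missing_columns:
                resolved.append((name, None))           # build a column of nulls
            else:
                raise KeyError("Column '%s' in include_columns "
                               "does not exist in CSV file" % name)
        return resolved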
diff --git a/cpp/src/arrow/csv/reader.h b/cpp/src/arrow/csv/reader.h
index edf6f11..53255f9 100644
--- a/cpp/src/arrow/csv/reader.h
+++ b/cpp/src/arrow/csv/reader.h
@@ -41,7 +41,6 @@ class ARROW_EXPORT TableReader {
 
   virtual Status Read(std::shared_ptr<Table>* out) = 0;
 
-  // XXX pass optional schema?
   static Status Make(MemoryPool* pool, std::shared_ptr<io::InputStream> input,
                      const ReadOptions&, const ParseOptions&, const ConvertOptions&,
                      std::shared_ptr<TableReader>* out);
diff --git a/cpp/src/arrow/table.cc b/cpp/src/arrow/table.cc
index 446010f..3f68561 100644
--- a/cpp/src/arrow/table.cc
+++ b/cpp/src/arrow/table.cc
@@ -21,6 +21,7 @@
 #include <cstdlib>
 #include <limits>
 #include <memory>
+#include <sstream>
 #include <utility>
 
 #include "arrow/array.h"
@@ -184,10 +185,12 @@ Status ChunkedArray::Validate() const {
   }
 
   const auto& type = *chunks_[0]->type();
+  // Make sure chunks all have the same type
   for (size_t i = 1; i < chunks_.size(); ++i) {
-    if (!chunks_[i]->type()->Equals(type)) {
+    const Array& chunk = *chunks_[i];
+    if (!chunk.type()->Equals(type)) {
       return Status::Invalid("In chunk ", i, " expected type ", type.ToString(),
-                             " but saw ", chunks_[i]->type()->ToString());
+                             " but saw ", chunk.type()->ToString());
     }
   }
   return Status::OK();
@@ -343,7 +346,7 @@ class SimpleTable : public Table {
       }
     }
 
-    // Make sure columns are all the same length
+    // Make sure columns are all the same length, and validate them
     for (int i = 0; i < num_columns(); ++i) {
       const ChunkedArray* col = columns_[i].get();
       if (col->length() != num_rows_) {
@@ -351,6 +354,12 @@ class SimpleTable : public Table {
                                " expected length ", num_rows_, " but got length ",
                                col->length());
       }
+      Status st = col->Validate();
+      if (!st.ok()) {
+        std::stringstream ss;
+        ss << "Column " << i << ": " << st.message();
+        return st.WithMessage(ss.str());
+      }
     }
     return Status::OK();
   }
diff --git a/python/pyarrow/_csv.pyx b/python/pyarrow/_csv.pyx
index 93e9cb3..cb5fe18 100644
--- a/python/pyarrow/_csv.pyx
+++ b/python/pyarrow/_csv.pyx
@@ -55,7 +55,7 @@ cdef class ReadOptions:
         The number of rows to skip at the start of the CSV data, not
         including the row of column names (if any).
     column_names: list, optional
-        The Table column names.  If empty, column names will be
+        The column names in the CSV file.  If empty, column names will be
         read from the first row after `skip_rows`.
     """
     cdef:
@@ -115,7 +115,7 @@ cdef class ReadOptions:
     @property
     def column_names(self):
         """
-        The Table column names.  If empty, column names will be
+        The column names in the CSV file.  If empty, column names will be
         read from the first row after `skip_rows`.
         """
         return [frombytes(s) for s in self.options.column_names]
@@ -288,6 +288,17 @@ cdef class ConvertOptions:
         If true, then strings in null_values are considered null for
         string columns.
         If false, then all strings are valid string values.
+    include_columns: list, optional
+        The names of columns to include in the Table.
+        If empty, the Table will include all columns from the CSV file.
+        If not empty, only these columns will be included, in this order.
+    include_missing_columns: bool, optional (default False)
+        If false, a column in `include_columns` but not in the CSV file
+        raises an error.
+        If true, columns in `include_columns` but not in the CSV file will
+        produce a column of nulls (whose type is selected using
+        `column_types`, or null by default).
+        This option is ignored if `include_columns` is empty.
     """
     cdef:
         CCSVConvertOptions options
@@ -297,7 +308,8 @@ cdef class ConvertOptions:
 
     def __init__(self, check_utf8=None, column_types=None, null_values=None,
                  true_values=None, false_values=None,
-                 strings_can_be_null=None):
+                 strings_can_be_null=None, include_columns=None,
+                 include_missing_columns=None):
         self.options = CCSVConvertOptions.Defaults()
         if check_utf8 is not None:
             self.check_utf8 = check_utf8
@@ -311,6 +323,10 @@ cdef class ConvertOptions:
             self.false_values = false_values
         if strings_can_be_null is not None:
             self.strings_can_be_null = strings_can_be_null
+        if include_columns is not None:
+            self.include_columns = include_columns
+        if include_missing_columns is not None:
+            self.include_missing_columns = include_missing_columns
 
     @property
     def check_utf8(self):
@@ -396,6 +412,38 @@ cdef class ConvertOptions:
     def false_values(self, value):
         self.options.false_values = [tobytes(x) for x in value]
 
+    @property
+    def include_columns(self):
+        """
+        The names of columns to include in the Table.
+
+        If empty, the Table will include all columns from the CSV file.
+        If not empty, only these columns will be included, in this order.
+        """
+        return [frombytes(s) for s in self.options.include_columns]
+
+    @include_columns.setter
+    def include_columns(self, value):
+        self.options.include_columns.clear()
+        for item in value:
+            self.options.include_columns.push_back(tobytes(item))
+
+    @property
+    def include_missing_columns(self):
+        """
+        If false, a column in `include_columns` but not in the CSV file
+        raises an error.
+        If true, columns in `include_columns` but not in the CSV file will
+        produce a null column (whose type is selected using `column_types`,
+        or null by default).
+        This option is ignored if `include_columns` is empty.
+        """
+        return self.options.include_missing_columns
+
+    @include_missing_columns.setter
+    def include_missing_columns(self, value):
+        self.options.include_missing_columns = value
+
 
 cdef _get_reader(input_file, shared_ptr[InputStream]* out):
     use_memory_map = False
diff --git a/python/pyarrow/error.pxi b/python/pyarrow/error.pxi
index 3cb9142..6c64f9b 100644
--- a/python/pyarrow/error.pxi
+++ b/python/pyarrow/error.pxi
@@ -37,7 +37,9 @@ class ArrowIOError(IOError, ArrowException):
 
 
 class ArrowKeyError(KeyError, ArrowException):
-    pass
+    def __str__(self):
+        # Override KeyError.__str__, as it uses the repr() of the key
+        return ArrowException.__str__(self)
 
 
 class ArrowTypeError(TypeError, ArrowException):
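
The override matters because of a plain Python quirk that is easy to check
in isolation (no pyarrow needed):

    # KeyError.__str__ returns the repr() of the key, which wraps a string
    # message in an extra pair of quotes.
    try:
        raise KeyError("Column 'xx' does not exist in CSV file")
    except KeyError as exc:
        print(str(exc))  # prints the message wrapped in extra quotes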
diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd
index ad0fa09..9c2aa6a 100644
--- a/python/pyarrow/includes/libarrow.pxd
+++ b/python/pyarrow/includes/libarrow.pxd
@@ -1113,6 +1113,8 @@ cdef extern from "arrow/csv/api.h" namespace "arrow::csv" nogil:
         vector[c_string] true_values
         vector[c_string] false_values
         c_bool strings_can_be_null
+        vector[c_string] include_columns
+        c_bool include_missing_columns
 
         @staticmethod
         CCSVConvertOptions Defaults()
diff --git a/python/pyarrow/table.pxi b/python/pyarrow/table.pxi
index f103d26..fad7ab4 100644
--- a/python/pyarrow/table.pxi
+++ b/python/pyarrow/table.pxi
@@ -78,6 +78,13 @@ cdef class ChunkedArray(_PandasConvertible):
     def __str__(self):
         return self.format()
 
+    def validate(self):
+        """
+        Validate chunked array consistency.
+        """
+        with nogil:
+            check_status(self.sp_chunked_array.get().Validate())
+
     @property
     def null_count(self):
         """
diff --git a/python/pyarrow/tests/test_csv.py b/python/pyarrow/tests/test_csv.py
index a42237e..543f659 100644
--- a/python/pyarrow/tests/test_csv.py
+++ b/python/pyarrow/tests/test_csv.py
@@ -176,15 +176,27 @@ def test_convert_options():
     opts.false_values = ['xxx', 'yyy']
     assert opts.false_values == ['xxx', 'yyy']
 
+    assert opts.include_columns == []
+    opts.include_columns = ['def', 'abc']
+    assert opts.include_columns == ['def', 'abc']
+
+    assert opts.include_missing_columns is False
+    opts.include_missing_columns = True
+    assert opts.include_missing_columns is True
+
     opts = cls(check_utf8=False, column_types={'a': pa.null()},
                null_values=['N', 'nn'], true_values=['T', 'tt'],
-               false_values=['F', 'ff'], strings_can_be_null=True)
+               false_values=['F', 'ff'], strings_can_be_null=True,
+               include_columns=['abc', 'def'],
+               include_missing_columns=True)
     assert opts.check_utf8 is False
     assert opts.column_types == {'a': pa.null()}
     assert opts.null_values == ['N', 'nn']
     assert opts.false_values == ['F', 'ff']
     assert opts.true_values == ['T', 'tt']
     assert opts.strings_can_be_null is True
+    assert opts.include_columns == ['abc', 'def']
+    assert opts.include_missing_columns is True
 
 
 class BaseTestCSVRead:
@@ -306,6 +318,80 @@ class BaseTestCSVRead:
             "y": ["kl", "op"],
             }
 
+    def test_include_columns(self):
+        rows = b"ab,cd\nef,gh\nij,kl\nmn,op\n"
+
+        convert_options = ConvertOptions()
+        convert_options.include_columns = ['ab']
+        table = self.read_bytes(rows, convert_options=convert_options)
+        self.check_names(table, ["ab"])
+        assert table.to_pydict() == {
+            "ab": ["ef", "ij", "mn"],
+            }
+
+        # Order of include_columns is respected, regardless of CSV order
+        convert_options.include_columns = ['cd', 'ab']
+        table = self.read_bytes(rows, convert_options=convert_options)
+        schema = pa.schema([('cd', pa.string()),
+                            ('ab', pa.string())])
+        assert table.schema == schema
+        assert table.to_pydict() == {
+            "cd": ["gh", "kl", "op"],
+            "ab": ["ef", "ij", "mn"],
+            }
+
+        # Include a column not in the CSV file => raises by default
+        convert_options.include_columns = ['xx', 'ab', 'yy']
+        with pytest.raises(KeyError,
+                           match="Column 'xx' in include_columns "
+                                 "does not exist in CSV file"):
+            self.read_bytes(rows, convert_options=convert_options)
+
+    def test_include_missing_columns(self):
+        rows = b"ab,cd\nef,gh\nij,kl\nmn,op\n"
+
+        read_options = ReadOptions()
+        convert_options = ConvertOptions()
+        convert_options.include_columns = ['xx', 'ab', 'yy']
+        convert_options.include_missing_columns = True
+        table = self.read_bytes(rows, read_options=read_options,
+                                convert_options=convert_options)
+        schema = pa.schema([('xx', pa.null()),
+                            ('ab', pa.string()),
+                            ('yy', pa.null())])
+        assert table.schema == schema
+        assert table.to_pydict() == {
+            "xx": [None, None, None],
+            "ab": ["ef", "ij", "mn"],
+            "yy": [None, None, None],
+            }
+
+        # Combining with `column_names`
+        read_options.column_names = ["xx", "yy"]
+        convert_options.include_columns = ["yy", "cd"]
+        table = self.read_bytes(rows, read_options=read_options,
+                                convert_options=convert_options)
+        schema = pa.schema([('yy', pa.string()),
+                            ('cd', pa.null())])
+        assert table.schema == schema
+        assert table.to_pydict() == {
+            "yy": ["cd", "gh", "kl", "op"],
+            "cd": [None, None, None, None],
+            }
+
+        # And with `column_types` as well
+        convert_options.column_types = {"yy": pa.binary(),
+                                        "cd": pa.int32()}
+        table = self.read_bytes(rows, read_options=read_options,
+                                convert_options=convert_options)
+        schema = pa.schema([('yy', pa.binary()),
+                            ('cd', pa.int32())])
+        assert table.schema == schema
+        assert table.to_pydict() == {
+            "yy": [b"cd", b"gh", b"kl", b"op"],
+            "cd": [None, None, None, None],
+            }
+
     def test_simple_ints(self):
         # Infer integer columns
         rows = b"a,b,c\n1,2,3\n4,5,6\n"
@@ -471,6 +557,22 @@ class BaseTestCSVRead:
         assert "In CSV column #1: " in err
         assert "CSV conversion error to float: invalid value 'XXX'" in err
 
+    def test_column_types_with_column_names(self):
+        # When both `column_names` and `column_types` are given, names
+        # in `column_types` should refer to names in `column_names`
+        rows = b"a,b\nc,d\ne,f\n"
+        read_options = ReadOptions(column_names=['x', 'y'])
+        convert_options = ConvertOptions(column_types={'x': pa.binary()})
+        table = self.read_bytes(rows, read_options=read_options,
+                                convert_options=convert_options)
+        schema = pa.schema([('x', pa.binary()),
+                            ('y', pa.string())])
+        assert table.schema == schema
+        assert table.to_pydict() == {
+            'x': [b'a', b'c', b'e'],
+            'y': ['b', 'd', 'f'],
+            }
+
     def test_no_ending_newline(self):
         # No \n after last line
         rows = b"a,b,c\n1,2,3\n4,5,6"
diff --git a/python/pyarrow/tests/test_table.py b/python/pyarrow/tests/test_table.py
index 8014c19..64a9a08 100644
--- a/python/pyarrow/tests/test_table.py
+++ b/python/pyarrow/tests/test_table.py
@@ -30,6 +30,7 @@ def test_chunked_array_basics():
     data = pa.chunked_array([], type=pa.string())
     assert data.type == pa.string()
     assert data.to_pylist() == []
+    data.validate()
 
     with pytest.raises(ValueError):
         pa.chunked_array([])
@@ -43,6 +44,7 @@ def test_chunked_array_basics():
     assert all(isinstance(c, pa.lib.Int64Array) for c in data.chunks)
     assert all(isinstance(c, pa.lib.Int64Array) for c in data.iterchunks())
     assert len(data.chunks) == 3
+    data.validate()
 
 
 def test_chunked_array_mismatch_types():
@@ -177,7 +179,9 @@ def test_chunked_array_pickle(data, typ):
         arrays.append(pa.array(data[:2], type=typ))
         data = data[2:]
     array = pa.chunked_array(arrays, type=typ)
+    array.validate()
     result = pickle.loads(pickle.dumps(array))
+    result.validate()
     assert result.equals(array)