You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ap...@apache.org on 2019/08/12 12:06:15 UTC
[arrow] branch master updated: ARROW-5977: [C++] [Python] Allow
specifying which columns to include
This is an automated email from the ASF dual-hosted git repository.
apitrou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new 93688e8 ARROW-5977: [C++] [Python] Allow specifying which columns to include
93688e8 is described below
commit 93688e8c1fa2f22d46394c548a9edbd3d2d7c62d
Author: Antoine Pitrou <an...@python.org>
AuthorDate: Mon Aug 12 14:05:54 2019 +0200
ARROW-5977: [C++] [Python] Allow specifying which columns to include
If the `include_columns` option is not empty:
- columns are included in that order
- columns not in `include_columns` are ignored
- columns in `include_columns` but not in the CSV file produce a null column
Closes #5026 from pitrou/ARROW-5977-csv-include-columns and squashes the following commits:
0f198ef0b <Antoine Pitrou> ARROW-5977: Allow specifying which columns to include
Authored-by: Antoine Pitrou <an...@python.org>
Signed-off-by: Antoine Pitrou <an...@python.org>
---
cpp/src/arrow/csv/column-builder-test.cc | 135 ++++++++++++++++++++++++++-----
cpp/src/arrow/csv/column-builder.cc | 92 +++++++++++++++++++--
cpp/src/arrow/csv/column-builder.h | 12 ++-
cpp/src/arrow/csv/options.h | 13 +++
cpp/src/arrow/csv/parser.cc | 6 +-
cpp/src/arrow/csv/reader.cc | 118 ++++++++++++++++++++-------
cpp/src/arrow/csv/reader.h | 1 -
cpp/src/arrow/table.cc | 15 +++-
python/pyarrow/_csv.pyx | 54 ++++++++++++-
python/pyarrow/error.pxi | 4 +-
python/pyarrow/includes/libarrow.pxd | 2 +
python/pyarrow/table.pxi | 7 ++
python/pyarrow/tests/test_csv.py | 104 +++++++++++++++++++++++-
python/pyarrow/tests/test_table.py | 4 +
14 files changed, 498 insertions(+), 69 deletions(-)
diff --git a/cpp/src/arrow/csv/column-builder-test.cc b/cpp/src/arrow/csv/column-builder-test.cc
index f2c39aa..62e8d06 100644
--- a/cpp/src/arrow/csv/column-builder-test.cc
+++ b/cpp/src/arrow/csv/column-builder-test.cc
@@ -24,6 +24,7 @@
#include "arrow/csv/column-builder.h"
#include "arrow/csv/options.h"
#include "arrow/csv/test-common.h"
+#include "arrow/memory_pool.h"
#include "arrow/table.h"
#include "arrow/testing/util.h"
#include "arrow/type.h"
@@ -48,6 +49,74 @@ void AssertBuilding(const std::shared_ptr<ColumnBuilder>& builder,
}
ASSERT_OK(builder->task_group()->Finish());
ASSERT_OK(builder->Finish(out));
+ ASSERT_OK((*out)->Validate());
+}
+
+//////////////////////////////////////////////////////////////////////////
+// Tests for null column builder
+
+TEST(NullColumnBuilder, Empty) {
+ std::shared_ptr<DataType> type = null();
+ auto tg = TaskGroup::MakeSerial();
+
+ std::shared_ptr<ColumnBuilder> builder;
+ ASSERT_OK(ColumnBuilder::MakeNull(default_memory_pool(), type, tg, &builder));
+
+ std::shared_ptr<ChunkedArray> actual;
+ AssertBuilding(builder, {}, &actual);
+
+ ChunkedArray expected({}, type);
+ AssertChunkedEqual(*actual, expected);
+}
+
+TEST(NullColumnBuilder, InsertNull) {
+ // Building a column of nulls with type null()
+ std::shared_ptr<DataType> type = null();
+ auto tg = TaskGroup::MakeSerial();
+
+ std::shared_ptr<ColumnBuilder> builder;
+ ASSERT_OK(ColumnBuilder::MakeNull(default_memory_pool(), type, tg, &builder));
+
+ std::shared_ptr<BlockParser> parser;
+ std::shared_ptr<ChunkedArray> actual, expected;
+ // The cell values are irrelevant, only the number of rows in each block is used
+ MakeColumnParser({"456", "789"}, &parser);
+ builder->Insert(1, parser);
+ MakeColumnParser({"123"}, &parser);
+ builder->Insert(0, parser);
+ ASSERT_OK(builder->task_group()->Finish());
+ ASSERT_OK(builder->Finish(&actual));
+ ASSERT_OK(actual->Validate());
+
+ auto chunks =
+ ArrayVector{std::make_shared<NullArray>(1), std::make_shared<NullArray>(2)};
+ expected = std::make_shared<ChunkedArray>(chunks);
+ AssertChunkedEqual(*actual, *expected);
+}
+
+TEST(NullColumnBuilder, InsertTyped) {
+ // Building a column of nulls with another type
+ std::shared_ptr<DataType> type = int16();
+ auto tg = TaskGroup::MakeSerial();
+
+ std::shared_ptr<ColumnBuilder> builder;
+ ASSERT_OK(ColumnBuilder::MakeNull(default_memory_pool(), type, tg, &builder));
+
+ std::shared_ptr<BlockParser> parser;
+ std::shared_ptr<ChunkedArray> actual, expected;
+ // The cell values are irrelevant, only the number of rows in each block is used
+ MakeColumnParser({"abc", "def", "ghi"}, &parser);
+ builder->Insert(1, parser);
+ MakeColumnParser({"jkl"}, &parser);
+ builder->Insert(0, parser);
+ ASSERT_OK(builder->task_group()->Finish());
+ ASSERT_OK(builder->Finish(&actual));
+ ASSERT_OK(actual->Validate());
+
+ auto chunks = ArrayVector{ArrayFromJSON(type, "[null]"),
+ ArrayFromJSON(type, "[null, null, null]")};
+ expected = std::make_shared<ChunkedArray>(chunks);
+ AssertChunkedEqual(*actual, *expected);
}
//////////////////////////////////////////////////////////////////////////
@@ -56,7 +125,8 @@ void AssertBuilding(const std::shared_ptr<ColumnBuilder>& builder,
TEST(ColumnBuilder, Empty) {
auto tg = TaskGroup::MakeSerial();
std::shared_ptr<ColumnBuilder> builder;
- ASSERT_OK(ColumnBuilder::Make(int32(), 0, ConvertOptions::Defaults(), tg, &builder));
+ ASSERT_OK(ColumnBuilder::Make(default_memory_pool(), int32(), 0,
+ ConvertOptions::Defaults(), tg, &builder));
std::shared_ptr<ChunkedArray> actual;
AssertBuilding(builder, {}, &actual);
@@ -68,7 +138,8 @@ TEST(ColumnBuilder, Empty) {
TEST(ColumnBuilder, Basics) {
auto tg = TaskGroup::MakeSerial();
std::shared_ptr<ColumnBuilder> builder;
- ASSERT_OK(ColumnBuilder::Make(int32(), 0, ConvertOptions::Defaults(), tg, &builder));
+ ASSERT_OK(ColumnBuilder::Make(default_memory_pool(), int32(), 0,
+ ConvertOptions::Defaults(), tg, &builder));
std::shared_ptr<ChunkedArray> actual;
AssertBuilding(builder, {{"123", "-456"}}, &actual);
@@ -82,7 +153,8 @@ TEST(ColumnBuilder, Insert) {
// Test ColumnBuilder::Insert()
auto tg = TaskGroup::MakeSerial();
std::shared_ptr<ColumnBuilder> builder;
- ASSERT_OK(ColumnBuilder::Make(int32(), 0, ConvertOptions::Defaults(), tg, &builder));
+ ASSERT_OK(ColumnBuilder::Make(default_memory_pool(), int32(), 0,
+ ConvertOptions::Defaults(), tg, &builder));
std::shared_ptr<BlockParser> parser;
std::shared_ptr<ChunkedArray> actual, expected;
@@ -92,6 +164,7 @@ TEST(ColumnBuilder, Insert) {
builder->Insert(0, parser);
ASSERT_OK(builder->task_group()->Finish());
ASSERT_OK(builder->Finish(&actual));
+ ASSERT_OK(actual->Validate());
ChunkedArrayFromVector<Int32Type>({{123}, {456}}, &expected);
AssertChunkedEqual(*actual, *expected);
@@ -100,7 +173,8 @@ TEST(ColumnBuilder, Insert) {
TEST(ColumnBuilder, MultipleChunks) {
auto tg = TaskGroup::MakeSerial();
std::shared_ptr<ColumnBuilder> builder;
- ASSERT_OK(ColumnBuilder::Make(int32(), 0, ConvertOptions::Defaults(), tg, &builder));
+ ASSERT_OK(ColumnBuilder::Make(default_memory_pool(), int32(), 0,
+ ConvertOptions::Defaults(), tg, &builder));
std::shared_ptr<ChunkedArray> actual;
AssertBuilding(builder, {{"1", "2", "3"}, {"4", "5"}}, &actual);
@@ -113,7 +187,8 @@ TEST(ColumnBuilder, MultipleChunks) {
TEST(ColumnBuilder, MultipleChunksParallel) {
auto tg = TaskGroup::MakeThreaded(GetCpuThreadPool());
std::shared_ptr<ColumnBuilder> builder;
- ASSERT_OK(ColumnBuilder::Make(int32(), 0, ConvertOptions::Defaults(), tg, &builder));
+ ASSERT_OK(ColumnBuilder::Make(default_memory_pool(), int32(), 0,
+ ConvertOptions::Defaults(), tg, &builder));
std::shared_ptr<ChunkedArray> actual;
AssertBuilding(builder, {{"1", "2"}, {"3"}, {"4", "5"}, {"6", "7"}}, &actual);
@@ -129,7 +204,8 @@ TEST(ColumnBuilder, MultipleChunksParallel) {
TEST(InferringColumnBuilder, Empty) {
auto tg = TaskGroup::MakeSerial();
std::shared_ptr<ColumnBuilder> builder;
- ASSERT_OK(ColumnBuilder::Make(0, ConvertOptions::Defaults(), tg, &builder));
+ ASSERT_OK(ColumnBuilder::Make(default_memory_pool(), 0, ConvertOptions::Defaults(), tg,
+ &builder));
std::shared_ptr<ChunkedArray> actual;
AssertBuilding(builder, {}, &actual);
@@ -141,7 +217,8 @@ TEST(InferringColumnBuilder, Empty) {
TEST(InferringColumnBuilder, SingleChunkNull) {
auto tg = TaskGroup::MakeSerial();
std::shared_ptr<ColumnBuilder> builder;
- ASSERT_OK(ColumnBuilder::Make(0, ConvertOptions::Defaults(), tg, &builder));
+ ASSERT_OK(ColumnBuilder::Make(default_memory_pool(), 0, ConvertOptions::Defaults(), tg,
+ &builder));
std::shared_ptr<ChunkedArray> actual;
AssertBuilding(builder, {{"", "NA"}}, &actual);
@@ -153,7 +230,8 @@ TEST(InferringColumnBuilder, SingleChunkNull) {
TEST(InferringColumnBuilder, MultipleChunkNull) {
auto tg = TaskGroup::MakeSerial();
std::shared_ptr<ColumnBuilder> builder;
- ASSERT_OK(ColumnBuilder::Make(0, ConvertOptions::Defaults(), tg, &builder));
+ ASSERT_OK(ColumnBuilder::Make(default_memory_pool(), 0, ConvertOptions::Defaults(), tg,
+ &builder));
std::shared_ptr<ChunkedArray> actual;
AssertBuilding(builder, {{"", "NA"}, {""}, {"NaN"}}, &actual);
@@ -165,7 +243,8 @@ TEST(InferringColumnBuilder, MultipleChunkNull) {
TEST(InferringColumnBuilder, SingleChunkInteger) {
auto tg = TaskGroup::MakeSerial();
std::shared_ptr<ColumnBuilder> builder;
- ASSERT_OK(ColumnBuilder::Make(0, ConvertOptions::Defaults(), tg, &builder));
+ ASSERT_OK(ColumnBuilder::Make(default_memory_pool(), 0, ConvertOptions::Defaults(), tg,
+ &builder));
std::shared_ptr<ChunkedArray> actual;
AssertBuilding(builder, {{"", "123", "456"}}, &actual);
@@ -178,7 +257,8 @@ TEST(InferringColumnBuilder, SingleChunkInteger) {
TEST(InferringColumnBuilder, MultipleChunkInteger) {
auto tg = TaskGroup::MakeSerial();
std::shared_ptr<ColumnBuilder> builder;
- ASSERT_OK(ColumnBuilder::Make(0, ConvertOptions::Defaults(), tg, &builder));
+ ASSERT_OK(ColumnBuilder::Make(default_memory_pool(), 0, ConvertOptions::Defaults(), tg,
+ &builder));
std::shared_ptr<ChunkedArray> actual;
AssertBuilding(builder, {{""}, {"NA", "123", "456"}}, &actual);
@@ -192,7 +272,8 @@ TEST(InferringColumnBuilder, MultipleChunkInteger) {
TEST(InferringColumnBuilder, SingleChunkBoolean) {
auto tg = TaskGroup::MakeSerial();
std::shared_ptr<ColumnBuilder> builder;
- ASSERT_OK(ColumnBuilder::Make(0, ConvertOptions::Defaults(), tg, &builder));
+ ASSERT_OK(ColumnBuilder::Make(default_memory_pool(), 0, ConvertOptions::Defaults(), tg,
+ &builder));
std::shared_ptr<ChunkedArray> actual;
AssertBuilding(builder, {{"", "0", "FALSE"}}, &actual);
@@ -206,7 +287,8 @@ TEST(InferringColumnBuilder, SingleChunkBoolean) {
TEST(InferringColumnBuilder, MultipleChunkBoolean) {
auto tg = TaskGroup::MakeSerial();
std::shared_ptr<ColumnBuilder> builder;
- ASSERT_OK(ColumnBuilder::Make(0, ConvertOptions::Defaults(), tg, &builder));
+ ASSERT_OK(ColumnBuilder::Make(default_memory_pool(), 0, ConvertOptions::Defaults(), tg,
+ &builder));
std::shared_ptr<ChunkedArray> actual;
AssertBuilding(builder, {{""}, {"1", "True", "0"}}, &actual);
@@ -220,7 +302,8 @@ TEST(InferringColumnBuilder, MultipleChunkBoolean) {
TEST(InferringColumnBuilder, SingleChunkReal) {
auto tg = TaskGroup::MakeSerial();
std::shared_ptr<ColumnBuilder> builder;
- ASSERT_OK(ColumnBuilder::Make(0, ConvertOptions::Defaults(), tg, &builder));
+ ASSERT_OK(ColumnBuilder::Make(default_memory_pool(), 0, ConvertOptions::Defaults(), tg,
+ &builder));
std::shared_ptr<ChunkedArray> actual;
AssertBuilding(builder, {{"", "0.0", "12.5"}}, &actual);
@@ -234,7 +317,8 @@ TEST(InferringColumnBuilder, SingleChunkReal) {
TEST(InferringColumnBuilder, MultipleChunkReal) {
auto tg = TaskGroup::MakeSerial();
std::shared_ptr<ColumnBuilder> builder;
- ASSERT_OK(ColumnBuilder::Make(0, ConvertOptions::Defaults(), tg, &builder));
+ ASSERT_OK(ColumnBuilder::Make(default_memory_pool(), 0, ConvertOptions::Defaults(), tg,
+ &builder));
std::shared_ptr<ChunkedArray> actual;
AssertBuilding(builder, {{""}, {"008"}, {"NaN", "12.5"}}, &actual);
@@ -248,7 +332,8 @@ TEST(InferringColumnBuilder, MultipleChunkReal) {
TEST(InferringColumnBuilder, SingleChunkTimestamp) {
auto tg = TaskGroup::MakeSerial();
std::shared_ptr<ColumnBuilder> builder;
- ASSERT_OK(ColumnBuilder::Make(0, ConvertOptions::Defaults(), tg, &builder));
+ ASSERT_OK(ColumnBuilder::Make(default_memory_pool(), 0, ConvertOptions::Defaults(), tg,
+ &builder));
std::shared_ptr<ChunkedArray> actual;
AssertBuilding(builder, {{"", "1970-01-01", "2018-11-13 17:11:10"}}, &actual);
@@ -263,7 +348,8 @@ TEST(InferringColumnBuilder, SingleChunkTimestamp) {
TEST(InferringColumnBuilder, MultipleChunkTimestamp) {
auto tg = TaskGroup::MakeSerial();
std::shared_ptr<ColumnBuilder> builder;
- ASSERT_OK(ColumnBuilder::Make(0, ConvertOptions::Defaults(), tg, &builder));
+ ASSERT_OK(ColumnBuilder::Make(default_memory_pool(), 0, ConvertOptions::Defaults(), tg,
+ &builder));
std::shared_ptr<ChunkedArray> actual;
AssertBuilding(builder, {{""}, {"1970-01-01"}, {"2018-11-13 17:11:10"}}, &actual);
@@ -282,7 +368,8 @@ TEST(InferringColumnBuilder, SingleChunkString) {
std::shared_ptr<ChunkedArray> expected;
// With valid UTF8
- ASSERT_OK(ColumnBuilder::Make(0, ConvertOptions::Defaults(), tg, &builder));
+ ASSERT_OK(ColumnBuilder::Make(default_memory_pool(), 0, ConvertOptions::Defaults(), tg,
+ &builder));
AssertBuilding(builder, {{"", "foo", "baré"}}, &actual);
ChunkedArrayFromVector<StringType, std::string>({{true, true, true}},
@@ -293,7 +380,7 @@ TEST(InferringColumnBuilder, SingleChunkString) {
auto options = ConvertOptions::Defaults();
options.check_utf8 = false;
tg = TaskGroup::MakeSerial();
- ASSERT_OK(ColumnBuilder::Make(0, options, tg, &builder));
+ ASSERT_OK(ColumnBuilder::Make(default_memory_pool(), 0, options, tg, &builder));
AssertBuilding(builder, {{"", "foo\xff", "baré"}}, &actual);
ChunkedArrayFromVector<StringType, std::string>({{true, true, true}},
@@ -308,7 +395,8 @@ TEST(InferringColumnBuilder, SingleChunkBinary) {
std::shared_ptr<ChunkedArray> expected;
// With invalid UTF8, checking
- ASSERT_OK(ColumnBuilder::Make(0, ConvertOptions::Defaults(), tg, &builder));
+ ASSERT_OK(ColumnBuilder::Make(default_memory_pool(), 0, ConvertOptions::Defaults(), tg,
+ &builder));
AssertBuilding(builder, {{"", "foo\xff", "baré"}}, &actual);
ChunkedArrayFromVector<BinaryType, std::string>({{true, true, true}},
@@ -319,7 +407,8 @@ TEST(InferringColumnBuilder, SingleChunkBinary) {
TEST(InferringColumnBuilder, MultipleChunkString) {
auto tg = TaskGroup::MakeSerial();
std::shared_ptr<ColumnBuilder> builder;
- ASSERT_OK(ColumnBuilder::Make(0, ConvertOptions::Defaults(), tg, &builder));
+ ASSERT_OK(ColumnBuilder::Make(default_memory_pool(), 0, ConvertOptions::Defaults(), tg,
+ &builder));
std::shared_ptr<ChunkedArray> actual;
AssertBuilding(builder, {{""}, {"008"}, {"NaN", "baré"}}, &actual);
@@ -333,7 +422,8 @@ TEST(InferringColumnBuilder, MultipleChunkString) {
TEST(InferringColumnBuilder, MultipleChunkBinary) {
auto tg = TaskGroup::MakeSerial();
std::shared_ptr<ColumnBuilder> builder;
- ASSERT_OK(ColumnBuilder::Make(0, ConvertOptions::Defaults(), tg, &builder));
+ ASSERT_OK(ColumnBuilder::Make(default_memory_pool(), 0, ConvertOptions::Defaults(), tg,
+ &builder));
std::shared_ptr<ChunkedArray> actual;
AssertBuilding(builder, {{""}, {"008"}, {"NaN", "baré\xff"}}, &actual);
@@ -350,7 +440,8 @@ TEST(InferringColumnBuilder, MultipleChunkBinary) {
TEST(InferringColumnBuilder, MultipleChunkIntegerParallel) {
auto tg = TaskGroup::MakeThreaded(GetCpuThreadPool());
std::shared_ptr<ColumnBuilder> builder;
- ASSERT_OK(ColumnBuilder::Make(0, ConvertOptions::Defaults(), tg, &builder));
+ ASSERT_OK(ColumnBuilder::Make(default_memory_pool(), 0, ConvertOptions::Defaults(), tg,
+ &builder));
std::shared_ptr<ChunkedArray> actual;
AssertBuilding(builder, {{"1", "2"}, {"3"}, {"4", "5"}, {"6", "7"}}, &actual);
diff --git a/cpp/src/arrow/csv/column-builder.cc b/cpp/src/arrow/csv/column-builder.cc
index cfc36fe..eff0088 100644
--- a/cpp/src/arrow/csv/column-builder.cc
+++ b/cpp/src/arrow/csv/column-builder.cc
@@ -25,9 +25,11 @@
#include <vector>
#include "arrow/array.h"
+#include "arrow/builder.h"
#include "arrow/csv/column-builder.h"
#include "arrow/csv/converter.h"
#include "arrow/csv/options.h"
+#include "arrow/csv/parser.h"
#include "arrow/memory_pool.h"
#include "arrow/status.h"
#include "arrow/table.h"
@@ -51,6 +53,73 @@ void ColumnBuilder::Append(const std::shared_ptr<BlockParser>& parser) {
}
//////////////////////////////////////////////////////////////////////////
+// Null column builder implementation (for a column not in the CSV file)
+
+class NullColumnBuilder : public ColumnBuilder {
+ public:
+ explicit NullColumnBuilder(const std::shared_ptr<DataType>& type, MemoryPool* pool,
+ const std::shared_ptr<internal::TaskGroup>& task_group)
+ : ColumnBuilder(task_group), type_(type), pool_(pool) {}
+
+ Status Init();
+
+ void Insert(int64_t block_index, const std::shared_ptr<BlockParser>& parser) override;
+ Status Finish(std::shared_ptr<ChunkedArray>* out) override;
+
+ // While NullColumnBuilder is so cheap that it doesn't need parallelization
+ // in itself, the CSV reader doesn't know this and can still call it from
+ // multiple threads, so use a mutex anyway.
+ std::mutex mutex_;
+
+ std::shared_ptr<DataType> type_;
+ MemoryPool* pool_;
+ std::unique_ptr<ArrayBuilder> builder_;
+};
+
+Status NullColumnBuilder::Init() { return MakeBuilder(pool_, type_, &builder_); }
+
+void NullColumnBuilder::Insert(int64_t block_index,
+ const std::shared_ptr<BlockParser>& parser) {
+ // Make sure a (still null) Array slot exists in chunks_ for this block.
+ size_t chunk_index = static_cast<size_t>(block_index);
+ {
+ std::lock_guard<std::mutex> lock(mutex_);
+ if (chunks_.size() <= chunk_index) {
+ chunks_.resize(chunk_index + 1);
+ }
+ }
+
+ // Spawn a task that will build an array of nulls with the right DataType
+ const int32_t num_rows = parser->num_rows();
+ DCHECK_GE(num_rows, 0);
+
+ task_group_->Append([=]() -> Status {
+ // builder_ is shared by concurrently running tasks: only use it with mutex_ held
+ std::lock_guard<std::mutex> lock(mutex_);
+ std::shared_ptr<Array> res;
+ RETURN_NOT_OK(builder_->AppendNulls(num_rows));
+ RETURN_NOT_OK(builder_->Finish(&res));
+
+ // Should not insert an already built chunk
+ DCHECK_EQ(chunks_[chunk_index], nullptr);
+ chunks_[chunk_index] = std::move(res);
+ return Status::OK();
+ });
+}
+
+Status NullColumnBuilder::Finish(std::shared_ptr<ChunkedArray>* out) {
+ // Unnecessary iff all tasks have finished
+ std::lock_guard<std::mutex> lock(mutex_);
+
+ for (const auto& chunk : chunks_) {
+ if (chunk == nullptr) {
+ return Status::Invalid("a chunk failed allocating for an unknown reason");
+ }
+ }
+ *out = std::make_shared<ChunkedArray>(chunks_, type_);
+ return Status::OK();
+}
+
+//////////////////////////////////////////////////////////////////////////
// Pre-typed column builder implementation
class TypedColumnBuilder : public ColumnBuilder {
@@ -355,28 +424,37 @@ Status InferringColumnBuilder::Finish(std::shared_ptr<ChunkedArray>* out) {
//////////////////////////////////////////////////////////////////////////
// Factory functions
-Status ColumnBuilder::Make(const std::shared_ptr<DataType>& type, int32_t col_index,
- const ConvertOptions& options,
+Status ColumnBuilder::Make(MemoryPool* pool, const std::shared_ptr<DataType>& type,
+ int32_t col_index, const ConvertOptions& options,
const std::shared_ptr<TaskGroup>& task_group,
std::shared_ptr<ColumnBuilder>* out) {
- auto ptr =
- new TypedColumnBuilder(type, col_index, options, default_memory_pool(), task_group);
+ auto ptr = new TypedColumnBuilder(type, col_index, options, pool, task_group);
auto res = std::shared_ptr<ColumnBuilder>(ptr);
RETURN_NOT_OK(ptr->Init());
*out = res;
return Status::OK();
}
-Status ColumnBuilder::Make(int32_t col_index, const ConvertOptions& options,
+Status ColumnBuilder::Make(MemoryPool* pool, int32_t col_index,
+ const ConvertOptions& options,
const std::shared_ptr<TaskGroup>& task_group,
std::shared_ptr<ColumnBuilder>* out) {
- auto ptr =
- new InferringColumnBuilder(col_index, options, default_memory_pool(), task_group);
+ // XXX
+ auto ptr = new InferringColumnBuilder(col_index, options, pool, task_group);
auto res = std::shared_ptr<ColumnBuilder>(ptr);
RETURN_NOT_OK(ptr->Init());
*out = res;
return Status::OK();
}
+Status ColumnBuilder::MakeNull(MemoryPool* pool, const std::shared_ptr<DataType>& type,
+ const std::shared_ptr<internal::TaskGroup>& task_group,
+ std::shared_ptr<ColumnBuilder>* out) {
+ auto res = std::make_shared<NullColumnBuilder>(type, pool, task_group);
+ RETURN_NOT_OK(res->Init());
+ *out = std::move(res);
+ return Status::OK();
+}
+
} // namespace csv
} // namespace arrow
diff --git a/cpp/src/arrow/csv/column-builder.h b/cpp/src/arrow/csv/column-builder.h
index 054a642..789e7b9 100644
--- a/cpp/src/arrow/csv/column-builder.h
+++ b/cpp/src/arrow/csv/column-builder.h
@@ -63,16 +63,22 @@ class ARROW_EXPORT ColumnBuilder {
std::shared_ptr<internal::TaskGroup> task_group() { return task_group_; }
/// Construct a strictly-typed ColumnBuilder.
- static Status Make(const std::shared_ptr<DataType>& type, int32_t col_index,
- const ConvertOptions& options,
+ static Status Make(MemoryPool* pool, const std::shared_ptr<DataType>& type,
+ int32_t col_index, const ConvertOptions& options,
const std::shared_ptr<internal::TaskGroup>& task_group,
std::shared_ptr<ColumnBuilder>* out);
/// Construct a type-inferring ColumnBuilder.
- static Status Make(int32_t col_index, const ConvertOptions& options,
+ static Status Make(MemoryPool* pool, int32_t col_index, const ConvertOptions& options,
const std::shared_ptr<internal::TaskGroup>& task_group,
std::shared_ptr<ColumnBuilder>* out);
+ /// Construct a ColumnBuilder for a column of nulls
+ /// (i.e. not present in the CSV file).
+ static Status MakeNull(MemoryPool* pool, const std::shared_ptr<DataType>& type,
+ const std::shared_ptr<internal::TaskGroup>& task_group,
+ std::shared_ptr<ColumnBuilder>* out);
+
protected:
explicit ColumnBuilder(const std::shared_ptr<internal::TaskGroup>& task_group)
: task_group_(task_group) {}
diff --git a/cpp/src/arrow/csv/options.h b/cpp/src/arrow/csv/options.h
index 21d0ab2..846c2b8 100644
--- a/cpp/src/arrow/csv/options.h
+++ b/cpp/src/arrow/csv/options.h
@@ -73,6 +73,19 @@ struct ARROW_EXPORT ConvertOptions {
// If false, then all strings are valid string values.
bool strings_can_be_null = false;
+ // XXX Should we have a separate FilterOptions?
+
+ // If non-empty, indicates the names of columns from the CSV file that should
+ // be actually read and converted (in the vector's order).
+ // Columns not in this vector will be ignored.
+ std::vector<std::string> include_columns;
+ // If false, columns in `include_columns` but not in the CSV file will error out.
+ // If true, columns in `include_columns` but not in the CSV file will produce
+ // a column of nulls (whose type is selected using `column_types`,
+ // or null by default)
+ // This option is ignored if `include_columns` is empty.
+ bool include_missing_columns = false;
+
static ConvertOptions Defaults();
};
diff --git a/cpp/src/arrow/csv/parser.cc b/cpp/src/arrow/csv/parser.cc
index d6454ed..8d085dc 100644
--- a/cpp/src/arrow/csv/parser.cc
+++ b/cpp/src/arrow/csv/parser.cc
@@ -513,7 +513,11 @@ Status BlockParser::ParseFinal(const char* data, uint32_t size, uint32_t* out_si
BlockParser::BlockParser(MemoryPool* pool, ParseOptions options, int32_t num_cols,
int32_t max_num_rows)
- : pool_(pool), options_(options), num_cols_(num_cols), max_num_rows_(max_num_rows) {}
+ : pool_(pool),
+ options_(options),
+ num_rows_(-1),
+ num_cols_(num_cols),
+ max_num_rows_(max_num_rows) {}
BlockParser::BlockParser(ParseOptions options, int32_t num_cols, int32_t max_num_rows)
: BlockParser(default_memory_pool(), options, num_cols, max_num_rows) {}
diff --git a/cpp/src/arrow/csv/reader.cc b/cpp/src/arrow/csv/reader.cc
index 1ef2d39..1bccc9b 100644
--- a/cpp/src/arrow/csv/reader.cc
+++ b/cpp/src/arrow/csv/reader.cc
@@ -163,7 +163,7 @@ class BaseTableReader : public csv::TableReader {
if (read_options_.column_names.empty()) {
// Read one row with column names
- BlockParser parser(pool_, parse_options_, num_cols_, 1);
+ BlockParser parser(pool_, parse_options_, num_csv_cols_, 1);
uint32_t parsed_size = 0;
RETURN_NOT_OK(parser.Parse(reinterpret_cast<const char*>(cur_data_),
static_cast<uint32_t>(cur_size_), &parsed_size));
@@ -189,27 +189,88 @@ class BaseTableReader : public csv::TableReader {
column_names_ = read_options_.column_names;
}
- num_cols_ = static_cast<int32_t>(column_names_.size());
- DCHECK_GT(num_cols_, 0);
+ num_csv_cols_ = static_cast<int32_t>(column_names_.size());
+ DCHECK_GT(num_csv_cols_, 0);
- // Construct column builders
- for (int32_t col_index = 0; col_index < num_cols_; ++col_index) {
+ if (convert_options_.include_columns.empty()) {
+ return MakeColumnBuilders();
+ } else {
+ return MakeColumnBuilders(convert_options_.include_columns);
+ }
+ }
+
+ // Make column builders, assuming inclusion of all columns in CSV file order
+ Status MakeColumnBuilders() {
+ for (int32_t col_index = 0; col_index < num_csv_cols_; ++col_index) {
std::shared_ptr<ColumnBuilder> builder;
- // Does the named column have a fixed type?
- auto it = convert_options_.column_types.find(column_names_[col_index]);
- if (it == convert_options_.column_types.end()) {
- RETURN_NOT_OK(
- ColumnBuilder::Make(col_index, convert_options_, task_group_, &builder));
+ const auto& col_name = column_names_[col_index];
+
+ RETURN_NOT_OK(MakeCSVColumnBuilder(col_name, col_index, &builder));
+ column_builders_.push_back(builder);
+ builder_names_.push_back(col_name);
+ }
+ return Status::OK();
+ }
+
+ // Make column builders, assuming inclusion of columns in `include_columns` order
+ Status MakeColumnBuilders(const std::vector<std::string>& include_columns) {
+ // Compute indices of columns in the CSV file
+ std::unordered_map<std::string, int32_t> col_indices;
+ col_indices.reserve(column_names_.size());
+ for (int32_t i = 0; i < static_cast<int32_t>(column_names_.size()); ++i) {
+ col_indices.emplace(column_names_[i], i);
+ }
+
+ // For each column name in include_columns, build the corresponding ColumnBuilder
+ for (const auto& col_name : include_columns) {
+ std::shared_ptr<ColumnBuilder> builder;
+ auto it = col_indices.find(col_name);
+ if (it != col_indices.end()) {
+ auto col_index = it->second;
+ RETURN_NOT_OK(MakeCSVColumnBuilder(col_name, col_index, &builder));
} else {
- RETURN_NOT_OK(ColumnBuilder::Make(it->second, col_index, convert_options_,
- task_group_, &builder));
+ // Column not in the CSV file
+ if (convert_options_.include_missing_columns) {
+ RETURN_NOT_OK(MakeNullColumnBuilder(col_name, &builder));
+ } else {
+ return Status::KeyError("Column '", col_name,
+ "' in include_columns "
+ "does not exist in CSV file");
+ }
}
column_builders_.push_back(builder);
+ builder_names_.push_back(col_name);
}
-
return Status::OK();
}
+ // Make a column builder for the given CSV column name and index
+ Status MakeCSVColumnBuilder(const std::string& col_name, int32_t col_index,
+ std::shared_ptr<ColumnBuilder>* out) {
+ // Does the named column have a fixed type?
+ auto it = convert_options_.column_types.find(col_name);
+ if (it == convert_options_.column_types.end()) {
+ return ColumnBuilder::Make(pool_, col_index, convert_options_, task_group_, out);
+ } else {
+ return ColumnBuilder::Make(pool_, it->second, col_index, convert_options_,
+ task_group_, out);
+ }
+ }
+
+ // Make a column builder for a column of nulls
+ Status MakeNullColumnBuilder(const std::string& col_name,
+ std::shared_ptr<ColumnBuilder>* out) {
+ std::shared_ptr<DataType> type;
+ // If the named column has a fixed type in `column_types`, use it, otherwise use null()
+ auto it = convert_options_.column_types.find(col_name);
+ if (it != convert_options_.column_types.end()) {
+ type = it->second;
+ } else {
+ type = null();
+ }
+ return ColumnBuilder::MakeNull(pool_, type, task_group_, out);
+ }
+
// Trigger conversion of parsed block data
Status ProcessData(const std::shared_ptr<BlockParser>& parser, int64_t block_index) {
for (auto& builder : column_builders_) {
@@ -219,17 +280,15 @@ class BaseTableReader : public csv::TableReader {
}
Status MakeTable(std::shared_ptr<Table>* out) {
- DCHECK_GT(num_cols_, 0);
- DCHECK_EQ(column_names_.size(), static_cast<uint32_t>(num_cols_));
- DCHECK_EQ(column_builders_.size(), static_cast<uint32_t>(num_cols_));
+ DCHECK_EQ(column_builders_.size(), builder_names_.size());
std::vector<std::shared_ptr<Field>> fields;
std::vector<std::shared_ptr<ChunkedArray>> columns;
- for (int32_t i = 0; i < num_cols_; ++i) {
+ for (int32_t i = 0; i < static_cast<int32_t>(builder_names_.size()); ++i) {
std::shared_ptr<ChunkedArray> array;
RETURN_NOT_OK(column_builders_[i]->Finish(&array));
- fields.push_back(::arrow::field(column_names_[i], array->type()));
+ fields.push_back(::arrow::field(builder_names_[i], array->type()));
columns.emplace_back(std::move(array));
}
*out = Table::Make(schema(fields), columns);
@@ -241,12 +300,17 @@ class BaseTableReader : public csv::TableReader {
ParseOptions parse_options_;
ConvertOptions convert_options_;
- int32_t num_cols_ = -1;
- std::shared_ptr<ReadaheadSpooler> readahead_;
- // Column names
+ // Number of columns in the CSV file
+ int32_t num_csv_cols_ = -1;
+ // Column names in the CSV file
std::vector<std::string> column_names_;
- std::shared_ptr<internal::TaskGroup> task_group_;
+ // Column builders for target Table (not necessarily in CSV file order)
std::vector<std::shared_ptr<ColumnBuilder>> column_builders_;
+ // Names of columns, in same order as column_builders_
+ std::vector<std::string> builder_names_;
+
+ std::shared_ptr<ReadaheadSpooler> readahead_;
+ std::shared_ptr<internal::TaskGroup> task_group_;
// Current block and data pointer
std::shared_ptr<Buffer> cur_block_;
@@ -289,7 +353,7 @@ class SerialTableReader : public BaseTableReader {
static constexpr int32_t max_num_rows = std::numeric_limits<int32_t>::max();
auto parser =
- std::make_shared<BlockParser>(pool_, parse_options_, num_cols_, max_num_rows);
+ std::make_shared<BlockParser>(pool_, parse_options_, num_csv_cols_, max_num_rows);
while (!eof_) {
// Consume current block
uint32_t parsed_size = 0;
@@ -376,8 +440,8 @@ class ThreadedTableReader : public BaseTableReader {
// "mutable" allows to modify captured by-copy chunk_buffer
task_group_->Append([=]() mutable -> Status {
- auto parser = std::make_shared<BlockParser>(pool_, parse_options_, num_cols_,
- max_num_rows);
+ auto parser = std::make_shared<BlockParser>(pool_, parse_options_,
+ num_csv_cols_, max_num_rows);
uint32_t parsed_size = 0;
RETURN_NOT_OK(parser->Parse(reinterpret_cast<const char*>(chunk_data),
chunk_size, &parsed_size));
@@ -409,8 +473,8 @@ class ThreadedTableReader : public BaseTableReader {
for (auto& builder : column_builders_) {
builder->SetTaskGroup(task_group_);
}
- auto parser =
- std::make_shared<BlockParser>(pool_, parse_options_, num_cols_, max_num_rows);
+ auto parser = std::make_shared<BlockParser>(pool_, parse_options_, num_csv_cols_,
+ max_num_rows);
uint32_t parsed_size = 0;
RETURN_NOT_OK(parser->ParseFinal(reinterpret_cast<const char*>(cur_data_),
static_cast<uint32_t>(cur_size_), &parsed_size));
diff --git a/cpp/src/arrow/csv/reader.h b/cpp/src/arrow/csv/reader.h
index edf6f11..53255f9 100644
--- a/cpp/src/arrow/csv/reader.h
+++ b/cpp/src/arrow/csv/reader.h
@@ -41,7 +41,6 @@ class ARROW_EXPORT TableReader {
virtual Status Read(std::shared_ptr<Table>* out) = 0;
- // XXX pass optional schema?
static Status Make(MemoryPool* pool, std::shared_ptr<io::InputStream> input,
const ReadOptions&, const ParseOptions&, const ConvertOptions&,
std::shared_ptr<TableReader>* out);
diff --git a/cpp/src/arrow/table.cc b/cpp/src/arrow/table.cc
index 446010f..3f68561 100644
--- a/cpp/src/arrow/table.cc
+++ b/cpp/src/arrow/table.cc
@@ -21,6 +21,7 @@
#include <cstdlib>
#include <limits>
#include <memory>
+#include <sstream>
#include <utility>
#include "arrow/array.h"
@@ -184,10 +185,12 @@ Status ChunkedArray::Validate() const {
}
const auto& type = *chunks_[0]->type();
+ // Make sure chunks all have the same type
for (size_t i = 1; i < chunks_.size(); ++i) {
- if (!chunks_[i]->type()->Equals(type)) {
+ const Array& chunk = *chunks_[i];
+ if (!chunk.type()->Equals(type)) {
return Status::Invalid("In chunk ", i, " expected type ", type.ToString(),
- " but saw ", chunks_[i]->type()->ToString());
+ " but saw ", chunk.type()->ToString());
}
}
return Status::OK();
@@ -343,7 +346,7 @@ class SimpleTable : public Table {
}
}
- // Make sure columns are all the same length
+ // Make sure columns are all the same length, and validate them
for (int i = 0; i < num_columns(); ++i) {
const ChunkedArray* col = columns_[i].get();
if (col->length() != num_rows_) {
@@ -351,6 +354,12 @@ class SimpleTable : public Table {
" expected length ", num_rows_, " but got length ",
col->length());
}
+ Status st = col->Validate();
+ if (!st.ok()) {
+ std::stringstream ss;
+ ss << "Column " << i << ": " << st.message();
+ return st.WithMessage(ss.str());
+ }
}
return Status::OK();
}
diff --git a/python/pyarrow/_csv.pyx b/python/pyarrow/_csv.pyx
index 93e9cb3..cb5fe18 100644
--- a/python/pyarrow/_csv.pyx
+++ b/python/pyarrow/_csv.pyx
@@ -55,7 +55,7 @@ cdef class ReadOptions:
The number of rows to skip at the start of the CSV data, not
including the row of column names (if any).
column_names: list, optional
- The Table column names. If empty, column names will be
+ The column names in the CSV file. If empty, column names will be
read from the first row after `skip_rows`.
"""
cdef:
@@ -115,7 +115,7 @@ cdef class ReadOptions:
@property
def column_names(self):
"""
- The Table column names. If empty, column names will be
+ The column names in the CSV file. If empty, column names will be
read from the first row after `skip_rows`.
"""
return [frombytes(s) for s in self.options.column_names]
@@ -288,6 +288,17 @@ cdef class ConvertOptions:
If true, then strings in null_values are considered null for
string columns.
If false, then all strings are valid string values.
+ include_columns: list, optional
+ The names of columns to include in the Table.
+ If empty, the Table will include all columns from the CSV file.
+ If not empty, only these columns will be included, in this order.
+ include_missing_columns: bool, optional (default False)
+ If false, columns in `include_columns` but not in the CSV file will
+ error out.
+ If true, columns in `include_columns` but not in the CSV file will
+ produce a column of nulls (whose type is selected using
+ `column_types`, or null by default).
+ This option is ignored if `include_columns` is empty.
"""
cdef:
CCSVConvertOptions options
@@ -297,7 +308,8 @@ cdef class ConvertOptions:
def __init__(self, check_utf8=None, column_types=None, null_values=None,
true_values=None, false_values=None,
- strings_can_be_null=None):
+ strings_can_be_null=None, include_columns=None,
+ include_missing_columns=None):
self.options = CCSVConvertOptions.Defaults()
if check_utf8 is not None:
self.check_utf8 = check_utf8
@@ -311,6 +323,10 @@ cdef class ConvertOptions:
self.false_values = false_values
if strings_can_be_null is not None:
self.strings_can_be_null = strings_can_be_null
+ if include_columns is not None:
+ self.include_columns = include_columns
+ if include_missing_columns is not None:
+ self.include_missing_columns = include_missing_columns
@property
def check_utf8(self):
@@ -396,6 +412,38 @@ cdef class ConvertOptions:
def false_values(self, value):
self.options.false_values = [tobytes(x) for x in value]
+ @property
+ def include_columns(self):
+ """
+ The names of columns to include in the Table.
+
+ If empty, the Table will include all columns from the CSV file.
+ If not empty, only these columns will be included, in this order.
+ """
+ return [frombytes(s) for s in self.options.include_columns]
+
+ @include_columns.setter
+ def include_columns(self, value):
+ self.options.include_columns.clear()
+ for item in value:
+ self.options.include_columns.push_back(tobytes(item))
+
+ @property
+ def include_missing_columns(self):
+ """
+ If false, columns in `include_columns` but not in the CSV file will
+ error out.
+ If true, columns in `include_columns` but not in the CSV file will
+ produce a null column (whose type is selected using `column_types`,
+ or null by default).
+ This option is ignored if `include_columns` is empty.
+ """
+ return self.options.include_missing_columns
+
+ @include_missing_columns.setter
+ def include_missing_columns(self, value):
+ self.options.include_missing_columns = value
+
cdef _get_reader(input_file, shared_ptr[InputStream]* out):
use_memory_map = False
diff --git a/python/pyarrow/error.pxi b/python/pyarrow/error.pxi
index 3cb9142..6c64f9b 100644
--- a/python/pyarrow/error.pxi
+++ b/python/pyarrow/error.pxi
@@ -37,7 +37,9 @@ class ArrowIOError(IOError, ArrowException):
class ArrowKeyError(KeyError, ArrowException):
- pass
+ def __str__(self):
+ # Override KeyError.__str__, as it uses the repr() of the key
+ return ArrowException.__str__(self)
class ArrowTypeError(TypeError, ArrowException):
diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd
index ad0fa09..9c2aa6a 100644
--- a/python/pyarrow/includes/libarrow.pxd
+++ b/python/pyarrow/includes/libarrow.pxd
@@ -1113,6 +1113,8 @@ cdef extern from "arrow/csv/api.h" namespace "arrow::csv" nogil:
vector[c_string] true_values
vector[c_string] false_values
c_bool strings_can_be_null
+ vector[c_string] include_columns
+ c_bool include_missing_columns
@staticmethod
CCSVConvertOptions Defaults()
diff --git a/python/pyarrow/table.pxi b/python/pyarrow/table.pxi
index f103d26..fad7ab4 100644
--- a/python/pyarrow/table.pxi
+++ b/python/pyarrow/table.pxi
@@ -78,6 +78,13 @@ cdef class ChunkedArray(_PandasConvertible):
def __str__(self):
return self.format()
+ def validate(self):
+ """
+ Validate chunked array consistency.
+ """
+ with nogil:
+ check_status(self.sp_chunked_array.get().Validate())
+
@property
def null_count(self):
"""
diff --git a/python/pyarrow/tests/test_csv.py b/python/pyarrow/tests/test_csv.py
index a42237e..543f659 100644
--- a/python/pyarrow/tests/test_csv.py
+++ b/python/pyarrow/tests/test_csv.py
@@ -176,15 +176,27 @@ def test_convert_options():
opts.false_values = ['xxx', 'yyy']
assert opts.false_values == ['xxx', 'yyy']
+ assert opts.include_columns == []
+ opts.include_columns = ['def', 'abc']
+ assert opts.include_columns == ['def', 'abc']
+
+ assert opts.include_missing_columns is False
+ opts.include_missing_columns = True
+ assert opts.include_missing_columns is True
+
opts = cls(check_utf8=False, column_types={'a': pa.null()},
null_values=['N', 'nn'], true_values=['T', 'tt'],
- false_values=['F', 'ff'], strings_can_be_null=True)
+ false_values=['F', 'ff'], strings_can_be_null=True,
+ include_columns=['abc', 'def'],
+ include_missing_columns=True)
assert opts.check_utf8 is False
assert opts.column_types == {'a': pa.null()}
assert opts.null_values == ['N', 'nn']
assert opts.false_values == ['F', 'ff']
assert opts.true_values == ['T', 'tt']
assert opts.strings_can_be_null is True
+ assert opts.include_columns == ['abc', 'def']
+ assert opts.include_missing_columns is True
class BaseTestCSVRead:
@@ -306,6 +318,80 @@ class BaseTestCSVRead:
"y": ["kl", "op"],
}
+ def test_include_columns(self):
+ rows = b"ab,cd\nef,gh\nij,kl\nmn,op\n"
+
+ convert_options = ConvertOptions()
+ convert_options.include_columns = ['ab']
+ table = self.read_bytes(rows, convert_options=convert_options)
+ self.check_names(table, ["ab"])
+ assert table.to_pydict() == {
+ "ab": ["ef", "ij", "mn"],
+ }
+
+ # Order of include_columns is respected, regardless of CSV order
+ convert_options.include_columns = ['cd', 'ab']
+ table = self.read_bytes(rows, convert_options=convert_options)
+ schema = pa.schema([('cd', pa.string()),
+ ('ab', pa.string())])
+ assert table.schema == schema
+ assert table.to_pydict() == {
+ "cd": ["gh", "kl", "op"],
+ "ab": ["ef", "ij", "mn"],
+ }
+
+ # Include a column not in the CSV file => raises by default
+ convert_options.include_columns = ['xx', 'ab', 'yy']
+ with pytest.raises(KeyError,
+ match="Column 'xx' in include_columns "
+ "does not exist in CSV file"):
+ self.read_bytes(rows, convert_options=convert_options)
+
+ def test_include_missing_columns(self):
+ rows = b"ab,cd\nef,gh\nij,kl\nmn,op\n"
+
+ read_options = ReadOptions()
+ convert_options = ConvertOptions()
+ convert_options.include_columns = ['xx', 'ab', 'yy']
+ convert_options.include_missing_columns = True
+ table = self.read_bytes(rows, read_options=read_options,
+ convert_options=convert_options)
+ schema = pa.schema([('xx', pa.null()),
+ ('ab', pa.string()),
+ ('yy', pa.null())])
+ assert table.schema == schema
+ assert table.to_pydict() == {
+ "xx": [None, None, None],
+ "ab": ["ef", "ij", "mn"],
+ "yy": [None, None, None],
+ }
+
+ # Combining with `column_names`
+ read_options.column_names = ["xx", "yy"]
+ convert_options.include_columns = ["yy", "cd"]
+ table = self.read_bytes(rows, read_options=read_options,
+ convert_options=convert_options)
+ schema = pa.schema([('yy', pa.string()),
+ ('cd', pa.null())])
+ assert table.schema == schema
+ assert table.to_pydict() == {
+ "yy": ["cd", "gh", "kl", "op"],
+ "cd": [None, None, None, None],
+ }
+
+ # And with `column_types` as well
+ convert_options.column_types = {"yy": pa.binary(),
+ "cd": pa.int32()}
+ table = self.read_bytes(rows, read_options=read_options,
+ convert_options=convert_options)
+ schema = pa.schema([('yy', pa.binary()),
+ ('cd', pa.int32())])
+ assert table.schema == schema
+ assert table.to_pydict() == {
+ "yy": [b"cd", b"gh", b"kl", b"op"],
+ "cd": [None, None, None, None],
+ }
+
def test_simple_ints(self):
# Infer integer columns
rows = b"a,b,c\n1,2,3\n4,5,6\n"
@@ -471,6 +557,22 @@ class BaseTestCSVRead:
assert "In CSV column #1: " in err
assert "CSV conversion error to float: invalid value 'XXX'" in err
+ def test_column_types_with_column_names(self):
+ # When both `column_names` and `column_types` are given, names
+ # in `column_types` should refer to names in `column_names`
+ rows = b"a,b\nc,d\ne,f\n"
+ read_options = ReadOptions(column_names=['x', 'y'])
+ convert_options = ConvertOptions(column_types={'x': pa.binary()})
+ table = self.read_bytes(rows, read_options=read_options,
+ convert_options=convert_options)
+ schema = pa.schema([('x', pa.binary()),
+ ('y', pa.string())])
+ assert table.schema == schema
+ assert table.to_pydict() == {
+ 'x': [b'a', b'c', b'e'],
+ 'y': ['b', 'd', 'f'],
+ }
+
def test_no_ending_newline(self):
# No \n after last line
rows = b"a,b,c\n1,2,3\n4,5,6"
diff --git a/python/pyarrow/tests/test_table.py b/python/pyarrow/tests/test_table.py
index 8014c19..64a9a08 100644
--- a/python/pyarrow/tests/test_table.py
+++ b/python/pyarrow/tests/test_table.py
@@ -30,6 +30,7 @@ def test_chunked_array_basics():
data = pa.chunked_array([], type=pa.string())
assert data.type == pa.string()
assert data.to_pylist() == []
+ data.validate()
with pytest.raises(ValueError):
pa.chunked_array([])
@@ -43,6 +44,7 @@ def test_chunked_array_basics():
assert all(isinstance(c, pa.lib.Int64Array) for c in data.chunks)
assert all(isinstance(c, pa.lib.Int64Array) for c in data.iterchunks())
assert len(data.chunks) == 3
+ data.validate()
def test_chunked_array_mismatch_types():
@@ -177,7 +179,9 @@ def test_chunked_array_pickle(data, typ):
arrays.append(pa.array(data[:2], type=typ))
data = data[2:]
array = pa.chunked_array(arrays, type=typ)
+ array.validate()
result = pickle.loads(pickle.dumps(array))
+ result.validate()
assert result.equals(array)