You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by we...@apache.org on 2017/03/30 14:19:56 UTC
arrow git commit: ARROW-698: Add flag to FileWriter::WriteRecordBatch for writing record batches with lengths over INT32_MAX
Repository: arrow
Updated Branches:
refs/heads/master f7b287a28 -> 642b753a4
ARROW-698: Add flag to FileWriter::WriteRecordBatch for writing record batches with lengths over INT32_MAX
cc @pcmoritz
Author: Wes McKinney <we...@twosigma.com>
Closes #455 from wesm/ARROW-698 and squashes the following commits:
42c100c [Wes McKinney] Add allow_64bit option to FileWriter::WriteRecordBatch
Project: http://git-wip-us.apache.org/repos/asf/arrow/repo
Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/642b753a
Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/642b753a
Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/642b753a
Branch: refs/heads/master
Commit: 642b753a49a3fcb5d53946c773cd70ab2a3ece88
Parents: f7b287a
Author: Wes McKinney <we...@twosigma.com>
Authored: Thu Mar 30 10:19:50 2017 -0400
Committer: Wes McKinney <we...@twosigma.com>
Committed: Thu Mar 30 10:19:50 2017 -0400
----------------------------------------------------------------------
cpp/src/arrow/ipc/ipc-read-write-test.cc | 20 ++++++++++++--------
cpp/src/arrow/ipc/writer.cc | 18 ++++++++++--------
cpp/src/arrow/ipc/writer.h | 4 ++--
cpp/src/arrow/type-test.cc | 8 ++++----
cpp/src/arrow/util/visibility.h | 8 ++++----
5 files changed, 32 insertions(+), 26 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/arrow/blob/642b753a/cpp/src/arrow/ipc/ipc-read-write-test.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/ipc-read-write-test.cc b/cpp/src/arrow/ipc/ipc-read-write-test.cc
index cd3f190..48e546e 100644
--- a/cpp/src/arrow/ipc/ipc-read-write-test.cc
+++ b/cpp/src/arrow/ipc/ipc-read-write-test.cc
@@ -138,17 +138,21 @@ class IpcTestFixture : public io::MemoryMapFixture {
Status DoLargeRoundTrip(
const RecordBatch& batch, bool zero_data, std::shared_ptr<RecordBatch>* result) {
- int32_t metadata_length;
- int64_t body_length;
-
- const int64_t buffer_offset = 0;
-
if (zero_data) { RETURN_NOT_OK(ZeroMemoryMap(mmap_.get())); }
RETURN_NOT_OK(mmap_->Seek(0));
- RETURN_NOT_OK(WriteLargeRecordBatch(
- batch, buffer_offset, mmap_.get(), &metadata_length, &body_length, pool_));
- return ReadRecordBatch(batch.schema(), 0, mmap_.get(), result);
+ std::shared_ptr<FileWriter> file_writer;
+ RETURN_NOT_OK(FileWriter::Open(mmap_.get(), batch.schema(), &file_writer));
+ RETURN_NOT_OK(file_writer->WriteRecordBatch(batch, true));
+ RETURN_NOT_OK(file_writer->Close());
+
+ int64_t offset;
+ RETURN_NOT_OK(mmap_->Tell(&offset));
+
+ std::shared_ptr<FileReader> file_reader;
+ RETURN_NOT_OK(FileReader::Open(mmap_, offset, &file_reader));
+
+ return file_reader->GetRecordBatch(0, result);
}
void CheckReadResult(const RecordBatch& result, const RecordBatch& expected) {
http://git-wip-us.apache.org/repos/asf/arrow/blob/642b753a/cpp/src/arrow/ipc/writer.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/writer.cc b/cpp/src/arrow/ipc/writer.cc
index da360f3..92e6194 100644
--- a/cpp/src/arrow/ipc/writer.cc
+++ b/cpp/src/arrow/ipc/writer.cc
@@ -591,7 +591,7 @@ class StreamWriter::StreamWriterImpl {
return Status::OK();
}
- Status WriteRecordBatch(const RecordBatch& batch, FileBlock* block) {
+ Status WriteRecordBatch(const RecordBatch& batch, bool allow_64bit, FileBlock* block) {
RETURN_NOT_OK(CheckStarted());
block->offset = position_;
@@ -599,7 +599,8 @@ class StreamWriter::StreamWriterImpl {
// Frame of reference in file format is 0, see ARROW-384
const int64_t buffer_start_offset = 0;
RETURN_NOT_OK(arrow::ipc::WriteRecordBatch(batch, buffer_start_offset, sink_,
- &block->metadata_length, &block->body_length, pool_));
+ &block->metadata_length, &block->body_length, pool_, kMaxNestingDepth,
+ allow_64bit));
RETURN_NOT_OK(UpdatePosition());
DCHECK(position_ % 8 == 0) << "WriteRecordBatch did not perform aligned writes";
@@ -607,10 +608,11 @@ class StreamWriter::StreamWriterImpl {
return Status::OK();
}
- Status WriteRecordBatch(const RecordBatch& batch) {
+ Status WriteRecordBatch(const RecordBatch& batch, bool allow_64bit) {
// Push an empty FileBlock. Can be written in the footer later
record_batches_.emplace_back(0, 0, 0);
- return WriteRecordBatch(batch, &record_batches_[record_batches_.size() - 1]);
+ return WriteRecordBatch(
+ batch, allow_64bit, &record_batches_[record_batches_.size() - 1]);
}
// Adds padding bytes if necessary to ensure all memory blocks are written on
@@ -657,8 +659,8 @@ StreamWriter::StreamWriter() {
impl_.reset(new StreamWriterImpl());
}
-Status StreamWriter::WriteRecordBatch(const RecordBatch& batch) {
- return impl_->WriteRecordBatch(batch);
+Status StreamWriter::WriteRecordBatch(const RecordBatch& batch, bool allow_64bit) {
+ return impl_->WriteRecordBatch(batch, allow_64bit);
}
void StreamWriter::set_memory_pool(MemoryPool* pool) {
@@ -723,8 +725,8 @@ Status FileWriter::Open(io::OutputStream* sink, const std::shared_ptr<Schema>& s
return (*out)->impl_->Open(sink, schema);
}
-Status FileWriter::WriteRecordBatch(const RecordBatch& batch) {
- return impl_->WriteRecordBatch(batch);
+Status FileWriter::WriteRecordBatch(const RecordBatch& batch, bool allow_64bit) {
+ return impl_->WriteRecordBatch(batch, allow_64bit);
}
Status FileWriter::Close() {
http://git-wip-us.apache.org/repos/asf/arrow/blob/642b753a/cpp/src/arrow/ipc/writer.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/writer.h b/cpp/src/arrow/ipc/writer.h
index 3b7e710..25b5ad6 100644
--- a/cpp/src/arrow/ipc/writer.h
+++ b/cpp/src/arrow/ipc/writer.h
@@ -87,7 +87,7 @@ class ARROW_EXPORT StreamWriter {
static Status Open(io::OutputStream* sink, const std::shared_ptr<Schema>& schema,
std::shared_ptr<StreamWriter>* out);
- virtual Status WriteRecordBatch(const RecordBatch& batch);
+ virtual Status WriteRecordBatch(const RecordBatch& batch, bool allow_64bit = false);
/// Perform any logic necessary to finish the stream. User is responsible for
/// closing the actual OutputStream
@@ -108,7 +108,7 @@ class ARROW_EXPORT FileWriter : public StreamWriter {
static Status Open(io::OutputStream* sink, const std::shared_ptr<Schema>& schema,
std::shared_ptr<FileWriter>* out);
- Status WriteRecordBatch(const RecordBatch& batch) override;
+ Status WriteRecordBatch(const RecordBatch& batch, bool allow_64bit = false) override;
Status Close() override;
private:
http://git-wip-us.apache.org/repos/asf/arrow/blob/642b753a/cpp/src/arrow/type-test.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/type-test.cc b/cpp/src/arrow/type-test.cc
index 7f13f8b..ed86543 100644
--- a/cpp/src/arrow/type-test.cc
+++ b/cpp/src/arrow/type-test.cc
@@ -232,16 +232,16 @@ TEST(TestTimestampType, ToString) {
}
TEST(TestNestedType, Equals) {
- auto create_struct =
- [](std::string inner_name, std::string struct_name) -> shared_ptr<Field> {
+ auto create_struct = [](
+ std::string inner_name, std::string struct_name) -> shared_ptr<Field> {
auto f_type = field(inner_name, int32());
vector<shared_ptr<Field>> fields = {f_type};
auto s_type = std::make_shared<StructType>(fields);
return field(struct_name, s_type);
};
- auto create_union =
- [](std::string inner_name, std::string union_name) -> shared_ptr<Field> {
+ auto create_union = [](
+ std::string inner_name, std::string union_name) -> shared_ptr<Field> {
auto f_type = field(inner_name, int32());
vector<shared_ptr<Field>> fields = {f_type};
vector<uint8_t> codes = {Type::INT32};
http://git-wip-us.apache.org/repos/asf/arrow/blob/642b753a/cpp/src/arrow/util/visibility.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/util/visibility.h b/cpp/src/arrow/util/visibility.h
index 6382f7f..e84cc45 100644
--- a/cpp/src/arrow/util/visibility.h
+++ b/cpp/src/arrow/util/visibility.h
@@ -39,17 +39,17 @@
// explicit specializations https://llvm.org/bugs/show_bug.cgi?id=24815
#if defined(__clang__)
- #define ARROW_EXTERN_TEMPLATE extern template class ARROW_EXPORT
+#define ARROW_EXTERN_TEMPLATE extern template class ARROW_EXPORT
#else
- #define ARROW_EXTERN_TEMPLATE extern template class
+#define ARROW_EXTERN_TEMPLATE extern template class
#endif
// This is a complicated topic, some reading on it:
// http://www.codesynthesis.com/~boris/blog/2010/01/18/dll-export-cxx-templates/
#if defined(_MSC_VER)
- #define ARROW_TEMPLATE_EXPORT ARROW_EXPORT
+#define ARROW_TEMPLATE_EXPORT ARROW_EXPORT
#else
- #define ARROW_TEMPLATE_EXPORT
+#define ARROW_TEMPLATE_EXPORT
#endif
#endif // ARROW_UTIL_VISIBILITY_H