You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by we...@apache.org on 2017/03/30 14:19:56 UTC

arrow git commit: ARROW-698: Add flag to FileWriter::WriteRecordBatch for writing record batches with lengths over INT32_MAX

Repository: arrow
Updated Branches:
  refs/heads/master f7b287a28 -> 642b753a4


ARROW-698: Add flag to FileWriter::WriteRecordBatch for writing record batches with lengths over INT32_MAX

cc @pcmoritz

Author: Wes McKinney <we...@twosigma.com>

Closes #455 from wesm/ARROW-698 and squashes the following commits:

42c100c [Wes McKinney] Add allow_64bit option to FileWriter::WriteRecordBatch


Project: http://git-wip-us.apache.org/repos/asf/arrow/repo
Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/642b753a
Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/642b753a
Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/642b753a

Branch: refs/heads/master
Commit: 642b753a49a3fcb5d53946c773cd70ab2a3ece88
Parents: f7b287a
Author: Wes McKinney <we...@twosigma.com>
Authored: Thu Mar 30 10:19:50 2017 -0400
Committer: Wes McKinney <we...@twosigma.com>
Committed: Thu Mar 30 10:19:50 2017 -0400

----------------------------------------------------------------------
 cpp/src/arrow/ipc/ipc-read-write-test.cc | 20 ++++++++++++--------
 cpp/src/arrow/ipc/writer.cc              | 18 ++++++++++--------
 cpp/src/arrow/ipc/writer.h               |  4 ++--
 cpp/src/arrow/type-test.cc               |  8 ++++----
 cpp/src/arrow/util/visibility.h          |  8 ++++----
 5 files changed, 32 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/arrow/blob/642b753a/cpp/src/arrow/ipc/ipc-read-write-test.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/ipc-read-write-test.cc b/cpp/src/arrow/ipc/ipc-read-write-test.cc
index cd3f190..48e546e 100644
--- a/cpp/src/arrow/ipc/ipc-read-write-test.cc
+++ b/cpp/src/arrow/ipc/ipc-read-write-test.cc
@@ -138,17 +138,21 @@ class IpcTestFixture : public io::MemoryMapFixture {
 
   Status DoLargeRoundTrip(
       const RecordBatch& batch, bool zero_data, std::shared_ptr<RecordBatch>* result) {
-    int32_t metadata_length;
-    int64_t body_length;
-
-    const int64_t buffer_offset = 0;
-
     if (zero_data) { RETURN_NOT_OK(ZeroMemoryMap(mmap_.get())); }
     RETURN_NOT_OK(mmap_->Seek(0));
 
-    RETURN_NOT_OK(WriteLargeRecordBatch(
-        batch, buffer_offset, mmap_.get(), &metadata_length, &body_length, pool_));
-    return ReadRecordBatch(batch.schema(), 0, mmap_.get(), result);
+    std::shared_ptr<FileWriter> file_writer;
+    RETURN_NOT_OK(FileWriter::Open(mmap_.get(), batch.schema(), &file_writer));
+    RETURN_NOT_OK(file_writer->WriteRecordBatch(batch, true));
+    RETURN_NOT_OK(file_writer->Close());
+
+    int64_t offset;
+    RETURN_NOT_OK(mmap_->Tell(&offset));
+
+    std::shared_ptr<FileReader> file_reader;
+    RETURN_NOT_OK(FileReader::Open(mmap_, offset, &file_reader));
+
+    return file_reader->GetRecordBatch(0, result);
   }
 
   void CheckReadResult(const RecordBatch& result, const RecordBatch& expected) {

http://git-wip-us.apache.org/repos/asf/arrow/blob/642b753a/cpp/src/arrow/ipc/writer.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/writer.cc b/cpp/src/arrow/ipc/writer.cc
index da360f3..92e6194 100644
--- a/cpp/src/arrow/ipc/writer.cc
+++ b/cpp/src/arrow/ipc/writer.cc
@@ -591,7 +591,7 @@ class StreamWriter::StreamWriterImpl {
     return Status::OK();
   }
 
-  Status WriteRecordBatch(const RecordBatch& batch, FileBlock* block) {
+  Status WriteRecordBatch(const RecordBatch& batch, bool allow_64bit, FileBlock* block) {
     RETURN_NOT_OK(CheckStarted());
 
     block->offset = position_;
@@ -599,7 +599,8 @@ class StreamWriter::StreamWriterImpl {
     // Frame of reference in file format is 0, see ARROW-384
     const int64_t buffer_start_offset = 0;
     RETURN_NOT_OK(arrow::ipc::WriteRecordBatch(batch, buffer_start_offset, sink_,
-        &block->metadata_length, &block->body_length, pool_));
+        &block->metadata_length, &block->body_length, pool_, kMaxNestingDepth,
+        allow_64bit));
     RETURN_NOT_OK(UpdatePosition());
 
     DCHECK(position_ % 8 == 0) << "WriteRecordBatch did not perform aligned writes";
@@ -607,10 +608,11 @@ class StreamWriter::StreamWriterImpl {
     return Status::OK();
   }
 
-  Status WriteRecordBatch(const RecordBatch& batch) {
+  Status WriteRecordBatch(const RecordBatch& batch, bool allow_64bit) {
     // Push an empty FileBlock. Can be written in the footer later
     record_batches_.emplace_back(0, 0, 0);
-    return WriteRecordBatch(batch, &record_batches_[record_batches_.size() - 1]);
+    return WriteRecordBatch(
+        batch, allow_64bit, &record_batches_[record_batches_.size() - 1]);
   }
 
   // Adds padding bytes if necessary to ensure all memory blocks are written on
@@ -657,8 +659,8 @@ StreamWriter::StreamWriter() {
   impl_.reset(new StreamWriterImpl());
 }
 
-Status StreamWriter::WriteRecordBatch(const RecordBatch& batch) {
-  return impl_->WriteRecordBatch(batch);
+Status StreamWriter::WriteRecordBatch(const RecordBatch& batch, bool allow_64bit) {
+  return impl_->WriteRecordBatch(batch, allow_64bit);
 }
 
 void StreamWriter::set_memory_pool(MemoryPool* pool) {
@@ -723,8 +725,8 @@ Status FileWriter::Open(io::OutputStream* sink, const std::shared_ptr<Schema>& s
   return (*out)->impl_->Open(sink, schema);
 }
 
-Status FileWriter::WriteRecordBatch(const RecordBatch& batch) {
-  return impl_->WriteRecordBatch(batch);
+Status FileWriter::WriteRecordBatch(const RecordBatch& batch, bool allow_64bit) {
+  return impl_->WriteRecordBatch(batch, allow_64bit);
 }
 
 Status FileWriter::Close() {

http://git-wip-us.apache.org/repos/asf/arrow/blob/642b753a/cpp/src/arrow/ipc/writer.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/writer.h b/cpp/src/arrow/ipc/writer.h
index 3b7e710..25b5ad6 100644
--- a/cpp/src/arrow/ipc/writer.h
+++ b/cpp/src/arrow/ipc/writer.h
@@ -87,7 +87,7 @@ class ARROW_EXPORT StreamWriter {
   static Status Open(io::OutputStream* sink, const std::shared_ptr<Schema>& schema,
       std::shared_ptr<StreamWriter>* out);
 
-  virtual Status WriteRecordBatch(const RecordBatch& batch);
+  virtual Status WriteRecordBatch(const RecordBatch& batch, bool allow_64bit = false);
 
   /// Perform any logic necessary to finish the stream. User is responsible for
   /// closing the actual OutputStream
@@ -108,7 +108,7 @@ class ARROW_EXPORT FileWriter : public StreamWriter {
   static Status Open(io::OutputStream* sink, const std::shared_ptr<Schema>& schema,
       std::shared_ptr<FileWriter>* out);
 
-  Status WriteRecordBatch(const RecordBatch& batch) override;
+  Status WriteRecordBatch(const RecordBatch& batch, bool allow_64bit = false) override;
   Status Close() override;
 
  private:

http://git-wip-us.apache.org/repos/asf/arrow/blob/642b753a/cpp/src/arrow/type-test.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/type-test.cc b/cpp/src/arrow/type-test.cc
index 7f13f8b..ed86543 100644
--- a/cpp/src/arrow/type-test.cc
+++ b/cpp/src/arrow/type-test.cc
@@ -232,16 +232,16 @@ TEST(TestTimestampType, ToString) {
 }
 
 TEST(TestNestedType, Equals) {
-  auto create_struct =
-      [](std::string inner_name, std::string struct_name) -> shared_ptr<Field> {
+  auto create_struct = [](
+      std::string inner_name, std::string struct_name) -> shared_ptr<Field> {
     auto f_type = field(inner_name, int32());
     vector<shared_ptr<Field>> fields = {f_type};
     auto s_type = std::make_shared<StructType>(fields);
     return field(struct_name, s_type);
   };
 
-  auto create_union =
-      [](std::string inner_name, std::string union_name) -> shared_ptr<Field> {
+  auto create_union = [](
+      std::string inner_name, std::string union_name) -> shared_ptr<Field> {
     auto f_type = field(inner_name, int32());
     vector<shared_ptr<Field>> fields = {f_type};
     vector<uint8_t> codes = {Type::INT32};

http://git-wip-us.apache.org/repos/asf/arrow/blob/642b753a/cpp/src/arrow/util/visibility.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/util/visibility.h b/cpp/src/arrow/util/visibility.h
index 6382f7f..e84cc45 100644
--- a/cpp/src/arrow/util/visibility.h
+++ b/cpp/src/arrow/util/visibility.h
@@ -39,17 +39,17 @@
 // explicit specializations https://llvm.org/bugs/show_bug.cgi?id=24815
 
 #if defined(__clang__)
-  #define ARROW_EXTERN_TEMPLATE extern template class ARROW_EXPORT
+#define ARROW_EXTERN_TEMPLATE extern template class ARROW_EXPORT
 #else
-  #define ARROW_EXTERN_TEMPLATE extern template class
+#define ARROW_EXTERN_TEMPLATE extern template class
 #endif
 
 // This is a complicated topic, some reading on it:
 // http://www.codesynthesis.com/~boris/blog/2010/01/18/dll-export-cxx-templates/
 #if defined(_MSC_VER)
-  #define ARROW_TEMPLATE_EXPORT ARROW_EXPORT
+#define ARROW_TEMPLATE_EXPORT ARROW_EXPORT
 #else
-  #define ARROW_TEMPLATE_EXPORT
+#define ARROW_TEMPLATE_EXPORT
 #endif
 
 #endif  // ARROW_UTIL_VISIBILITY_H