You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2021/01/19 21:43:54 UTC

[GitHub] [arrow] xhochy commented on a change in pull request #8648: ARROW-7906: [C++] [Python] Add ORC write support

xhochy commented on a change in pull request #8648:
URL: https://github.com/apache/arrow/pull/8648#discussion_r560408479



##########
File path: cpp/src/arrow/adapters/orc/adapter.cc
##########
@@ -473,6 +471,107 @@ int64_t ORCFileReader::NumberOfStripes() { return impl_->NumberOfStripes(); }
 
 int64_t ORCFileReader::NumberOfRows() { return impl_->NumberOfRows(); }
 
+class ArrowOutputStream : public liborc::OutputStream {
+ public:
+  explicit ArrowOutputStream(const std::shared_ptr<io::OutputStream>& output_stream)
+      : output_stream_(output_stream), length_(0) {}
+
+  uint64_t getLength() const override { return length_; }
+
+  uint64_t getNaturalWriteSize() const override { return 128 * 1024; }

Review comment:
       Where does this constant come from? Should this be stated as a constant somewhere?

##########
File path: cpp/src/arrow/adapters/orc/adapter.cc
##########
@@ -473,6 +471,107 @@ int64_t ORCFileReader::NumberOfStripes() { return impl_->NumberOfStripes(); }
 
 int64_t ORCFileReader::NumberOfRows() { return impl_->NumberOfRows(); }
 
+class ArrowOutputStream : public liborc::OutputStream {
+ public:
+  explicit ArrowOutputStream(const std::shared_ptr<io::OutputStream>& output_stream)
+      : output_stream_(output_stream), length_(0) {}
+
+  uint64_t getLength() const override { return length_; }
+
+  uint64_t getNaturalWriteSize() const override { return 128 * 1024; }
+
+  void write(const void* buf, size_t length) override {
+    ORC_THROW_NOT_OK(output_stream_->Write(buf, static_cast<int64_t>(length)));
+    length_ += static_cast<int64_t>(length);
+  }
+
+  const std::string& getName() const override {
+    static const std::string filename("ArrowOutputFile");
+    return filename;
+  }
+
+  void close() override {
+    if (!output_stream_->closed()) {
+      ORC_THROW_NOT_OK(output_stream_->Close());
+    }
+  }
+
+  int64_t get_length() { return length_; }

Review comment:
       ```suggestion
     int64_t length() { return length_; }
   ```

##########
File path: cpp/src/arrow/adapters/orc/adapter_util.cc
##########
@@ -316,10 +326,482 @@ Status AppendBatch(const liborc::Type* type, liborc::ColumnVectorBatch* batch,
   }
 }
 
+template <class array_type, class batch_type>
+Status FillNumericBatch(const DataType* type, liborc::ColumnVectorBatch* cbatch,
+                        int64_t& arrowOffset, int64_t& orcOffset, int64_t length,
+                        Array* parray, std::vector<bool>* incomingMask) {
+  auto array = checked_cast<array_type*>(parray);
+  auto batch = checked_cast<batch_type*>(cbatch);
+  int64_t arrowLength = array->length();
+  if (!arrowLength) return Status::OK();
+  if (array->null_count() || incomingMask) batch->hasNulls = true;

Review comment:
       Always use `{..}`
   ```suggestion
     if (array->null_count() || incomingMask) {
         batch->hasNulls = true;
      }
   ```

##########
File path: cpp/src/arrow/adapters/orc/adapter_util.h
##########
@@ -34,8 +34,21 @@ namespace orc {
 
 Status GetArrowType(const liborc::Type* type, std::shared_ptr<DataType>* out);
 
+Status GetORCType(const DataType* type, ORC_UNIQUE_PTR<liborc::Type>* out);
+
+Status GetORCType(const Schema* schema, ORC_UNIQUE_PTR<liborc::Type>* out);

Review comment:
       Input arguments should be by reference.
   ```suggestion
   Status GetORCType(const DataType& type, ORC_UNIQUE_PTR<liborc::Type>* out);
   
   Status GetORCType(const Schema& schema, ORC_UNIQUE_PTR<liborc::Type>* out);
   ```

##########
File path: cpp/src/arrow/adapters/orc/adapter_util.h
##########
@@ -34,8 +34,21 @@ namespace orc {
 
 Status GetArrowType(const liborc::Type* type, std::shared_ptr<DataType>* out);
 
+Status GetORCType(const DataType* type, ORC_UNIQUE_PTR<liborc::Type>* out);
+
+Status GetORCType(const Schema* schema, ORC_UNIQUE_PTR<liborc::Type>* out);
+
 Status AppendBatch(const liborc::Type* type, liborc::ColumnVectorBatch* batch,
                    int64_t offset, int64_t length, ArrayBuilder* builder);
+
+Status FillBatch(const DataType* type, liborc::ColumnVectorBatch* cbatch,
+                 int64_t& arrowOffset, int64_t& orcOffset, int64_t length, Array* parray,
+                 std::vector<bool>* incomingMask = NULLPTR);

Review comment:
       Arguments that are also written to should be passed in as pointers.
   ```suggestion
   Status FillBatch(const DataType* type, liborc::ColumnVectorBatch* cbatch,
                    int64_t* arrowOffset, int64_t* orcOffset, int64_t length, Array* parray,
                    std::vector<bool>* incomingMask = NULLPTR);
   ```

##########
File path: cpp/src/arrow/adapters/orc/adapter_util.cc
##########
@@ -40,15 +44,21 @@ namespace orc {
 
 using internal::checked_cast;
 
-// The number of nanoseconds in a second
+// The number of milliseconds, microseconds and nanoseconds in a second
+constexpr int64_t kOneSecondMillis = 1000LL;
+constexpr int64_t kOneMicroNanos = 1000LL;
+constexpr int64_t kOneSecondMicros = 1000000LL;
+constexpr int64_t kOneMilliNanos = 1000000LL;
 constexpr int64_t kOneSecondNanos = 1000000000LL;
+// Jan 1st 2015 in UNIX timestamp
+// constexpr int64_t kConverter = 1420070400LL;
 
 Status AppendStructBatch(const liborc::Type* type, liborc::ColumnVectorBatch* cbatch,
                          int64_t offset, int64_t length, ArrayBuilder* abuilder) {
   auto builder = checked_cast<StructBuilder*>(abuilder);
   auto batch = checked_cast<liborc::StructVectorBatch*>(cbatch);
 
-  const uint8_t* valid_bytes = nullptr;
+  const uint8_t* valid_bytes = NULLPTR;

Review comment:
       Please keep using `nullptr` in `cc` files, we only use the macro in headers.

##########
File path: cpp/src/arrow/adapters/orc/adapter.cc
##########
@@ -473,6 +471,107 @@ int64_t ORCFileReader::NumberOfStripes() { return impl_->NumberOfStripes(); }
 
 int64_t ORCFileReader::NumberOfRows() { return impl_->NumberOfRows(); }
 
+class ArrowOutputStream : public liborc::OutputStream {
+ public:
+  explicit ArrowOutputStream(const std::shared_ptr<io::OutputStream>& output_stream)
+      : output_stream_(output_stream), length_(0) {}
+
+  uint64_t getLength() const override { return length_; }
+
+  uint64_t getNaturalWriteSize() const override { return 128 * 1024; }
+
+  void write(const void* buf, size_t length) override {
+    ORC_THROW_NOT_OK(output_stream_->Write(buf, static_cast<int64_t>(length)));
+    length_ += static_cast<int64_t>(length);
+  }
+
+  const std::string& getName() const override {
+    static const std::string filename("ArrowOutputFile");
+    return filename;
+  }
+
+  void close() override {
+    if (!output_stream_->closed()) {
+      ORC_THROW_NOT_OK(output_stream_->Close());
+    }
+  }
+
+  int64_t get_length() { return length_; }
+
+  void set_length(int64_t length) { length_ = length; }
+
+ private:
+  std::shared_ptr<io::OutputStream> output_stream_;
+  int64_t length_;
+};
+
+class ORCFileWriter::Impl {
+ public:
+  Status Open(const std::shared_ptr<Schema>& schema,
+              const std::shared_ptr<io::OutputStream>& output_stream) {
+    orc_options_ = std::make_shared<liborc::WriterOptions>();
+    outStream_ = ORC_UNIQUE_PTR<liborc::OutputStream>(
+        static_cast<liborc::OutputStream*>(new ArrowOutputStream(output_stream)));
+    ORC_THROW_NOT_OK(GetORCType(schema.get(), &orcSchema_));
+    try {
+      writer_ = createWriter(*orcSchema_, outStream_.get(), *orc_options_);
+    } catch (const liborc::ParseError& e) {
+      return Status::IOError(e.what());
+    }
+    schema_ = schema;
+    num_cols_ = schema->num_fields();
+    return Status::OK();
+  }
+  Status Write(const std::shared_ptr<Table> table) {
+    int64_t numRows = table->num_rows();
+    int64_t batch_size = 1024;  // Doesn't matter what it is
+    std::vector<int64_t> arrowIndexOffset(num_cols_, 0);
+    std::vector<int> arrowChunkOffset(num_cols_, 0);
+    ORC_UNIQUE_PTR<liborc::ColumnVectorBatch> batch = writer_->createRowBatch(batch_size);
+    liborc::StructVectorBatch* root =
+        internal::checked_cast<liborc::StructVectorBatch*>(batch.get());
+    std::vector<liborc::ColumnVectorBatch*> fields = root->fields;
+    while (numRows > 0) {
+      for (int i = 0; i < num_cols_; i++) {
+        ORC_THROW_NOT_OK(adapters::orc::FillBatch(
+            schema_->field(i)->type().get(), fields[i], arrowIndexOffset[i],
+            arrowChunkOffset[i], batch_size, table->column(i).get()));
+      }
+      root->numElements = fields[0]->numElements;
+      writer_->add(*batch);
+      batch->clear();
+      numRows -= batch_size;
+    }
+    writer_->close();
+    return Status::OK();
+  }
+
+ private:
+  ORC_UNIQUE_PTR<liborc::Writer> writer_;
+  std::shared_ptr<liborc::WriterOptions> orc_options_;
+  std::shared_ptr<Schema> schema_;
+  ORC_UNIQUE_PTR<liborc::OutputStream> outStream_;

Review comment:
       Can these be `std::unique_ptr`? You only seem to pass the raw pointer to ORC, not the unique one.

##########
File path: cpp/src/arrow/adapters/orc/adapter_util.cc
##########
@@ -316,10 +326,482 @@ Status AppendBatch(const liborc::Type* type, liborc::ColumnVectorBatch* batch,
   }
 }
 
+template <class array_type, class batch_type>
+Status FillNumericBatch(const DataType* type, liborc::ColumnVectorBatch* cbatch,
+                        int64_t& arrowOffset, int64_t& orcOffset, int64_t length,
+                        Array* parray, std::vector<bool>* incomingMask) {
+  auto array = checked_cast<array_type*>(parray);
+  auto batch = checked_cast<batch_type*>(cbatch);
+  int64_t arrowLength = array->length();
+  if (!arrowLength) return Status::OK();
+  if (array->null_count() || incomingMask) batch->hasNulls = true;
+  for (; orcOffset < length && arrowOffset < arrowLength; orcOffset++, arrowOffset++) {
+    if (array->IsNull(arrowOffset) || (incomingMask && !(*incomingMask)[orcOffset])) {
+      batch->notNull[orcOffset] = false;
+    } else {
+      batch->data[orcOffset] = array->Value(arrowOffset);
+      batch->notNull[orcOffset] = true;
+    }
+  }
+  batch->numElements = orcOffset;
+  return Status::OK();
+}
+
+template <class array_type, class batch_type, class target_type>
+Status FillNumericBatchCast(const DataType* type, liborc::ColumnVectorBatch* cbatch,
+                            int64_t& arrowOffset, int64_t& orcOffset, int64_t length,
+                            Array* parray, std::vector<bool>* incomingMask) {
+  auto array = checked_cast<array_type*>(parray);
+  auto batch = checked_cast<batch_type*>(cbatch);
+  int64_t arrowLength = array->length();
+  if (!arrowLength) return Status::OK();
+  if (array->null_count() || incomingMask) batch->hasNulls = true;
+  for (; orcOffset < length && arrowOffset < arrowLength; orcOffset++, arrowOffset++) {
+    if (array->IsNull(arrowOffset) || (incomingMask && !(*incomingMask)[orcOffset])) {
+      batch->notNull[orcOffset] = false;
+    } else {
+      batch->data[orcOffset] = static_cast<target_type>(array->Value(arrowOffset));
+      batch->notNull[orcOffset] = true;
+    }
+  }
+  batch->numElements = orcOffset;
+  return Status::OK();
+}
+
+Status FillDate64Batch(const DataType* type, liborc::ColumnVectorBatch* cbatch,
+                       int64_t& arrowOffset, int64_t& orcOffset, int64_t length,
+                       Array* parray, std::vector<bool>* incomingMask) {
+  auto array = checked_cast<Date64Array*>(parray);
+  auto batch = checked_cast<liborc::TimestampVectorBatch*>(cbatch);
+  int64_t arrowLength = array->length();
+  if (!arrowLength) return Status::OK();
+  if (array->null_count() || incomingMask) batch->hasNulls = true;
+  for (; orcOffset < length && arrowOffset < arrowLength; orcOffset++, arrowOffset++) {
+    if (array->IsNull(arrowOffset) || (incomingMask && !(*incomingMask)[orcOffset])) {
+      batch->notNull[orcOffset] = false;
+    } else {
+      int64_t miliseconds = array->Value(arrowOffset);
+      batch->data[orcOffset] =
+          static_cast<int64_t>(std::floor(miliseconds / kOneSecondMillis));
+      batch->nanoseconds[orcOffset] =
+          (miliseconds - kOneSecondMillis * batch->data[orcOffset]) * kOneMilliNanos;
+      batch->notNull[orcOffset] = true;
+    }
+  }
+  batch->numElements = orcOffset;
+  return Status::OK();
+}
+
+Status FillTimestampBatch(const DataType* type, liborc::ColumnVectorBatch* cbatch,
+                          int64_t& arrowOffset, int64_t& orcOffset, int64_t length,
+                          Array* parray, std::vector<bool>* incomingMask) {
+  auto array = checked_cast<TimestampArray*>(parray);
+  auto batch = checked_cast<liborc::TimestampVectorBatch*>(cbatch);
+  int64_t arrowLength = array->length();
+  if (!arrowLength) return Status::OK();
+  if (array->null_count() || incomingMask) batch->hasNulls = true;
+  for (; orcOffset < length && arrowOffset < arrowLength; orcOffset++, arrowOffset++) {
+    if (array->IsNull(arrowOffset) || (incomingMask && !(*incomingMask)[orcOffset])) {
+      batch->notNull[orcOffset] = false;
+    } else {
+      int64_t data = array->Value(arrowOffset);
+      batch->notNull[orcOffset] = true;
+      switch (std::static_pointer_cast<TimestampType>(array->type())->unit()) {
+        case TimeUnit::type::SECOND: {
+          batch->data[orcOffset] = data;
+          batch->nanoseconds[orcOffset] = 0;
+          break;
+        }
+        case TimeUnit::type::MILLI: {
+          batch->data[orcOffset] =
+              static_cast<int64_t>(std::floor(data / kOneSecondMillis));
+          batch->nanoseconds[orcOffset] =
+              (data - kOneSecondMillis * batch->data[orcOffset]) * kOneMilliNanos;
+          break;
+        }
+        case TimeUnit::type::MICRO: {
+          batch->data[orcOffset] =
+              static_cast<int64_t>(std::floor(data / kOneSecondMicros));
+          batch->nanoseconds[orcOffset] =
+              (data - kOneSecondMicros * batch->data[orcOffset]) * kOneMicroNanos;
+          break;
+        }
+        default: {
+          batch->data[orcOffset] =
+              static_cast<int64_t>(std::floor(data / kOneSecondNanos));
+          batch->nanoseconds[orcOffset] = data - kOneSecondNanos * batch->data[orcOffset];
+        }
+      }
+    }
+  }
+  batch->numElements = orcOffset;
+  return Status::OK();
+}
+
+template <class array_type>
+Status FillStringBatch(const DataType* type, liborc::ColumnVectorBatch* cbatch,

Review comment:
       Please merge this with `FillBinaryBatch`, the only difference is `GetValue` vs  `GetString`. This should be templateable.

##########
File path: cpp/src/arrow/adapters/orc/adapter.cc
##########
@@ -473,6 +471,107 @@ int64_t ORCFileReader::NumberOfStripes() { return impl_->NumberOfStripes(); }
 
 int64_t ORCFileReader::NumberOfRows() { return impl_->NumberOfRows(); }
 
+class ArrowOutputStream : public liborc::OutputStream {
+ public:
+  explicit ArrowOutputStream(const std::shared_ptr<io::OutputStream>& output_stream)
+      : output_stream_(output_stream), length_(0) {}
+
+  uint64_t getLength() const override { return length_; }
+
+  uint64_t getNaturalWriteSize() const override { return 128 * 1024; }
+
+  void write(const void* buf, size_t length) override {
+    ORC_THROW_NOT_OK(output_stream_->Write(buf, static_cast<int64_t>(length)));
+    length_ += static_cast<int64_t>(length);
+  }
+
+  const std::string& getName() const override {
+    static const std::string filename("ArrowOutputFile");
+    return filename;
+  }
+
+  void close() override {
+    if (!output_stream_->closed()) {
+      ORC_THROW_NOT_OK(output_stream_->Close());
+    }
+  }
+
+  int64_t get_length() { return length_; }
+
+  void set_length(int64_t length) { length_ = length; }
+
+ private:
+  std::shared_ptr<io::OutputStream> output_stream_;
+  int64_t length_;
+};
+
+class ORCFileWriter::Impl {
+ public:
+  Status Open(const std::shared_ptr<Schema>& schema,
+              const std::shared_ptr<io::OutputStream>& output_stream) {
+    orc_options_ = std::make_shared<liborc::WriterOptions>();
+    outStream_ = ORC_UNIQUE_PTR<liborc::OutputStream>(
+        static_cast<liborc::OutputStream*>(new ArrowOutputStream(output_stream)));
+    ORC_THROW_NOT_OK(GetORCType(schema.get(), &orcSchema_));
+    try {
+      writer_ = createWriter(*orcSchema_, outStream_.get(), *orc_options_);
+    } catch (const liborc::ParseError& e) {
+      return Status::IOError(e.what());
+    }
+    schema_ = schema;
+    num_cols_ = schema->num_fields();
+    return Status::OK();
+  }

Review comment:
       ```suggestion
     }
     
   ```

##########
File path: cpp/src/arrow/adapters/orc/adapter_test.cc
##########
@@ -157,4 +225,2478 @@ TEST(TestAdapter, readIntAndStringFileMultipleStripes) {
     EXPECT_TRUE(stripe_reader->ReadNext(&record_batch).ok());
   }
 }
+
+// WriteORC tests
+
+// General
+TEST(TestAdapterWriteGeneral, writeZeroRows) {
+  std::vector<std::shared_ptr<Field>> xFields{field("bool", boolean()),

Review comment:
       What is the reason that you use slightly different schemas in every test? Maybe you can generalize this to have a reusable test data generation.

##########
File path: cpp/src/arrow/adapters/orc/adapter_util.cc
##########
@@ -316,10 +326,482 @@ Status AppendBatch(const liborc::Type* type, liborc::ColumnVectorBatch* batch,
   }
 }
 
+template <class array_type, class batch_type>
+Status FillNumericBatch(const DataType* type, liborc::ColumnVectorBatch* cbatch,
+                        int64_t& arrowOffset, int64_t& orcOffset, int64_t length,
+                        Array* parray, std::vector<bool>* incomingMask) {
+  auto array = checked_cast<array_type*>(parray);
+  auto batch = checked_cast<batch_type*>(cbatch);
+  int64_t arrowLength = array->length();
+  if (!arrowLength) return Status::OK();
+  if (array->null_count() || incomingMask) batch->hasNulls = true;
+  for (; orcOffset < length && arrowOffset < arrowLength; orcOffset++, arrowOffset++) {
+    if (array->IsNull(arrowOffset) || (incomingMask && !(*incomingMask)[orcOffset])) {
+      batch->notNull[orcOffset] = false;
+    } else {
+      batch->data[orcOffset] = array->Value(arrowOffset);
+      batch->notNull[orcOffset] = true;
+    }
+  }
+  batch->numElements = orcOffset;
+  return Status::OK();
+}
+
+template <class array_type, class batch_type, class target_type>
+Status FillNumericBatchCast(const DataType* type, liborc::ColumnVectorBatch* cbatch,
+                            int64_t& arrowOffset, int64_t& orcOffset, int64_t length,
+                            Array* parray, std::vector<bool>* incomingMask) {
+  auto array = checked_cast<array_type*>(parray);
+  auto batch = checked_cast<batch_type*>(cbatch);
+  int64_t arrowLength = array->length();
+  if (!arrowLength) return Status::OK();
+  if (array->null_count() || incomingMask) batch->hasNulls = true;
+  for (; orcOffset < length && arrowOffset < arrowLength; orcOffset++, arrowOffset++) {
+    if (array->IsNull(arrowOffset) || (incomingMask && !(*incomingMask)[orcOffset])) {
+      batch->notNull[orcOffset] = false;
+    } else {
+      batch->data[orcOffset] = static_cast<target_type>(array->Value(arrowOffset));
+      batch->notNull[orcOffset] = true;
+    }
+  }
+  batch->numElements = orcOffset;
+  return Status::OK();
+}
+
+Status FillDate64Batch(const DataType* type, liborc::ColumnVectorBatch* cbatch,
+                       int64_t& arrowOffset, int64_t& orcOffset, int64_t length,
+                       Array* parray, std::vector<bool>* incomingMask) {
+  auto array = checked_cast<Date64Array*>(parray);
+  auto batch = checked_cast<liborc::TimestampVectorBatch*>(cbatch);
+  int64_t arrowLength = array->length();
+  if (!arrowLength) return Status::OK();
+  if (array->null_count() || incomingMask) batch->hasNulls = true;
+  for (; orcOffset < length && arrowOffset < arrowLength; orcOffset++, arrowOffset++) {
+    if (array->IsNull(arrowOffset) || (incomingMask && !(*incomingMask)[orcOffset])) {
+      batch->notNull[orcOffset] = false;
+    } else {
+      int64_t miliseconds = array->Value(arrowOffset);
+      batch->data[orcOffset] =
+          static_cast<int64_t>(std::floor(miliseconds / kOneSecondMillis));
+      batch->nanoseconds[orcOffset] =
+          (miliseconds - kOneSecondMillis * batch->data[orcOffset]) * kOneMilliNanos;
+      batch->notNull[orcOffset] = true;
+    }
+  }
+  batch->numElements = orcOffset;
+  return Status::OK();
+}
+
+Status FillTimestampBatch(const DataType* type, liborc::ColumnVectorBatch* cbatch,
+                          int64_t& arrowOffset, int64_t& orcOffset, int64_t length,
+                          Array* parray, std::vector<bool>* incomingMask) {
+  auto array = checked_cast<TimestampArray*>(parray);
+  auto batch = checked_cast<liborc::TimestampVectorBatch*>(cbatch);
+  int64_t arrowLength = array->length();
+  if (!arrowLength) return Status::OK();
+  if (array->null_count() || incomingMask) batch->hasNulls = true;
+  for (; orcOffset < length && arrowOffset < arrowLength; orcOffset++, arrowOffset++) {
+    if (array->IsNull(arrowOffset) || (incomingMask && !(*incomingMask)[orcOffset])) {
+      batch->notNull[orcOffset] = false;
+    } else {
+      int64_t data = array->Value(arrowOffset);
+      batch->notNull[orcOffset] = true;
+      switch (std::static_pointer_cast<TimestampType>(array->type())->unit()) {

Review comment:
       Please refactor this to have the switch outside of the look and e.g. have constants set.

##########
File path: cpp/src/arrow/adapters/orc/adapter_util.cc
##########
@@ -316,10 +326,482 @@ Status AppendBatch(const liborc::Type* type, liborc::ColumnVectorBatch* batch,
   }
 }
 
+template <class array_type, class batch_type>
+Status FillNumericBatch(const DataType* type, liborc::ColumnVectorBatch* cbatch,
+                        int64_t& arrowOffset, int64_t& orcOffset, int64_t length,
+                        Array* parray, std::vector<bool>* incomingMask) {
+  auto array = checked_cast<array_type*>(parray);
+  auto batch = checked_cast<batch_type*>(cbatch);
+  int64_t arrowLength = array->length();
+  if (!arrowLength) return Status::OK();
+  if (array->null_count() || incomingMask) batch->hasNulls = true;
+  for (; orcOffset < length && arrowOffset < arrowLength; orcOffset++, arrowOffset++) {
+    if (array->IsNull(arrowOffset) || (incomingMask && !(*incomingMask)[orcOffset])) {
+      batch->notNull[orcOffset] = false;
+    } else {
+      batch->data[orcOffset] = array->Value(arrowOffset);
+      batch->notNull[orcOffset] = true;
+    }
+  }
+  batch->numElements = orcOffset;
+  return Status::OK();
+}
+
+template <class array_type, class batch_type, class target_type>
+Status FillNumericBatchCast(const DataType* type, liborc::ColumnVectorBatch* cbatch,
+                            int64_t& arrowOffset, int64_t& orcOffset, int64_t length,
+                            Array* parray, std::vector<bool>* incomingMask) {
+  auto array = checked_cast<array_type*>(parray);
+  auto batch = checked_cast<batch_type*>(cbatch);
+  int64_t arrowLength = array->length();
+  if (!arrowLength) return Status::OK();
+  if (array->null_count() || incomingMask) batch->hasNulls = true;
+  for (; orcOffset < length && arrowOffset < arrowLength; orcOffset++, arrowOffset++) {
+    if (array->IsNull(arrowOffset) || (incomingMask && !(*incomingMask)[orcOffset])) {
+      batch->notNull[orcOffset] = false;
+    } else {
+      batch->data[orcOffset] = static_cast<target_type>(array->Value(arrowOffset));
+      batch->notNull[orcOffset] = true;
+    }
+  }
+  batch->numElements = orcOffset;
+  return Status::OK();
+}
+
+Status FillDate64Batch(const DataType* type, liborc::ColumnVectorBatch* cbatch,
+                       int64_t& arrowOffset, int64_t& orcOffset, int64_t length,
+                       Array* parray, std::vector<bool>* incomingMask) {
+  auto array = checked_cast<Date64Array*>(parray);
+  auto batch = checked_cast<liborc::TimestampVectorBatch*>(cbatch);
+  int64_t arrowLength = array->length();
+  if (!arrowLength) return Status::OK();
+  if (array->null_count() || incomingMask) batch->hasNulls = true;
+  for (; orcOffset < length && arrowOffset < arrowLength; orcOffset++, arrowOffset++) {
+    if (array->IsNull(arrowOffset) || (incomingMask && !(*incomingMask)[orcOffset])) {
+      batch->notNull[orcOffset] = false;
+    } else {
+      int64_t miliseconds = array->Value(arrowOffset);
+      batch->data[orcOffset] =
+          static_cast<int64_t>(std::floor(miliseconds / kOneSecondMillis));
+      batch->nanoseconds[orcOffset] =
+          (miliseconds - kOneSecondMillis * batch->data[orcOffset]) * kOneMilliNanos;
+      batch->notNull[orcOffset] = true;
+    }
+  }
+  batch->numElements = orcOffset;
+  return Status::OK();
+}
+
+Status FillTimestampBatch(const DataType* type, liborc::ColumnVectorBatch* cbatch,
+                          int64_t& arrowOffset, int64_t& orcOffset, int64_t length,
+                          Array* parray, std::vector<bool>* incomingMask) {
+  auto array = checked_cast<TimestampArray*>(parray);
+  auto batch = checked_cast<liborc::TimestampVectorBatch*>(cbatch);
+  int64_t arrowLength = array->length();
+  if (!arrowLength) return Status::OK();
+  if (array->null_count() || incomingMask) batch->hasNulls = true;
+  for (; orcOffset < length && arrowOffset < arrowLength; orcOffset++, arrowOffset++) {
+    if (array->IsNull(arrowOffset) || (incomingMask && !(*incomingMask)[orcOffset])) {
+      batch->notNull[orcOffset] = false;
+    } else {
+      int64_t data = array->Value(arrowOffset);
+      batch->notNull[orcOffset] = true;
+      switch (std::static_pointer_cast<TimestampType>(array->type())->unit()) {
+        case TimeUnit::type::SECOND: {
+          batch->data[orcOffset] = data;
+          batch->nanoseconds[orcOffset] = 0;
+          break;
+        }
+        case TimeUnit::type::MILLI: {
+          batch->data[orcOffset] =
+              static_cast<int64_t>(std::floor(data / kOneSecondMillis));
+          batch->nanoseconds[orcOffset] =
+              (data - kOneSecondMillis * batch->data[orcOffset]) * kOneMilliNanos;
+          break;
+        }
+        case TimeUnit::type::MICRO: {
+          batch->data[orcOffset] =
+              static_cast<int64_t>(std::floor(data / kOneSecondMicros));
+          batch->nanoseconds[orcOffset] =
+              (data - kOneSecondMicros * batch->data[orcOffset]) * kOneMicroNanos;
+          break;
+        }
+        default: {
+          batch->data[orcOffset] =
+              static_cast<int64_t>(std::floor(data / kOneSecondNanos));
+          batch->nanoseconds[orcOffset] = data - kOneSecondNanos * batch->data[orcOffset];
+        }
+      }
+    }
+  }
+  batch->numElements = orcOffset;
+  return Status::OK();
+}
+
+template <class array_type>
+Status FillStringBatch(const DataType* type, liborc::ColumnVectorBatch* cbatch,
+                       int64_t& arrowOffset, int64_t& orcOffset, int64_t length,
+                       Array* parray, std::vector<bool>* incomingMask) {
+  auto array = checked_cast<array_type*>(parray);
+  auto batch = checked_cast<liborc::StringVectorBatch*>(cbatch);
+  int64_t arrowLength = array->length();
+  if (!arrowLength) return Status::OK();
+  if (array->null_count() || incomingMask) batch->hasNulls = true;
+  for (; orcOffset < length && arrowOffset < arrowLength; orcOffset++, arrowOffset++) {
+    if (array->IsNull(arrowOffset) || (incomingMask && !(*incomingMask)[orcOffset])) {
+      batch->notNull[orcOffset] = false;
+    } else {
+      batch->notNull[orcOffset] = true;
+      std::string dataString = array->GetString(arrowOffset);
+      int dataStringLength = dataString.length();
+      if (batch->data[orcOffset]) delete batch->data[orcOffset];
+      batch->data[orcOffset] = new char[dataStringLength + 1];  // Include null
+      memcpy(batch->data[orcOffset], dataString.c_str(), dataStringLength + 1);
+      batch->length[orcOffset] = dataStringLength;
+    }
+  }
+  batch->numElements = orcOffset;
+  return Status::OK();
+}
+
+template <class array_type, class offset_type>
+Status FillBinaryBatch(const DataType* type, liborc::ColumnVectorBatch* cbatch,
+                       int64_t& arrowOffset, int64_t& orcOffset, int64_t length,
+                       Array* parray, std::vector<bool>* incomingMask) {
+  auto array = checked_cast<array_type*>(parray);
+  auto batch = checked_cast<liborc::StringVectorBatch*>(cbatch);
+  int64_t arrowLength = array->length();
+  if (!arrowLength) return Status::OK();
+  if (array->null_count() || incomingMask) batch->hasNulls = true;
+  for (; orcOffset < length && arrowOffset < arrowLength; orcOffset++, arrowOffset++) {
+    if (array->IsNull(arrowOffset) || (incomingMask && !(*incomingMask)[orcOffset])) {
+      batch->notNull[orcOffset] = false;
+    } else {
+      batch->notNull[orcOffset] = true;
+      offset_type dataLength = 0;
+      const uint8_t* data = array->GetValue(arrowOffset, &dataLength);
+      if (batch->data[orcOffset]) delete batch->data[orcOffset];
+      batch->data[orcOffset] = new char[dataLength];  // Do not include null
+      memcpy(batch->data[orcOffset], data, dataLength);
+      batch->length[orcOffset] = dataLength;
+    }
+  }
+  batch->numElements = orcOffset;
+  return Status::OK();
+}
+
+Status FillFixedSizeBinaryBatch(const DataType* type, liborc::ColumnVectorBatch* cbatch,
+                                int64_t& arrowOffset, int64_t& orcOffset, int64_t length,
+                                Array* parray, std::vector<bool>* incomingMask) {
+  auto array = checked_cast<FixedSizeBinaryArray*>(parray);
+  auto batch = checked_cast<liborc::StringVectorBatch*>(cbatch);
+  int64_t arrowLength = array->length();
+  if (!arrowLength) return Status::OK();
+  int32_t byteWidth = array->byte_width();
+  if (array->null_count() || incomingMask) batch->hasNulls = true;
+  for (; orcOffset < length && arrowOffset < arrowLength; orcOffset++, arrowOffset++) {
+    if (array->IsNull(arrowOffset) || (incomingMask && !(*incomingMask)[orcOffset])) {
+      batch->notNull[orcOffset] = false;
+    } else {
+      batch->notNull[orcOffset] = true;
+      const uint8_t* data = array->GetValue(arrowOffset);
+      if (batch->data[orcOffset]) delete batch->data[orcOffset];
+      batch->data[orcOffset] = new char[byteWidth];  // Do not include null
+      memcpy(batch->data[orcOffset], data, byteWidth);
+      batch->length[orcOffset] = byteWidth;
+    }
+  }
+  batch->numElements = orcOffset;
+  return Status::OK();
+}
+
+// If Arrow supports 256-bit decimals we can not support it unless ORC does it
+Status FillDecimalBatch(const DataType* type, liborc::ColumnVectorBatch* cbatch,
+                        int64_t& arrowOffset, int64_t& orcOffset, int64_t length,
+                        Array* parray, std::vector<bool>* incomingMask) {
+  auto array = checked_cast<Decimal128Array*>(parray);
+  auto batch = checked_cast<liborc::Decimal128VectorBatch*>(cbatch);
+  // Arrow uses 128 bits for decimal type and in the future, 256 bits will also be
+  // supported.
+  int64_t arrowLength = array->length();
+  if (!arrowLength) return Status::OK();
+  if (array->null_count() || incomingMask) batch->hasNulls = true;
+  for (; orcOffset < length && arrowOffset < arrowLength; orcOffset++, arrowOffset++) {
+    if (array->IsNull(arrowOffset) || (incomingMask && !(*incomingMask)[orcOffset])) {
+      batch->notNull[orcOffset] = false;
+    } else {
+      batch->notNull[orcOffset] = true;
+      uint8_t* rawInt128 = const_cast<uint8_t*>(array->GetValue(arrowOffset));
+      uint64_t* lowerBits = reinterpret_cast<uint64_t*>(rawInt128);
+      int64_t* higherBits = reinterpret_cast<int64_t*>(rawInt128 + 8);
+      batch->values[orcOffset] = liborc::Int128(*higherBits, *lowerBits);
+    }
+  }
+  batch->numElements = orcOffset;
+  return Status::OK();
+}
+
+Status FillStructBatch(const DataType* type, liborc::ColumnVectorBatch* cbatch,
+                       int64_t& arrowOffset, int64_t& orcOffset, int64_t length,
+                       Array* parray, std::vector<bool>* incomingMask) {
+  auto array = checked_cast<StructArray*>(parray);
+  auto batch = checked_cast<liborc::StructVectorBatch*>(cbatch);
+  std::shared_ptr<std::vector<bool>> outgoingMask;
+  std::size_t size = type->fields().size();
+  int64_t arrowLength = array->length();
+  if (!arrowLength) return Status::OK();
+  int64_t initORCOffset = orcOffset;
+  int64_t initArrowOffset = arrowOffset;
+  // First fill fields of ColumnVectorBatch
+  if (array->null_count() || incomingMask) {
+    batch->hasNulls = true;
+    outgoingMask = std::make_shared<std::vector<bool>>(length, true);
+  } else {
+    outgoingMask = NULLPTR;
+  }
+  for (; orcOffset < length && arrowOffset < arrowLength; orcOffset++, arrowOffset++) {
+    if (array->IsNull(arrowOffset) || (incomingMask && !(*incomingMask)[orcOffset])) {
+      batch->notNull[orcOffset] = false;
+      (*outgoingMask)[orcOffset] = false;
+    } else {
+      batch->notNull[orcOffset] = true;
+    }
+  }
+  batch->numElements += orcOffset - initORCOffset;
+  // Fill the fields
+  for (std::size_t i = 0; i < size; i++) {
+    orcOffset = initORCOffset;
+    arrowOffset = initArrowOffset;
+    RETURN_NOT_OK(FillBatch(type->field(i)->type().get(), batch->fields[i], arrowOffset,
+                            orcOffset, length, array->field(i).get(),
+                            outgoingMask.get()));
+  }
+  return Status::OK();
+}
+
+template <class array_type>
+Status FillListBatch(const DataType* type, liborc::ColumnVectorBatch* cbatch,
+                     int64_t& arrowOffset, int64_t& orcOffset, int64_t length,
+                     Array* parray, std::vector<bool>* incomingMask) {
+  auto array = checked_cast<array_type*>(parray);
+  auto batch = checked_cast<liborc::ListVectorBatch*>(cbatch);
+  auto elementBatch = (batch->elements).get();
+  DataType* elementType = array->value_type().get();
+  int64_t arrowLength = array->length();
+  if (!arrowLength) return Status::OK();
+  if (orcOffset == 0) batch->offsets[0] = 0;
+  if (array->null_count() || incomingMask) batch->hasNulls = true;
+  for (; orcOffset < length && arrowOffset < arrowLength; orcOffset++, arrowOffset++) {
+    if (array->IsNull(arrowOffset) || (incomingMask && !(*incomingMask)[orcOffset])) {
+      batch->notNull[orcOffset] = false;
+      batch->offsets[orcOffset + 1] = batch->offsets[orcOffset];
+    } else {
+      batch->notNull[orcOffset] = true;
+      batch->offsets[orcOffset + 1] = batch->offsets[orcOffset] +
+                                      array->value_offset(arrowOffset + 1) -
+                                      array->value_offset(arrowOffset);
+      elementBatch->resize(batch->offsets[orcOffset + 1]);
+      int64_t subarrayArrowOffset = array->value_offset(arrowOffset),
+              subarrayORCOffset = batch->offsets[orcOffset],
+              subarrayORCLength = batch->offsets[orcOffset + 1];
+      RETURN_NOT_OK(FillBatch(elementType, elementBatch, subarrayArrowOffset,
+                              subarrayORCOffset, subarrayORCLength, array->values().get(),
+                              NULLPTR));
+    }
+  }
+  batch->numElements = orcOffset;
+  return Status::OK();
+}
+
+Status FillFixedSizeListBatch(const DataType* type, liborc::ColumnVectorBatch* cbatch,
+                              int64_t& arrowOffset, int64_t& orcOffset, int64_t length,
+                              Array* parray, std::vector<bool>* incomingMask) {
+  auto array = checked_cast<FixedSizeListArray*>(parray);
+  auto batch = checked_cast<liborc::ListVectorBatch*>(cbatch);
+  auto elementBatch = (batch->elements).get();
+  DataType* elementType = array->value_type().get();
+  int64_t arrowLength = array->length();
+  int32_t elementLength = array->value_length();  // Fixed length of each subarray
+  if (!arrowLength) return Status::OK();
+  if (orcOffset == 0) batch->offsets[0] = 0;
+  if (array->null_count() || incomingMask) batch->hasNulls = true;
+  for (; orcOffset < length && arrowOffset < arrowLength; orcOffset++, arrowOffset++) {
+    if (array->IsNull(arrowOffset) || (incomingMask && !(*incomingMask)[orcOffset])) {
+      batch->notNull[orcOffset] = false;
+      batch->offsets[orcOffset + 1] = batch->offsets[orcOffset];
+    } else {
+      batch->notNull[orcOffset] = true;
+      batch->offsets[orcOffset + 1] = batch->offsets[orcOffset] + elementLength;
+      int64_t subarrayArrowOffset = array->value_offset(arrowOffset),
+              subarrayORCOffset = batch->offsets[orcOffset],
+              subarrayORCLength = batch->offsets[orcOffset + 1];
+      elementBatch->resize(subarrayORCLength);
+      RETURN_NOT_OK(FillBatch(elementType, elementBatch, subarrayArrowOffset,
+                              subarrayORCOffset, subarrayORCLength, array->values().get(),
+                              NULLPTR));
+    }
+  }
+  batch->numElements = orcOffset;
+  return Status::OK();
+}
+
+Status FillMapBatch(const DataType* type, liborc::ColumnVectorBatch* cbatch,
+                    int64_t& arrowOffset, int64_t& orcOffset, int64_t length,
+                    Array* parray, std::vector<bool>* incomingMask) {
+  auto array = checked_cast<MapArray*>(parray);
+  auto batch = checked_cast<liborc::MapVectorBatch*>(cbatch);
+  auto keyBatch = (batch->keys).get();
+  auto elementBatch = (batch->elements).get();
+  auto keyArray = array->keys().get();
+  auto elementArray = array->items().get();
+  DataType* keyType = keyArray->type().get();
+  DataType* elementType = elementArray->type().get();
+  int64_t arrowLength = array->length();
+  if (!arrowLength) return Status::OK();
+  // int64_t initORCOffset = orcOffset, initArrowOffset = arrowOffset;
+  if (orcOffset == 0) batch->offsets[0] = 0;
+  if (array->null_count() || incomingMask) batch->hasNulls = true;
+  for (; orcOffset < length && arrowOffset < arrowLength; orcOffset++, arrowOffset++) {
+    if (array->IsNull(arrowOffset) || (incomingMask && !(*incomingMask)[orcOffset])) {
+      batch->notNull[orcOffset] = false;
+      batch->offsets[orcOffset + 1] = batch->offsets[orcOffset];
+    } else {
+      batch->notNull[orcOffset] = true;
+      batch->offsets[orcOffset + 1] = batch->offsets[orcOffset] +
+                                      array->value_offset(arrowOffset + 1) -
+                                      array->value_offset(arrowOffset);
+      int64_t subarrayArrowOffset = array->value_offset(arrowOffset),
+              subarrayORCOffset = batch->offsets[orcOffset],
+              subarrayORCLength = batch->offsets[orcOffset + 1],
+              initSubarrayArrowOffset = subarrayArrowOffset,
+              initSubarrayORCOffset = subarrayORCOffset;
+      keyBatch->resize(subarrayORCLength);
+      elementBatch->resize(subarrayORCLength);
+      RETURN_NOT_OK(FillBatch(keyType, keyBatch, subarrayArrowOffset, subarrayORCOffset,
+                              subarrayORCLength, keyArray, NULLPTR));
+      subarrayArrowOffset = initSubarrayArrowOffset;
+      subarrayORCOffset = initSubarrayORCOffset;
+      RETURN_NOT_OK(FillBatch(elementType, elementBatch, subarrayArrowOffset,
+                              subarrayORCOffset, subarrayORCLength, elementArray,
+                              NULLPTR));
+    }
+  }
+  batch->numElements = orcOffset;
+  return Status::OK();
+}
+
+Status FillBatch(const DataType* type, liborc::ColumnVectorBatch* cbatch,
+                 int64_t& arrowOffset, int64_t& orcOffset, int64_t length, Array* parray,
+                 std::vector<bool>* incomingMask) {

Review comment:
       What is the `incomingMask` about? I don't see where it is used.

##########
File path: cpp/src/arrow/adapters/orc/adapter_test.cc
##########
@@ -157,4 +225,2478 @@ TEST(TestAdapter, readIntAndStringFileMultipleStripes) {
     EXPECT_TRUE(stripe_reader->ReadNext(&record_batch).ok());
   }
 }
+
+// WriteORC tests
+
+// General
+TEST(TestAdapterWriteGeneral, writeZeroRows) {
+  std::vector<std::shared_ptr<Field>> xFields{field("bool", boolean()),
+                                              field("int8", int8()),
+                                              field("int16", int16()),
+                                              field("int32", int32()),
+                                              field("int64", int64()),
+                                              field("float", float32()),
+                                              field("double", float64()),
+                                              field("decimal128nz", decimal(25, 6)),
+                                              field("decimal128z", decimal(32, 0)),
+                                              field("date32", date32()),
+                                              field("ts3", timestamp(TimeUnit::NANO)),
+                                              field("string", utf8()),
+                                              field("binary", binary())};
+  std::shared_ptr<Schema> sharedPtrSchema = std::make_shared<Schema>(xFields);
+
+  int64_t numRows = 0;
+  int64_t numCols = xFields.size();
+
+  ArrayBuilderVector builders(numCols, NULLPTR);
+  builders[0] =
+      std::static_pointer_cast<ArrayBuilder>(std::make_shared<BooleanBuilder>());
+  builders[1] = std::static_pointer_cast<ArrayBuilder>(std::make_shared<Int8Builder>());
+  builders[2] = std::static_pointer_cast<ArrayBuilder>(std::make_shared<Int16Builder>());
+  builders[3] = std::static_pointer_cast<ArrayBuilder>(std::make_shared<Int32Builder>());
+  builders[4] = std::static_pointer_cast<ArrayBuilder>(std::make_shared<Int64Builder>());
+  builders[5] = std::static_pointer_cast<ArrayBuilder>(std::make_shared<FloatBuilder>());
+  builders[6] = std::static_pointer_cast<ArrayBuilder>(std::make_shared<DoubleBuilder>());
+  builders[7] = std::static_pointer_cast<ArrayBuilder>(
+      std::make_shared<Decimal128Builder>(decimal(25, 6)));
+  builders[8] = std::static_pointer_cast<ArrayBuilder>(
+      std::make_shared<Decimal128Builder>(decimal(32, 0)));
+  builders[9] = std::static_pointer_cast<ArrayBuilder>(std::make_shared<Date32Builder>());
+  builders[10] =
+      std::static_pointer_cast<ArrayBuilder>(std::make_shared<TimestampBuilder>(
+          timestamp(TimeUnit::NANO), default_memory_pool()));
+  builders[11] =
+      std::static_pointer_cast<ArrayBuilder>(std::make_shared<StringBuilder>());
+  builders[12] =
+      std::static_pointer_cast<ArrayBuilder>(std::make_shared<BinaryBuilder>());
+  ArrayVector arrays(numCols, NULLPTR);
+  ChunkedArrayVector cv;
+  cv.reserve(numCols);
+
+  for (int col = 0; col < numCols; col++) {
+    ARROW_EXPECT_OK(builders[col]->Finish(&arrays[col]));
+    cv.push_back(std::make_shared<ChunkedArray>(arrays[col]));
+  }
+
+  std::shared_ptr<Table> table = Table::Make(sharedPtrSchema, cv);
+
+  std::unique_ptr<ORCMemWriter> writer =
+      std::unique_ptr<ORCMemWriter>(new ORCMemWriter());
+  std::unique_ptr<liborc::OutputStream> out_stream =
+      std::unique_ptr<liborc::OutputStream>(static_cast<liborc::OutputStream*>(
+          new MemoryOutputStream(DEFAULT_SMALL_MEM_STREAM_SIZE / 16)));
+  ARROW_EXPECT_OK(writer->Open(sharedPtrSchema, out_stream));
+  ARROW_EXPECT_OK(writer->Write(table));
+  auto output_mem_stream = static_cast<MemoryOutputStream*>(writer->ReleaseOutStream());
+  std::shared_ptr<io::RandomAccessFile> in_stream(
+      new io::BufferReader(std::make_shared<Buffer>(
+          reinterpret_cast<const uint8_t*>(output_mem_stream->getData()),
+          static_cast<int64_t>(output_mem_stream->getLength()))));
+
+  std::unique_ptr<adapters::orc::ORCFileReader> reader;
+  ASSERT_TRUE(
+      adapters::orc::ORCFileReader::Open(in_stream, default_memory_pool(), &reader).ok());
+  std::shared_ptr<Table> outputTable;
+  ARROW_EXPECT_OK(reader->Read(&outputTable));
+  EXPECT_EQ(outputTable->num_columns(), numCols);
+  EXPECT_EQ(outputTable->num_rows(), numRows);
+  EXPECT_TRUE(outputTable->Equals(*table));
+}
+TEST(TestAdapterWriteGeneral, writeChunkless) {
+  std::vector<std::shared_ptr<Field>> xFieldsSub{std::make_shared<Field>("a", utf8()),
+                                                 std::make_shared<Field>("b", int32())};
+  std::vector<std::shared_ptr<Field>> xFields{
+      field("bool", boolean()),
+      field("int8", int8()),
+      field("int16", int16()),
+      field("int32", int32()),
+      field("int64", int64()),
+      field("float", float32()),
+      field("double", float64()),
+      field("decimal128nz", decimal(25, 6)),
+      field("decimal128z", decimal(32, 0)),
+      field("date32", date32()),
+      field("ts3", timestamp(TimeUnit::NANO)),
+      field("string", utf8()),
+      field("binary", binary()),
+      field("struct", struct_(xFieldsSub)),
+      field("list", list(int32())),
+      field("lsl", list(struct_({field("lsl0", list(int32()))})))};
+  std::shared_ptr<Schema> sharedPtrSchema = std::make_shared<Schema>(xFields);
+
+  int64_t numRows = 0;
+  int64_t numCols = xFields.size();
+
+  ChunkedArrayVector cv;
+  cv.reserve(numCols);
+
+  ArrayMatrix av(numCols, ArrayVector(0, NULLPTR));
+
+  for (int col = 0; col < numCols; col++) {
+    cv.push_back(std::make_shared<ChunkedArray>(av[col], xFields[col]->type()));
+  }
+
+  std::shared_ptr<Table> table = Table::Make(sharedPtrSchema, cv);
+
+  MemoryOutputStream mem_stream(DEFAULT_SMALL_MEM_STREAM_SIZE);
+  std::unique_ptr<ORCMemWriter> writer =
+      std::unique_ptr<ORCMemWriter>(new ORCMemWriter());
+  std::unique_ptr<liborc::OutputStream> out_stream =
+      std::unique_ptr<liborc::OutputStream>(static_cast<liborc::OutputStream*>(
+          new MemoryOutputStream(DEFAULT_SMALL_MEM_STREAM_SIZE / 16)));
+  ARROW_EXPECT_OK(writer->Open(sharedPtrSchema, out_stream));
+  ARROW_EXPECT_OK(writer->Write(table));
+  auto output_mem_stream = static_cast<MemoryOutputStream*>(writer->ReleaseOutStream());
+  std::shared_ptr<io::RandomAccessFile> in_stream(
+      new io::BufferReader(std::make_shared<Buffer>(
+          reinterpret_cast<const uint8_t*>(output_mem_stream->getData()),
+          static_cast<int64_t>(output_mem_stream->getLength()))));
+
+  std::unique_ptr<adapters::orc::ORCFileReader> reader;
+  ASSERT_TRUE(
+      adapters::orc::ORCFileReader::Open(in_stream, default_memory_pool(), &reader).ok());
+  std::shared_ptr<Table> outputTable;
+  ARROW_EXPECT_OK(reader->Read(&outputTable));
+  EXPECT_EQ(outputTable->num_columns(), numCols);
+  EXPECT_EQ(outputTable->num_rows(), numRows);
+  EXPECT_TRUE(outputTable->Equals(*table));
+}
+TEST(TestAdapterWriteGeneral, writeAllNulls) {
+  std::vector<std::shared_ptr<Field>> xFields{field("bool", boolean()),
+                                              field("int8", int8()),
+                                              field("int16", int16()),
+                                              field("int32", int32()),
+                                              field("int64", int64()),
+                                              field("decimal128nz", decimal(33, 4)),
+                                              field("decimal128z", decimal(35, 0)),
+                                              field("date32", date32()),
+                                              field("ts3", timestamp(TimeUnit::NANO)),
+                                              field("string", utf8()),
+                                              field("binary", binary())};
+  std::shared_ptr<Schema> sharedPtrSchema = std::make_shared<Schema>(xFields);
+
+  int64_t numRows = 10000;
+  int64_t numCols = xFields.size();
+
+  ArrayBuilderMatrix builders(numCols, ArrayBuilderVector(5, NULLPTR));
+
+  for (int i = 0; i < 5; i++) {
+    builders[0][i] =
+        std::static_pointer_cast<ArrayBuilder>(std::make_shared<BooleanBuilder>());
+    builders[1][i] =
+        std::static_pointer_cast<ArrayBuilder>(std::make_shared<Int8Builder>());
+    builders[2][i] =
+        std::static_pointer_cast<ArrayBuilder>(std::make_shared<Int16Builder>());
+    builders[3][i] =
+        std::static_pointer_cast<ArrayBuilder>(std::make_shared<Int32Builder>());
+    builders[4][i] =
+        std::static_pointer_cast<ArrayBuilder>(std::make_shared<Int64Builder>());
+    builders[5][i] = std::static_pointer_cast<ArrayBuilder>(
+        std::make_shared<Decimal128Builder>(decimal(33, 4)));
+    builders[6][i] = std::static_pointer_cast<ArrayBuilder>(
+        std::make_shared<Decimal128Builder>(decimal(35, 0)));
+    builders[7][i] =
+        std::static_pointer_cast<ArrayBuilder>(std::make_shared<Date32Builder>());
+    builders[8][i] =
+        std::static_pointer_cast<ArrayBuilder>(std::make_shared<TimestampBuilder>(
+            timestamp(TimeUnit::NANO), default_memory_pool()));
+    builders[9][i] =
+        std::static_pointer_cast<ArrayBuilder>(std::make_shared<StringBuilder>());
+    builders[10][i] =
+        std::static_pointer_cast<ArrayBuilder>(std::make_shared<BinaryBuilder>());
+  }
+
+  for (int i = 0; i < numRows; i++) {
+    int chunk = i < (numRows / 2) ? 1 : 3;
+    for (int col = 0; col < numCols; col++) {
+      ARROW_EXPECT_OK(builders[col][chunk]->AppendNull());
+    }
+  }
+
+  ArrayMatrix arrays(numCols, ArrayVector(5, NULLPTR));
+  ChunkedArrayVector cv;
+  cv.reserve(numCols);
+
+  for (int col = 0; col < numCols; col++) {
+    for (int i = 0; i < 5; i++) {
+      ARROW_EXPECT_OK(builders[col][i]->Finish(&arrays[col][i]));
+    }
+    cv.push_back(std::make_shared<ChunkedArray>(arrays[col]));
+  }
+
+  std::shared_ptr<Table> table = Table::Make(sharedPtrSchema, cv);

Review comment:
       The code from here until the end of the function seems to be the same for the tests above, please  factor this out into a utility function.

##########
File path: cpp/src/arrow/adapters/orc/adapter_util.cc
##########
@@ -425,6 +907,139 @@ Status GetArrowType(const liborc::Type* type, std::shared_ptr<DataType>* out) {
   return Status::OK();
 }
 
+Status GetORCType(const DataType* type, ORC_UNIQUE_PTR<liborc::Type>* out) {
+  // Check for NULLPTR
+  if (type == NULLPTR) {

Review comment:
       What's the reason that `NULLPTR` is an allowed input?

##########
File path: cpp/src/arrow/adapters/orc/adapter_util.cc
##########
@@ -316,10 +326,482 @@ Status AppendBatch(const liborc::Type* type, liborc::ColumnVectorBatch* batch,
   }
 }
 
+template <class array_type, class batch_type>
+Status FillNumericBatch(const DataType* type, liborc::ColumnVectorBatch* cbatch,
+                        int64_t& arrowOffset, int64_t& orcOffset, int64_t length,
+                        Array* parray, std::vector<bool>* incomingMask) {
+  auto array = checked_cast<array_type*>(parray);
+  auto batch = checked_cast<batch_type*>(cbatch);
+  int64_t arrowLength = array->length();
+  if (!arrowLength) return Status::OK();
+  if (array->null_count() || incomingMask) batch->hasNulls = true;
+  for (; orcOffset < length && arrowOffset < arrowLength; orcOffset++, arrowOffset++) {
+    if (array->IsNull(arrowOffset) || (incomingMask && !(*incomingMask)[orcOffset])) {
+      batch->notNull[orcOffset] = false;
+    } else {
+      batch->data[orcOffset] = array->Value(arrowOffset);
+      batch->notNull[orcOffset] = true;
+    }
+  }
+  batch->numElements = orcOffset;
+  return Status::OK();
+}
+
+template <class array_type, class batch_type, class target_type>
+Status FillNumericBatchCast(const DataType* type, liborc::ColumnVectorBatch* cbatch,
+                            int64_t& arrowOffset, int64_t& orcOffset, int64_t length,
+                            Array* parray, std::vector<bool>* incomingMask) {
+  auto array = checked_cast<array_type*>(parray);
+  auto batch = checked_cast<batch_type*>(cbatch);
+  int64_t arrowLength = array->length();
+  if (!arrowLength) return Status::OK();
+  if (array->null_count() || incomingMask) batch->hasNulls = true;
+  for (; orcOffset < length && arrowOffset < arrowLength; orcOffset++, arrowOffset++) {
+    if (array->IsNull(arrowOffset) || (incomingMask && !(*incomingMask)[orcOffset])) {
+      batch->notNull[orcOffset] = false;
+    } else {
+      batch->data[orcOffset] = static_cast<target_type>(array->Value(arrowOffset));
+      batch->notNull[orcOffset] = true;
+    }
+  }
+  batch->numElements = orcOffset;
+  return Status::OK();
+}
+
+Status FillDate64Batch(const DataType* type, liborc::ColumnVectorBatch* cbatch,
+                       int64_t& arrowOffset, int64_t& orcOffset, int64_t length,
+                       Array* parray, std::vector<bool>* incomingMask) {
+  auto array = checked_cast<Date64Array*>(parray);
+  auto batch = checked_cast<liborc::TimestampVectorBatch*>(cbatch);
+  int64_t arrowLength = array->length();
+  if (!arrowLength) return Status::OK();
+  if (array->null_count() || incomingMask) batch->hasNulls = true;
+  for (; orcOffset < length && arrowOffset < arrowLength; orcOffset++, arrowOffset++) {
+    if (array->IsNull(arrowOffset) || (incomingMask && !(*incomingMask)[orcOffset])) {
+      batch->notNull[orcOffset] = false;
+    } else {
+      int64_t miliseconds = array->Value(arrowOffset);
+      batch->data[orcOffset] =
+          static_cast<int64_t>(std::floor(miliseconds / kOneSecondMillis));
+      batch->nanoseconds[orcOffset] =
+          (miliseconds - kOneSecondMillis * batch->data[orcOffset]) * kOneMilliNanos;
+      batch->notNull[orcOffset] = true;
+    }
+  }
+  batch->numElements = orcOffset;
+  return Status::OK();
+}
+
+Status FillTimestampBatch(const DataType* type, liborc::ColumnVectorBatch* cbatch,
+                          int64_t& arrowOffset, int64_t& orcOffset, int64_t length,
+                          Array* parray, std::vector<bool>* incomingMask) {
+  auto array = checked_cast<TimestampArray*>(parray);
+  auto batch = checked_cast<liborc::TimestampVectorBatch*>(cbatch);
+  int64_t arrowLength = array->length();
+  if (!arrowLength) return Status::OK();
+  if (array->null_count() || incomingMask) batch->hasNulls = true;
+  for (; orcOffset < length && arrowOffset < arrowLength; orcOffset++, arrowOffset++) {
+    if (array->IsNull(arrowOffset) || (incomingMask && !(*incomingMask)[orcOffset])) {
+      batch->notNull[orcOffset] = false;
+    } else {
+      int64_t data = array->Value(arrowOffset);
+      batch->notNull[orcOffset] = true;
+      switch (std::static_pointer_cast<TimestampType>(array->type())->unit()) {
+        case TimeUnit::type::SECOND: {
+          batch->data[orcOffset] = data;
+          batch->nanoseconds[orcOffset] = 0;
+          break;
+        }
+        case TimeUnit::type::MILLI: {
+          batch->data[orcOffset] =
+              static_cast<int64_t>(std::floor(data / kOneSecondMillis));
+          batch->nanoseconds[orcOffset] =
+              (data - kOneSecondMillis * batch->data[orcOffset]) * kOneMilliNanos;
+          break;
+        }
+        case TimeUnit::type::MICRO: {
+          batch->data[orcOffset] =
+              static_cast<int64_t>(std::floor(data / kOneSecondMicros));
+          batch->nanoseconds[orcOffset] =
+              (data - kOneSecondMicros * batch->data[orcOffset]) * kOneMicroNanos;
+          break;
+        }
+        default: {
+          batch->data[orcOffset] =
+              static_cast<int64_t>(std::floor(data / kOneSecondNanos));
+          batch->nanoseconds[orcOffset] = data - kOneSecondNanos * batch->data[orcOffset];
+        }
+      }
+    }
+  }
+  batch->numElements = orcOffset;
+  return Status::OK();
+}
+
+template <class array_type>
+Status FillStringBatch(const DataType* type, liborc::ColumnVectorBatch* cbatch,
+                       int64_t& arrowOffset, int64_t& orcOffset, int64_t length,
+                       Array* parray, std::vector<bool>* incomingMask) {
+  auto array = checked_cast<array_type*>(parray);
+  auto batch = checked_cast<liborc::StringVectorBatch*>(cbatch);
+  int64_t arrowLength = array->length();
+  if (!arrowLength) return Status::OK();
+  if (array->null_count() || incomingMask) batch->hasNulls = true;
+  for (; orcOffset < length && arrowOffset < arrowLength; orcOffset++, arrowOffset++) {
+    if (array->IsNull(arrowOffset) || (incomingMask && !(*incomingMask)[orcOffset])) {
+      batch->notNull[orcOffset] = false;
+    } else {
+      batch->notNull[orcOffset] = true;
+      std::string dataString = array->GetString(arrowOffset);
+      int dataStringLength = dataString.length();
+      if (batch->data[orcOffset]) delete batch->data[orcOffset];
+      batch->data[orcOffset] = new char[dataStringLength + 1];  // Include null
+      memcpy(batch->data[orcOffset], dataString.c_str(), dataStringLength + 1);
+      batch->length[orcOffset] = dataStringLength;
+    }
+  }
+  batch->numElements = orcOffset;
+  return Status::OK();
+}
+
+template <class array_type, class offset_type>
+Status FillBinaryBatch(const DataType* type, liborc::ColumnVectorBatch* cbatch,
+                       int64_t& arrowOffset, int64_t& orcOffset, int64_t length,
+                       Array* parray, std::vector<bool>* incomingMask) {
+  auto array = checked_cast<array_type*>(parray);
+  auto batch = checked_cast<liborc::StringVectorBatch*>(cbatch);
+  int64_t arrowLength = array->length();
+  if (!arrowLength) return Status::OK();
+  if (array->null_count() || incomingMask) batch->hasNulls = true;
+  for (; orcOffset < length && arrowOffset < arrowLength; orcOffset++, arrowOffset++) {
+    if (array->IsNull(arrowOffset) || (incomingMask && !(*incomingMask)[orcOffset])) {
+      batch->notNull[orcOffset] = false;
+    } else {
+      batch->notNull[orcOffset] = true;
+      offset_type dataLength = 0;
+      const uint8_t* data = array->GetValue(arrowOffset, &dataLength);
+      if (batch->data[orcOffset]) delete batch->data[orcOffset];
+      batch->data[orcOffset] = new char[dataLength];  // Do not include null
+      memcpy(batch->data[orcOffset], data, dataLength);
+      batch->length[orcOffset] = dataLength;
+    }
+  }
+  batch->numElements = orcOffset;
+  return Status::OK();
+}
+
+Status FillFixedSizeBinaryBatch(const DataType* type, liborc::ColumnVectorBatch* cbatch,
+                                int64_t& arrowOffset, int64_t& orcOffset, int64_t length,
+                                Array* parray, std::vector<bool>* incomingMask) {
+  auto array = checked_cast<FixedSizeBinaryArray*>(parray);
+  auto batch = checked_cast<liborc::StringVectorBatch*>(cbatch);
+  int64_t arrowLength = array->length();
+  if (!arrowLength) return Status::OK();
+  int32_t byteWidth = array->byte_width();
+  if (array->null_count() || incomingMask) batch->hasNulls = true;
+  for (; orcOffset < length && arrowOffset < arrowLength; orcOffset++, arrowOffset++) {
+    if (array->IsNull(arrowOffset) || (incomingMask && !(*incomingMask)[orcOffset])) {
+      batch->notNull[orcOffset] = false;
+    } else {
+      batch->notNull[orcOffset] = true;
+      const uint8_t* data = array->GetValue(arrowOffset);
+      if (batch->data[orcOffset]) delete batch->data[orcOffset];
+      batch->data[orcOffset] = new char[byteWidth];  // Do not include null
+      memcpy(batch->data[orcOffset], data, byteWidth);
+      batch->length[orcOffset] = byteWidth;
+    }
+  }
+  batch->numElements = orcOffset;
+  return Status::OK();
+}
+
+// If Arrow supports 256-bit decimals we can not support it unless ORC does it
+Status FillDecimalBatch(const DataType* type, liborc::ColumnVectorBatch* cbatch,
+                        int64_t& arrowOffset, int64_t& orcOffset, int64_t length,
+                        Array* parray, std::vector<bool>* incomingMask) {
+  auto array = checked_cast<Decimal128Array*>(parray);
+  auto batch = checked_cast<liborc::Decimal128VectorBatch*>(cbatch);
+  // Arrow uses 128 bits for decimal type and in the future, 256 bits will also be
+  // supported.
+  int64_t arrowLength = array->length();
+  if (!arrowLength) return Status::OK();
+  if (array->null_count() || incomingMask) batch->hasNulls = true;
+  for (; orcOffset < length && arrowOffset < arrowLength; orcOffset++, arrowOffset++) {
+    if (array->IsNull(arrowOffset) || (incomingMask && !(*incomingMask)[orcOffset])) {
+      batch->notNull[orcOffset] = false;
+    } else {
+      batch->notNull[orcOffset] = true;
+      uint8_t* rawInt128 = const_cast<uint8_t*>(array->GetValue(arrowOffset));
+      uint64_t* lowerBits = reinterpret_cast<uint64_t*>(rawInt128);
+      int64_t* higherBits = reinterpret_cast<int64_t*>(rawInt128 + 8);
+      batch->values[orcOffset] = liborc::Int128(*higherBits, *lowerBits);
+    }
+  }
+  batch->numElements = orcOffset;
+  return Status::OK();
+}
+
+Status FillStructBatch(const DataType* type, liborc::ColumnVectorBatch* cbatch,
+                       int64_t& arrowOffset, int64_t& orcOffset, int64_t length,
+                       Array* parray, std::vector<bool>* incomingMask) {
+  auto array = checked_cast<StructArray*>(parray);
+  auto batch = checked_cast<liborc::StructVectorBatch*>(cbatch);
+  std::shared_ptr<std::vector<bool>> outgoingMask;
+  std::size_t size = type->fields().size();
+  int64_t arrowLength = array->length();
+  if (!arrowLength) return Status::OK();
+  int64_t initORCOffset = orcOffset;
+  int64_t initArrowOffset = arrowOffset;
+  // First fill fields of ColumnVectorBatch
+  if (array->null_count() || incomingMask) {
+    batch->hasNulls = true;
+    outgoingMask = std::make_shared<std::vector<bool>>(length, true);
+  } else {
+    outgoingMask = NULLPTR;
+  }
+  for (; orcOffset < length && arrowOffset < arrowLength; orcOffset++, arrowOffset++) {
+    if (array->IsNull(arrowOffset) || (incomingMask && !(*incomingMask)[orcOffset])) {
+      batch->notNull[orcOffset] = false;
+      (*outgoingMask)[orcOffset] = false;
+    } else {
+      batch->notNull[orcOffset] = true;
+    }
+  }
+  batch->numElements += orcOffset - initORCOffset;
+  // Fill the fields
+  for (std::size_t i = 0; i < size; i++) {
+    orcOffset = initORCOffset;
+    arrowOffset = initArrowOffset;
+    RETURN_NOT_OK(FillBatch(type->field(i)->type().get(), batch->fields[i], arrowOffset,
+                            orcOffset, length, array->field(i).get(),
+                            outgoingMask.get()));
+  }
+  return Status::OK();
+}
+
+template <class array_type>
+Status FillListBatch(const DataType* type, liborc::ColumnVectorBatch* cbatch,
+                     int64_t& arrowOffset, int64_t& orcOffset, int64_t length,
+                     Array* parray, std::vector<bool>* incomingMask) {
+  auto array = checked_cast<array_type*>(parray);
+  auto batch = checked_cast<liborc::ListVectorBatch*>(cbatch);
+  auto elementBatch = (batch->elements).get();
+  DataType* elementType = array->value_type().get();
+  int64_t arrowLength = array->length();
+  if (!arrowLength) return Status::OK();
+  if (orcOffset == 0) batch->offsets[0] = 0;
+  if (array->null_count() || incomingMask) batch->hasNulls = true;
+  for (; orcOffset < length && arrowOffset < arrowLength; orcOffset++, arrowOffset++) {
+    if (array->IsNull(arrowOffset) || (incomingMask && !(*incomingMask)[orcOffset])) {
+      batch->notNull[orcOffset] = false;
+      batch->offsets[orcOffset + 1] = batch->offsets[orcOffset];
+    } else {
+      batch->notNull[orcOffset] = true;
+      batch->offsets[orcOffset + 1] = batch->offsets[orcOffset] +
+                                      array->value_offset(arrowOffset + 1) -
+                                      array->value_offset(arrowOffset);
+      elementBatch->resize(batch->offsets[orcOffset + 1]);
+      int64_t subarrayArrowOffset = array->value_offset(arrowOffset),
+              subarrayORCOffset = batch->offsets[orcOffset],
+              subarrayORCLength = batch->offsets[orcOffset + 1];
+      RETURN_NOT_OK(FillBatch(elementType, elementBatch, subarrayArrowOffset,
+                              subarrayORCOffset, subarrayORCLength, array->values().get(),
+                              NULLPTR));
+    }
+  }
+  batch->numElements = orcOffset;
+  return Status::OK();
+}
+
+Status FillFixedSizeListBatch(const DataType* type, liborc::ColumnVectorBatch* cbatch,
+                              int64_t& arrowOffset, int64_t& orcOffset, int64_t length,
+                              Array* parray, std::vector<bool>* incomingMask) {
+  auto array = checked_cast<FixedSizeListArray*>(parray);
+  auto batch = checked_cast<liborc::ListVectorBatch*>(cbatch);
+  auto elementBatch = (batch->elements).get();
+  DataType* elementType = array->value_type().get();
+  int64_t arrowLength = array->length();
+  int32_t elementLength = array->value_length();  // Fixed length of each subarray
+  if (!arrowLength) return Status::OK();
+  if (orcOffset == 0) batch->offsets[0] = 0;
+  if (array->null_count() || incomingMask) batch->hasNulls = true;
+  for (; orcOffset < length && arrowOffset < arrowLength; orcOffset++, arrowOffset++) {
+    if (array->IsNull(arrowOffset) || (incomingMask && !(*incomingMask)[orcOffset])) {
+      batch->notNull[orcOffset] = false;
+      batch->offsets[orcOffset + 1] = batch->offsets[orcOffset];
+    } else {
+      batch->notNull[orcOffset] = true;
+      batch->offsets[orcOffset + 1] = batch->offsets[orcOffset] + elementLength;
+      int64_t subarrayArrowOffset = array->value_offset(arrowOffset),
+              subarrayORCOffset = batch->offsets[orcOffset],
+              subarrayORCLength = batch->offsets[orcOffset + 1];
+      elementBatch->resize(subarrayORCLength);
+      RETURN_NOT_OK(FillBatch(elementType, elementBatch, subarrayArrowOffset,
+                              subarrayORCOffset, subarrayORCLength, array->values().get(),
+                              NULLPTR));
+    }
+  }
+  batch->numElements = orcOffset;
+  return Status::OK();
+}
+
+Status FillMapBatch(const DataType* type, liborc::ColumnVectorBatch* cbatch,
+                    int64_t& arrowOffset, int64_t& orcOffset, int64_t length,
+                    Array* parray, std::vector<bool>* incomingMask) {
+  auto array = checked_cast<MapArray*>(parray);
+  auto batch = checked_cast<liborc::MapVectorBatch*>(cbatch);
+  auto keyBatch = (batch->keys).get();

Review comment:
       Don't use `auto` where the type is not visible somewhere else in the line.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org