You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2021/01/20 02:19:32 UTC

[GitHub] [arrow] mathyingzhou commented on a change in pull request #8648: ARROW-7906: [C++] [Python] Add ORC write support

mathyingzhou commented on a change in pull request #8648:
URL: https://github.com/apache/arrow/pull/8648#discussion_r560628957



##########
File path: cpp/src/arrow/adapters/orc/adapter_util.cc
##########
@@ -316,10 +326,482 @@ Status AppendBatch(const liborc::Type* type, liborc::ColumnVectorBatch* batch,
   }
 }
 
+// Copies a numeric Arrow array into an ORC batch, assigning values without a cast.
+//
+// Reading starts at `arrowOffset` in `parray` and writing starts at `orcOffset` in
+// `cbatch`. Both offsets are advanced in place until the Arrow array is exhausted
+// or `orcOffset` reaches `length`. A slot is written as null when the Arrow value
+// is null, or when `incomingMask` is supplied and holds false at that ORC
+// position. `type` is not consulted by this function.
+template <class array_type, class batch_type>
+Status FillNumericBatch(const DataType* type, liborc::ColumnVectorBatch* cbatch,
+                        int64_t& arrowOffset, int64_t& orcOffset, int64_t length,
+                        Array* parray, std::vector<bool>* incomingMask) {
+  auto source = checked_cast<array_type*>(parray);
+  auto target = checked_cast<batch_type*>(cbatch);
+  const int64_t sourceLength = source->length();
+  // An empty array leaves the batch (including numElements) untouched.
+  if (sourceLength == 0) {
+    return Status::OK();
+  }
+  // The mere presence of a mask flags the batch as nullable.
+  if (source->null_count() != 0 || incomingMask != nullptr) {
+    target->hasNulls = true;
+  }
+  while (orcOffset < length && arrowOffset < sourceLength) {
+    const bool isValid = !source->IsNull(arrowOffset) &&
+                         (incomingMask == nullptr || (*incomingMask)[orcOffset]);
+    target->notNull[orcOffset] = isValid;
+    if (isValid) {
+      target->data[orcOffset] = source->Value(arrowOffset);
+    }
+    ++orcOffset;
+    ++arrowOffset;
+  }
+  target->numElements = orcOffset;
+  return Status::OK();
+}
+
+// Copies a numeric Arrow array into an ORC batch, converting every value to
+// `target_type` with static_cast before it is stored.
+//
+// Reading starts at `arrowOffset` in `parray` and writing starts at `orcOffset` in
+// `cbatch`. Both offsets are advanced in place until the Arrow array is exhausted
+// or `orcOffset` reaches `length`. A slot is written as null when the Arrow value
+// is null, or when `incomingMask` is supplied and holds false at that ORC
+// position. `type` is not consulted by this function.
+template <class array_type, class batch_type, class target_type>
+Status FillNumericBatchCast(const DataType* type, liborc::ColumnVectorBatch* cbatch,
+                            int64_t& arrowOffset, int64_t& orcOffset, int64_t length,
+                            Array* parray, std::vector<bool>* incomingMask) {
+  auto source = checked_cast<array_type*>(parray);
+  auto target = checked_cast<batch_type*>(cbatch);
+  const int64_t sourceLength = source->length();
+  // An empty array leaves the batch (including numElements) untouched.
+  if (sourceLength == 0) {
+    return Status::OK();
+  }
+  // The mere presence of a mask flags the batch as nullable.
+  if (source->null_count() != 0 || incomingMask != nullptr) {
+    target->hasNulls = true;
+  }
+  while (orcOffset < length && arrowOffset < sourceLength) {
+    const bool isValid = !source->IsNull(arrowOffset) &&
+                         (incomingMask == nullptr || (*incomingMask)[orcOffset]);
+    target->notNull[orcOffset] = isValid;
+    if (isValid) {
+      target->data[orcOffset] = static_cast<target_type>(source->Value(arrowOffset));
+    }
+    ++orcOffset;
+    ++arrowOffset;
+  }
+  target->numElements = orcOffset;
+  return Status::OK();
+}
+
+// Converts a Date64 Arrow array into an ORC timestamp batch. Each value is
+// treated as a millisecond count and split into whole seconds (`data`) plus a
+// sub-second remainder expressed in nanoseconds (`nanoseconds`).
+//
+// Reading starts at `arrowOffset` in `parray` and writing starts at `orcOffset` in
+// `cbatch`. Both offsets are advanced in place until the Arrow array is exhausted
+// or `orcOffset` reaches `length`. A slot is written as null when the Arrow value
+// is null, or when `incomingMask` is supplied and holds false at that ORC
+// position. `type` is not consulted by this function.
+Status FillDate64Batch(const DataType* type, liborc::ColumnVectorBatch* cbatch,
+                       int64_t& arrowOffset, int64_t& orcOffset, int64_t length,
+                       Array* parray, std::vector<bool>* incomingMask) {
+  auto array = checked_cast<Date64Array*>(parray);
+  auto batch = checked_cast<liborc::TimestampVectorBatch*>(cbatch);
+  int64_t arrowLength = array->length();
+  if (!arrowLength) return Status::OK();
+  if (array->null_count() || incomingMask) batch->hasNulls = true;
+  const int64_t millisPerSecond = static_cast<int64_t>(kOneSecondMillis);
+  for (; orcOffset < length && arrowOffset < arrowLength; orcOffset++, arrowOffset++) {
+    if (array->IsNull(arrowOffset) || (incomingMask && !(*incomingMask)[orcOffset])) {
+      batch->notNull[orcOffset] = false;
+    } else {
+      const int64_t milliseconds = array->Value(arrowOffset);
+      // Explicit floor division. Integer `/` truncates toward zero, so wrapping
+      // an integer quotient in std::floor does not round negative (pre-epoch)
+      // values down and would leave a negative sub-second part.
+      // NOTE(review): assumes kOneSecondMillis is an integral constant - confirm.
+      int64_t seconds = milliseconds / millisPerSecond;
+      int64_t remainderMillis = milliseconds % millisPerSecond;
+      if (remainderMillis < 0) {
+        --seconds;
+        remainderMillis += millisPerSecond;
+      }
+      batch->data[orcOffset] = seconds;
+      batch->nanoseconds[orcOffset] = remainderMillis * kOneMilliNanos;
+      batch->notNull[orcOffset] = true;
+    }
+  }
+  batch->numElements = orcOffset;
+  return Status::OK();
+}
+
+// Converts a Timestamp Arrow array of any time unit into an ORC timestamp batch.
+// Each value is split into whole seconds (`data`) plus a sub-second remainder
+// expressed in nanoseconds (`nanoseconds`).
+//
+// Reading starts at `arrowOffset` in `parray` and writing starts at `orcOffset` in
+// `cbatch`. Both offsets are advanced in place until the Arrow array is exhausted
+// or `orcOffset` reaches `length`. A slot is written as null when the Arrow value
+// is null, or when `incomingMask` is supplied and holds false at that ORC
+// position. `type` is not consulted; the unit is read from the array's own type.
+Status FillTimestampBatch(const DataType* type, liborc::ColumnVectorBatch* cbatch,
+                          int64_t& arrowOffset, int64_t& orcOffset, int64_t length,
+                          Array* parray, std::vector<bool>* incomingMask) {
+  auto array = checked_cast<TimestampArray*>(parray);
+  auto batch = checked_cast<liborc::TimestampVectorBatch*>(cbatch);
+  int64_t arrowLength = array->length();
+  if (!arrowLength) return Status::OK();
+  if (array->null_count() || incomingMask) batch->hasNulls = true;
+  // The unit is a property of the array, so resolve the conversion factors once
+  // instead of copying the type's shared_ptr for every element.
+  int64_t unitsPerSecond = 1;
+  int64_t nanosPerUnit = 0;
+  switch (std::static_pointer_cast<TimestampType>(array->type())->unit()) {
+    case TimeUnit::type::SECOND: {
+      // Whole seconds: the remainder is always zero, so nanoseconds stay zero.
+      unitsPerSecond = 1;
+      nanosPerUnit = 0;
+      break;
+    }
+    case TimeUnit::type::MILLI: {
+      unitsPerSecond = static_cast<int64_t>(kOneSecondMillis);
+      nanosPerUnit = static_cast<int64_t>(kOneMilliNanos);
+      break;
+    }
+    case TimeUnit::type::MICRO: {
+      unitsPerSecond = static_cast<int64_t>(kOneSecondMicros);
+      nanosPerUnit = static_cast<int64_t>(kOneMicroNanos);
+      break;
+    }
+    default: {
+      // Every remaining unit is handled as nanoseconds.
+      unitsPerSecond = static_cast<int64_t>(kOneSecondNanos);
+      nanosPerUnit = 1;
+    }
+  }
+  for (; orcOffset < length && arrowOffset < arrowLength; orcOffset++, arrowOffset++) {
+    if (array->IsNull(arrowOffset) || (incomingMask && !(*incomingMask)[orcOffset])) {
+      batch->notNull[orcOffset] = false;
+    } else {
+      const int64_t data = array->Value(arrowOffset);
+      batch->notNull[orcOffset] = true;
+      // Explicit floor division. Integer `/` truncates toward zero, so wrapping
+      // an integer quotient in std::floor does not round negative (pre-epoch)
+      // values down and would leave a negative sub-second part.
+      // NOTE(review): assumes the kOneSecond* constants are integral - confirm.
+      int64_t seconds = data / unitsPerSecond;
+      int64_t remainder = data % unitsPerSecond;
+      if (remainder < 0) {
+        --seconds;
+        remainder += unitsPerSecond;
+      }
+      batch->data[orcOffset] = seconds;
+      batch->nanoseconds[orcOffset] = remainder * nanosPerUnit;
+    }
+  }
+  batch->numElements = orcOffset;
+  return Status::OK();
+}
+
+// Copies a string Arrow array into an ORC string batch. Every non-null value gets
+// a freshly allocated buffer that holds the bytes plus a terminating '\0'; the
+// recorded length excludes that terminator.
+//
+// Reading starts at `arrowOffset` in `parray` and writing starts at `orcOffset` in
+// `cbatch`. Both offsets are advanced in place until the Arrow array is exhausted
+// or `orcOffset` reaches `length`. A slot is written as null when the Arrow value
+// is null, or when `incomingMask` is supplied and holds false at that ORC
+// position. `type` is not consulted by this function.
+template <class array_type>
+Status FillStringBatch(const DataType* type, liborc::ColumnVectorBatch* cbatch,
+                       int64_t& arrowOffset, int64_t& orcOffset, int64_t length,
+                       Array* parray, std::vector<bool>* incomingMask) {
+  auto array = checked_cast<array_type*>(parray);
+  auto batch = checked_cast<liborc::StringVectorBatch*>(cbatch);
+  int64_t arrowLength = array->length();
+  if (!arrowLength) return Status::OK();
+  if (array->null_count() || incomingMask) batch->hasNulls = true;
+  for (; orcOffset < length && arrowOffset < arrowLength; orcOffset++, arrowOffset++) {
+    if (array->IsNull(arrowOffset) || (incomingMask && !(*incomingMask)[orcOffset])) {
+      batch->notNull[orcOffset] = false;
+    } else {
+      batch->notNull[orcOffset] = true;
+      const std::string dataString = array->GetString(arrowOffset);
+      // Keep the full width of the length; narrowing to int could truncate it.
+      const std::size_t dataStringLength = dataString.length();
+      // The buffer is created with new[] below, so it must be released with
+      // delete[]; delete[] on a null pointer is a no-op.
+      // NOTE(review): assumes a non-null slot was allocated by this function -
+      // confirm against how the batch is created.
+      delete[] batch->data[orcOffset];
+      batch->data[orcOffset] = new char[dataStringLength + 1];  // Include null
+      memcpy(batch->data[orcOffset], dataString.c_str(), dataStringLength + 1);
+      batch->length[orcOffset] = static_cast<int64_t>(dataStringLength);
+    }
+  }
+  batch->numElements = orcOffset;
+  return Status::OK();
+}
+
+// Copies a variable-length binary Arrow array into an ORC string batch. Every
+// non-null value gets a freshly allocated buffer of exactly its byte length (no
+// terminator is appended).
+//
+// Reading starts at `arrowOffset` in `parray` and writing starts at `orcOffset` in
+// `cbatch`. Both offsets are advanced in place until the Arrow array is exhausted
+// or `orcOffset` reaches `length`. A slot is written as null when the Arrow value
+// is null, or when `incomingMask` is supplied and holds false at that ORC
+// position. `type` is not consulted by this function.
+template <class array_type, class offset_type>
+Status FillBinaryBatch(const DataType* type, liborc::ColumnVectorBatch* cbatch,
+                       int64_t& arrowOffset, int64_t& orcOffset, int64_t length,
+                       Array* parray, std::vector<bool>* incomingMask) {
+  auto array = checked_cast<array_type*>(parray);
+  auto batch = checked_cast<liborc::StringVectorBatch*>(cbatch);
+  int64_t arrowLength = array->length();
+  if (!arrowLength) return Status::OK();
+  if (array->null_count() || incomingMask) batch->hasNulls = true;
+  for (; orcOffset < length && arrowOffset < arrowLength; orcOffset++, arrowOffset++) {
+    if (array->IsNull(arrowOffset) || (incomingMask && !(*incomingMask)[orcOffset])) {
+      batch->notNull[orcOffset] = false;
+    } else {
+      batch->notNull[orcOffset] = true;
+      offset_type dataLength = 0;
+      const uint8_t* data = array->GetValue(arrowOffset, &dataLength);
+      // The buffer is created with new[] below, so it must be released with
+      // delete[]; delete[] on a null pointer is a no-op.
+      // NOTE(review): assumes a non-null slot was allocated by this function -
+      // confirm against how the batch is created.
+      delete[] batch->data[orcOffset];
+      batch->data[orcOffset] = new char[dataLength];  // Do not include null
+      memcpy(batch->data[orcOffset], data, dataLength);
+      batch->length[orcOffset] = dataLength;
+    }
+  }
+  batch->numElements = orcOffset;
+  return Status::OK();
+}
+
+// Copies a fixed-size binary Arrow array into an ORC string batch. Every non-null
+// value gets a freshly allocated buffer of `byte_width()` bytes (no terminator is
+// appended).
+//
+// Reading starts at `arrowOffset` in `parray` and writing starts at `orcOffset` in
+// `cbatch`. Both offsets are advanced in place until the Arrow array is exhausted
+// or `orcOffset` reaches `length`. A slot is written as null when the Arrow value
+// is null, or when `incomingMask` is supplied and holds false at that ORC
+// position. `type` is not consulted by this function.
+Status FillFixedSizeBinaryBatch(const DataType* type, liborc::ColumnVectorBatch* cbatch,
+                                int64_t& arrowOffset, int64_t& orcOffset, int64_t length,
+                                Array* parray, std::vector<bool>* incomingMask) {
+  auto array = checked_cast<FixedSizeBinaryArray*>(parray);
+  auto batch = checked_cast<liborc::StringVectorBatch*>(cbatch);
+  int64_t arrowLength = array->length();
+  if (!arrowLength) return Status::OK();
+  const int32_t byteWidth = array->byte_width();
+  if (array->null_count() || incomingMask) batch->hasNulls = true;
+  for (; orcOffset < length && arrowOffset < arrowLength; orcOffset++, arrowOffset++) {
+    if (array->IsNull(arrowOffset) || (incomingMask && !(*incomingMask)[orcOffset])) {
+      batch->notNull[orcOffset] = false;
+    } else {
+      batch->notNull[orcOffset] = true;
+      const uint8_t* data = array->GetValue(arrowOffset);
+      // The buffer is created with new[] below, so it must be released with
+      // delete[]; delete[] on a null pointer is a no-op.
+      // NOTE(review): assumes a non-null slot was allocated by this function -
+      // confirm against how the batch is created.
+      delete[] batch->data[orcOffset];
+      batch->data[orcOffset] = new char[byteWidth];  // Do not include null
+      memcpy(batch->data[orcOffset], data, byteWidth);
+      batch->length[orcOffset] = byteWidth;
+    }
+  }
+  batch->numElements = orcOffset;
+  return Status::OK();
+}
+
+// Copies a Decimal128 Arrow array into an ORC Decimal128 batch.
+//
+// Only 128-bit decimals are handled: even if Arrow gains 256-bit decimals, they
+// cannot be written here unless ORC supports them as well.
+//
+// Reading starts at `arrowOffset` in `parray` and writing starts at `orcOffset` in
+// `cbatch`. Both offsets are advanced in place until the Arrow array is exhausted
+// or `orcOffset` reaches `length`. A slot is written as null when the Arrow value
+// is null, or when `incomingMask` is supplied and holds false at that ORC
+// position. `type` is not consulted by this function.
+Status FillDecimalBatch(const DataType* type, liborc::ColumnVectorBatch* cbatch,
+                        int64_t& arrowOffset, int64_t& orcOffset, int64_t length,
+                        Array* parray, std::vector<bool>* incomingMask) {
+  auto array = checked_cast<Decimal128Array*>(parray);
+  auto batch = checked_cast<liborc::Decimal128VectorBatch*>(cbatch);
+  int64_t arrowLength = array->length();
+  if (!arrowLength) return Status::OK();
+  if (array->null_count() || incomingMask) batch->hasNulls = true;
+  for (; orcOffset < length && arrowOffset < arrowLength; orcOffset++, arrowOffset++) {
+    if (array->IsNull(arrowOffset) || (incomingMask && !(*incomingMask)[orcOffset])) {
+      batch->notNull[orcOffset] = false;
+    } else {
+      batch->notNull[orcOffset] = true;
+      const uint8_t* rawInt128 = array->GetValue(arrowOffset);
+      // Read both halves with memcpy rather than casting the byte pointer to
+      // wider integer pointers: the value buffer is not guaranteed to be aligned
+      // for 64-bit loads, and the casts also dropped const for no reason.
+      // The first 8 bytes are taken as the low word and the next 8 as the high
+      // word, in the platform's native byte order.
+      uint64_t lowerBits = 0;
+      int64_t higherBits = 0;
+      memcpy(&lowerBits, rawInt128, sizeof(lowerBits));
+      memcpy(&higherBits, rawInt128 + sizeof(lowerBits), sizeof(higherBits));
+      batch->values[orcOffset] = liborc::Int128(higherBits, lowerBits);
+    }
+  }
+  batch->numElements = orcOffset;
+  return Status::OK();
+}
+
+// Copies a struct Arrow array into an ORC struct batch: first the struct-level
+// null flags, then every child field through FillBatch.
+//
+// When the struct array has nulls or an `incomingMask` is supplied, a mask of
+// `length` entries is built that holds false at every ORC position whose struct
+// slot is null. That mask is handed to every child so the children of a null
+// struct are written as null too; otherwise the children receive no mask.
+//
+// Every child is filled from the same starting offsets that were passed in, so on
+// return `arrowOffset` and `orcOffset` hold whatever the last child left in them.
+// NOTE(review): numElements is incremented here, whereas the sibling fillers
+// assign it - confirm this is intended.
+Status FillStructBatch(const DataType* type, liborc::ColumnVectorBatch* cbatch,
+                       int64_t& arrowOffset, int64_t& orcOffset, int64_t length,
+                       Array* parray, std::vector<bool>* incomingMask) {
+  auto structArray = checked_cast<StructArray*>(parray);
+  auto structBatch = checked_cast<liborc::StructVectorBatch*>(cbatch);
+  const std::size_t fieldCount = type->fields().size();
+  const int64_t structLength = structArray->length();
+  if (structLength == 0) {
+    return Status::OK();
+  }
+  const int64_t orcStart = orcOffset;
+  const int64_t arrowStart = arrowOffset;
+  // Stays empty (null) when neither the array nor the caller can introduce nulls.
+  std::shared_ptr<std::vector<bool>> childMask;
+  if (structArray->null_count() != 0 || incomingMask != nullptr) {
+    structBatch->hasNulls = true;
+    childMask = std::make_shared<std::vector<bool>>(length, true);
+  }
+  // Pass 1: struct-level validity.
+  while (orcOffset < length && arrowOffset < structLength) {
+    const bool isValid = !structArray->IsNull(arrowOffset) &&
+                         (incomingMask == nullptr || (*incomingMask)[orcOffset]);
+    structBatch->notNull[orcOffset] = isValid;
+    if (!isValid) {
+      (*childMask)[orcOffset] = false;
+    }
+    ++orcOffset;
+    ++arrowOffset;
+  }
+  structBatch->numElements += orcOffset - orcStart;
+  // Pass 2: each child is filled over the same range, so rewind before every call.
+  for (std::size_t fieldIndex = 0; fieldIndex < fieldCount; ++fieldIndex) {
+    orcOffset = orcStart;
+    arrowOffset = arrowStart;
+    RETURN_NOT_OK(FillBatch(type->field(fieldIndex)->type().get(),
+                            structBatch->fields[fieldIndex], arrowOffset, orcOffset,
+                            length, structArray->field(fieldIndex).get(),
+                            childMask.get()));
+  }
+  return Status::OK();
+}
+
+// Copies a variable-length list Arrow array into an ORC list batch.
+//
+// For every non-null list, the ORC end offset is the previous ORC offset plus the
+// Arrow list's length; the element batch is resized to that end offset and the
+// list's values are filled into it through FillBatch without a mask. A null list
+// repeats the previous offset, so it contributes no elements.
+//
+// Reading starts at `arrowOffset` in `parray` and writing starts at `orcOffset` in
+// `cbatch`. Both offsets are advanced in place until the Arrow array is exhausted
+// or `orcOffset` reaches `length`. A slot is written as null when the Arrow value
+// is null, or when `incomingMask` is supplied and holds false at that ORC
+// position. `type` is not consulted; the element type comes from the array.
+template <class array_type>
+Status FillListBatch(const DataType* type, liborc::ColumnVectorBatch* cbatch,
+                     int64_t& arrowOffset, int64_t& orcOffset, int64_t length,
+                     Array* parray, std::vector<bool>* incomingMask) {
+  auto listArray = checked_cast<array_type*>(parray);
+  auto listBatch = checked_cast<liborc::ListVectorBatch*>(cbatch);
+  auto childBatch = (listBatch->elements).get();
+  DataType* childType = listArray->value_type().get();
+  const int64_t listCount = listArray->length();
+  if (listCount == 0) {
+    return Status::OK();
+  }
+  // The offsets buffer is only seeded when writing starts at the very beginning.
+  if (orcOffset == 0) {
+    listBatch->offsets[0] = 0;
+  }
+  if (listArray->null_count() != 0 || incomingMask != nullptr) {
+    listBatch->hasNulls = true;
+  }
+  while (orcOffset < length && arrowOffset < listCount) {
+    const bool isValid = !listArray->IsNull(arrowOffset) &&
+                         (incomingMask == nullptr || (*incomingMask)[orcOffset]);
+    listBatch->notNull[orcOffset] = isValid;
+    if (isValid) {
+      listBatch->offsets[orcOffset + 1] = listBatch->offsets[orcOffset] +
+                                          listArray->value_offset(arrowOffset + 1) -
+                                          listArray->value_offset(arrowOffset);
+      const int64_t childOrcEnd = listBatch->offsets[orcOffset + 1];
+      childBatch->resize(childOrcEnd);
+      // Local cursors: the child fill advances them, not the list-level offsets.
+      int64_t childArrowOffset = listArray->value_offset(arrowOffset);
+      int64_t childOrcOffset = listBatch->offsets[orcOffset];
+      RETURN_NOT_OK(FillBatch(childType, childBatch, childArrowOffset, childOrcOffset,
+                              childOrcEnd, listArray->values().get(), NULLPTR));
+    } else {
+      listBatch->offsets[orcOffset + 1] = listBatch->offsets[orcOffset];
+    }
+    ++orcOffset;
+    ++arrowOffset;
+  }
+  listBatch->numElements = orcOffset;
+  return Status::OK();
+}
+
+// Copies a fixed-size list Arrow array into an ORC list batch.
+//
+// For every non-null list, the ORC end offset is the previous ORC offset plus the
+// array's fixed `value_length()`; the element batch is resized to that end offset
+// and the list's values are filled into it through FillBatch without a mask. A
+// null list repeats the previous offset, so it contributes no elements.
+//
+// Reading starts at `arrowOffset` in `parray` and writing starts at `orcOffset` in
+// `cbatch`. Both offsets are advanced in place until the Arrow array is exhausted
+// or `orcOffset` reaches `length`. A slot is written as null when the Arrow value
+// is null, or when `incomingMask` is supplied and holds false at that ORC
+// position. `type` is not consulted; the element type comes from the array.
+Status FillFixedSizeListBatch(const DataType* type, liborc::ColumnVectorBatch* cbatch,
+                              int64_t& arrowOffset, int64_t& orcOffset, int64_t length,
+                              Array* parray, std::vector<bool>* incomingMask) {
+  auto listArray = checked_cast<FixedSizeListArray*>(parray);
+  auto listBatch = checked_cast<liborc::ListVectorBatch*>(cbatch);
+  auto childBatch = (listBatch->elements).get();
+  DataType* childType = listArray->value_type().get();
+  const int64_t listCount = listArray->length();
+  const int32_t listWidth = listArray->value_length();  // Same for every list
+  if (listCount == 0) {
+    return Status::OK();
+  }
+  // The offsets buffer is only seeded when writing starts at the very beginning.
+  if (orcOffset == 0) {
+    listBatch->offsets[0] = 0;
+  }
+  if (listArray->null_count() != 0 || incomingMask != nullptr) {
+    listBatch->hasNulls = true;
+  }
+  while (orcOffset < length && arrowOffset < listCount) {
+    const bool isValid = !listArray->IsNull(arrowOffset) &&
+                         (incomingMask == nullptr || (*incomingMask)[orcOffset]);
+    listBatch->notNull[orcOffset] = isValid;
+    if (isValid) {
+      listBatch->offsets[orcOffset + 1] = listBatch->offsets[orcOffset] + listWidth;
+      // Local cursors: the child fill advances them, not the list-level offsets.
+      int64_t childArrowOffset = listArray->value_offset(arrowOffset);
+      int64_t childOrcOffset = listBatch->offsets[orcOffset];
+      const int64_t childOrcEnd = listBatch->offsets[orcOffset + 1];
+      childBatch->resize(childOrcEnd);
+      RETURN_NOT_OK(FillBatch(childType, childBatch, childArrowOffset, childOrcOffset,
+                              childOrcEnd, listArray->values().get(), NULLPTR));
+    } else {
+      listBatch->offsets[orcOffset + 1] = listBatch->offsets[orcOffset];
+    }
+    ++orcOffset;
+    ++arrowOffset;
+  }
+  listBatch->numElements = orcOffset;
+  return Status::OK();
+}
+
+// Copies a map Arrow array into an ORC map batch.
+//
+// For every non-null map, the ORC end offset is the previous ORC offset plus the
+// Arrow map's entry count; the key and element batches are both resized to that
+// end offset, and the keys and then the items are filled over the same range
+// through FillBatch without a mask. A null map repeats the previous offset.
+//
+// Reading starts at `arrowOffset` in `parray` and writing starts at `orcOffset` in
+// `cbatch`. Both offsets are advanced in place until the Arrow array is exhausted
+// or `orcOffset` reaches `length`. A slot is written as null when the Arrow value
+// is null, or when `incomingMask` is supplied and holds false at that ORC
+// position. `type` is not consulted; key and item types come from the child arrays.
+Status FillMapBatch(const DataType* type, liborc::ColumnVectorBatch* cbatch,
+                    int64_t& arrowOffset, int64_t& orcOffset, int64_t length,
+                    Array* parray, std::vector<bool>* incomingMask) {
+  auto mapArray = checked_cast<MapArray*>(parray);
+  auto mapBatch = checked_cast<liborc::MapVectorBatch*>(cbatch);
+  auto keyBatch = (mapBatch->keys).get();
+  auto itemBatch = (mapBatch->elements).get();
+  auto keyArray = mapArray->keys().get();
+  auto itemArray = mapArray->items().get();
+  DataType* keyType = keyArray->type().get();
+  DataType* itemType = itemArray->type().get();
+  const int64_t mapCount = mapArray->length();
+  if (mapCount == 0) {
+    return Status::OK();
+  }
+  // The offsets buffer is only seeded when writing starts at the very beginning.
+  if (orcOffset == 0) {
+    mapBatch->offsets[0] = 0;
+  }
+  if (mapArray->null_count() != 0 || incomingMask != nullptr) {
+    mapBatch->hasNulls = true;
+  }
+  while (orcOffset < length && arrowOffset < mapCount) {
+    const bool isValid = !mapArray->IsNull(arrowOffset) &&
+                         (incomingMask == nullptr || (*incomingMask)[orcOffset]);
+    mapBatch->notNull[orcOffset] = isValid;
+    if (isValid) {
+      mapBatch->offsets[orcOffset + 1] = mapBatch->offsets[orcOffset] +
+                                         mapArray->value_offset(arrowOffset + 1) -
+                                         mapArray->value_offset(arrowOffset);
+      const int64_t entryArrowStart = mapArray->value_offset(arrowOffset);
+      const int64_t entryOrcStart = mapBatch->offsets[orcOffset];
+      const int64_t entryOrcEnd = mapBatch->offsets[orcOffset + 1];
+      keyBatch->resize(entryOrcEnd);
+      itemBatch->resize(entryOrcEnd);
+      // Keys and items cover the same range, so each fill gets its own cursors.
+      int64_t keyArrowOffset = entryArrowStart;
+      int64_t keyOrcOffset = entryOrcStart;
+      RETURN_NOT_OK(FillBatch(keyType, keyBatch, keyArrowOffset, keyOrcOffset,
+                              entryOrcEnd, keyArray, NULLPTR));
+      int64_t itemArrowOffset = entryArrowStart;
+      int64_t itemOrcOffset = entryOrcStart;
+      RETURN_NOT_OK(FillBatch(itemType, itemBatch, itemArrowOffset, itemOrcOffset,
+                              entryOrcEnd, itemArray, NULLPTR));
+    } else {
+      mapBatch->offsets[orcOffset + 1] = mapBatch->offsets[orcOffset];
+    }
+    ++orcOffset;
+    ++arrowOffset;
+  }
+  mapBatch->numElements = orcOffset;
+  return Status::OK();
+}
+
+Status FillBatch(const DataType* type, liborc::ColumnVectorBatch* cbatch,
+                 int64_t& arrowOffset, int64_t& orcOffset, int64_t length, Array* parray,
+                 std::vector<bool>* incomingMask) {

Review comment:
       `incomingMask` is used exclusively by FillStructBatch. The reason is that ORC is much stricter than Arrow about consistency: if a struct scalar is null, all of its children must also be set to null, or ORC will not function properly. This is why I added `incomingMask`, which passes the null status of a struct on to its children.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org