You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@orc.apache.org by "wgtmac (via GitHub)" <gi...@apache.org> on 2023/03/28 05:08:04 UTC

[GitHub] [orc] wgtmac commented on a diff in pull request #1454: ORC-1385: [C++] Support schema evolution of numeric types

wgtmac commented on code in PR #1454:
URL: https://github.com/apache/orc/pull/1454#discussion_r1150018545


##########
c++/src/ColumnReader.hh:
##########
@@ -101,6 +103,12 @@ namespace orc {
      * encoded in RLE.
      */
     virtual bool isDecimalAsLong() const = 0;
+
+    /**
+     * @return get schema evolution utility object
+     *

Review Comment:
   remove this line



##########
c++/include/orc/Reader.hh:
##########
@@ -336,6 +336,16 @@ namespace orc {
      * @return if not set, the default is false
      */
     bool getUseTightNumericVector() const;
+
+    /**
+     * Set read type for schema evolution
+     */
+    RowReaderOptions& setReadType(std::shared_ptr<Type>& type);

Review Comment:
   ```suggestion
       RowReaderOptions& setReadType(std::shared_ptr<Type> type);
   ```



##########
c++/src/ConvertColumnReader.cc:
##########
@@ -0,0 +1,415 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "ConvertColumnReader.hh"
+
+namespace orc {
+
+  // Assume that we are using tight numeric vector batch
+  using BooleanVectorBatch = ByteVectorBatch;
+
+  ConvertColumnReader::ConvertColumnReader(const Type& _readType, const Type& fileType,
+                                           StripeStreams& stripe)
+      : ColumnReader(_readType, stripe), readType(_readType) {
+    reader = buildReader(fileType, stripe, true, false);
+    data = fileType.createRowBatch(0, memoryPool, false, true);
+  }
+
+  void ConvertColumnReader::next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull) {
+    reader->next(*data, numValues, notNull);
+    rowBatch.resize(data->capacity);
+    rowBatch.numElements = data->numElements;
+    rowBatch.hasNulls = data->hasNulls;
+    if (!rowBatch.hasNulls) {
+      memset(rowBatch.notNull.data(), 1, data->notNull.size());
+    } else {
+      memcpy(rowBatch.notNull.data(), data->notNull.data(), data->notNull.size());
+    }
+  }
+
+  uint64_t ConvertColumnReader::skip(uint64_t numValues) {
+    return reader->skip(numValues);
+  }
+
+  void ConvertColumnReader::seekToRowGroup(
+      std::unordered_map<uint64_t, PositionProvider>& positions) {
+    reader->seekToRowGroup(positions);
+  }
+
+  static inline bool canFitInLong(double value) {
+    constexpr double MIN_LONG_AS_DOUBLE = -0x1p63;
+    constexpr double MAX_LONG_AS_DOUBLE_PLUS_ONE = 0x1p63;
+    return ((MIN_LONG_AS_DOUBLE - value < 1.0) && (value < MAX_LONG_AS_DOUBLE_PLUS_ONE));
+  }
+
+  static inline void setNull(ColumnVectorBatch& dstBatch, uint64_t idx) {
+    dstBatch.notNull.data()[idx] = 0;
+    dstBatch.hasNulls = true;
+  }
+
+  // return false if overflow
+  template <typename ReadType>
+  static bool downCastToInteger(ReadType& dstValue, int64_t inputLong) {
+    dstValue = static_cast<ReadType>(inputLong);
+    if (std::is_same<ReadType, int64_t>::value) {
+      return true;
+    }
+    if (static_cast<int64_t>(dstValue) != inputLong) {
+      return false;
+    }
+    return true;
+  }
+
+  // set null if overflow
+  template <typename ReadType, typename FileType>
+  static inline void convertNumericElement(const FileType& srcValue, ReadType& destValue,
+                                           ColumnVectorBatch& destBatch, uint64_t idx) {
+    constexpr bool isFileTypeFloatingPoint(std::is_floating_point<FileType>::value);
+    constexpr bool isReadTypeFloatingPoint(std::is_floating_point<ReadType>::value);
+    int64_t longValue = static_cast<int64_t>(srcValue);
+    if (isFileTypeFloatingPoint) {
+      if (isReadTypeFloatingPoint) {
+        destValue = static_cast<ReadType>(srcValue);
+      } else {
+        if (!canFitInLong(static_cast<double>(srcValue)) ||
+            !downCastToInteger(destValue, longValue)) {
+          setNull(destBatch, idx);
+          return;
+        }
+      }
+    } else {
+      if (isReadTypeFloatingPoint) {
+        destValue = static_cast<ReadType>(srcValue);
+        if (destValue != destValue) {  // check is NaN
+          setNull(destBatch, idx);
+        }
+      } else {
+        if (!downCastToInteger(destValue, static_cast<int64_t>(srcValue))) {
+          setNull(destBatch, idx);
+        }
+      }
+    }
+  }
+
+  // { boolean, byte, short, int, long, float, double } ->
+  // { byte, short, int, long, float, double }
+  template <typename FileTypeBatch, typename ReadTypeBatch, typename ReadType>
+  class NumericConvertColumnReader : public ConvertColumnReader {
+   public:
+    NumericConvertColumnReader(const Type& _readType, const Type& fileType, StripeStreams& stripe)
+        : ConvertColumnReader(_readType, fileType, stripe) {}
+
+    void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull) override {
+      ConvertColumnReader::next(rowBatch, numValues, notNull);
+      const auto& srcBatch = dynamic_cast<const FileTypeBatch&>(*data);
+      auto& dstBatch = dynamic_cast<ReadTypeBatch&>(rowBatch);
+      if (rowBatch.hasNulls) {
+        for (uint64_t i = 0; i < rowBatch.numElements; ++i) {
+          if (rowBatch.notNull[i]) {
+            convertNumericElement<ReadType>(srcBatch.data[i], dstBatch.data[i], rowBatch, i);
+          }
+        }
+      } else {
+        for (uint64_t i = 0; i < rowBatch.numElements; ++i) {
+          convertNumericElement<ReadType>(srcBatch.data[i], dstBatch.data[i], rowBatch, i);
+        }
+      }
+    }
+  };
+
+  // { boolean, byte, short, int, long, float, double } -> { boolean }
+  template <typename FileTypeBatch>
+  class NumericConvertColumnReader<FileTypeBatch, BooleanVectorBatch, bool>
+      : public ConvertColumnReader {
+   public:
+    NumericConvertColumnReader(const Type& _readType, const Type& fileType, StripeStreams& stripe)
+        : ConvertColumnReader(_readType, fileType, stripe) {}
+
+    void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull) override {
+      ConvertColumnReader::next(rowBatch, numValues, notNull);
+      const auto& srcBatch = dynamic_cast<const FileTypeBatch&>(*data);
+      auto& dstBatch = dynamic_cast<BooleanVectorBatch&>(rowBatch);
+      if (rowBatch.hasNulls) {
+        for (uint64_t i = 0; i < rowBatch.numElements; ++i) {
+          if (rowBatch.notNull[i]) {
+            dstBatch.data[i] = (static_cast<int64_t>(srcBatch.data[i]) == 0 ? 0 : 1);
+          }
+        }
+      } else {
+        for (uint64_t i = 0; i < rowBatch.numElements; ++i) {
+          dstBatch.data[i] = (static_cast<int64_t>(srcBatch.data[i]) == 0 ? 0 : 1);
+        }
+      }
+    }
+  };
+
+#define DEFINE_NUMERIC_CONVERT_READER(FROM, TO, TYPE) \
+  using FROM##To##TO##ColumnReader =                  \
+      NumericConvertColumnReader<FROM##VectorBatch, TO##VectorBatch, TYPE>;
+
+  DEFINE_NUMERIC_CONVERT_READER(Boolean, Byte, int8_t)
+  DEFINE_NUMERIC_CONVERT_READER(Boolean, Short, int16_t)
+  DEFINE_NUMERIC_CONVERT_READER(Boolean, Int, int32_t)
+  DEFINE_NUMERIC_CONVERT_READER(Boolean, Long, int64_t)
+  DEFINE_NUMERIC_CONVERT_READER(Byte, Short, int16_t)
+  DEFINE_NUMERIC_CONVERT_READER(Byte, Int, int32_t)
+  DEFINE_NUMERIC_CONVERT_READER(Byte, Long, int64_t)
+  DEFINE_NUMERIC_CONVERT_READER(Short, Int, int32_t)
+  DEFINE_NUMERIC_CONVERT_READER(Short, Long, int64_t)
+  DEFINE_NUMERIC_CONVERT_READER(Int, Long, int64_t)
+  DEFINE_NUMERIC_CONVERT_READER(Float, Double, double)
+  DEFINE_NUMERIC_CONVERT_READER(Byte, Boolean, bool)
+  DEFINE_NUMERIC_CONVERT_READER(Short, Boolean, bool)
+  DEFINE_NUMERIC_CONVERT_READER(Short, Byte, int8_t)
+  DEFINE_NUMERIC_CONVERT_READER(Int, Boolean, bool)
+  DEFINE_NUMERIC_CONVERT_READER(Int, Byte, int8_t)
+  DEFINE_NUMERIC_CONVERT_READER(Int, Short, int16_t)
+  DEFINE_NUMERIC_CONVERT_READER(Long, Boolean, bool)
+  DEFINE_NUMERIC_CONVERT_READER(Long, Byte, int8_t)
+  DEFINE_NUMERIC_CONVERT_READER(Long, Short, int16_t)
+  DEFINE_NUMERIC_CONVERT_READER(Long, Int, int32_t)
+  DEFINE_NUMERIC_CONVERT_READER(Double, Float, float)
+  // Floating to integer
+  DEFINE_NUMERIC_CONVERT_READER(Float, Boolean, bool)
+  DEFINE_NUMERIC_CONVERT_READER(Float, Byte, int8_t)
+  DEFINE_NUMERIC_CONVERT_READER(Float, Short, int16_t)
+  DEFINE_NUMERIC_CONVERT_READER(Float, Int, int32_t)
+  DEFINE_NUMERIC_CONVERT_READER(Float, Long, int64_t)
+  DEFINE_NUMERIC_CONVERT_READER(Double, Boolean, bool)
+  DEFINE_NUMERIC_CONVERT_READER(Double, Byte, int8_t)
+  DEFINE_NUMERIC_CONVERT_READER(Double, Short, int16_t)
+  DEFINE_NUMERIC_CONVERT_READER(Double, Int, int32_t)
+  DEFINE_NUMERIC_CONVERT_READER(Double, Long, int64_t)
+  // Integer to Floating
+  DEFINE_NUMERIC_CONVERT_READER(Boolean, Float, float)
+  DEFINE_NUMERIC_CONVERT_READER(Byte, Float, float)
+  DEFINE_NUMERIC_CONVERT_READER(Short, Float, float)
+  DEFINE_NUMERIC_CONVERT_READER(Int, Float, float)
+  DEFINE_NUMERIC_CONVERT_READER(Long, Float, float)
+  DEFINE_NUMERIC_CONVERT_READER(Boolean, Double, double)
+  DEFINE_NUMERIC_CONVERT_READER(Byte, Double, double)
+  DEFINE_NUMERIC_CONVERT_READER(Short, Double, double)
+  DEFINE_NUMERIC_CONVERT_READER(Int, Double, double)
+  DEFINE_NUMERIC_CONVERT_READER(Long, Double, double)
+
+#define CASE_CREATE_READER(TYPE, CONVERT) \
+  case TYPE:                              \
+    return std::make_unique<CONVERT##ColumnReader>(_readType, fileType, stripe);
+
+#define CASE_EXCEPTION                                                                 \
+  default:                                                                             \
+    throw SchemaEvolutionError("Cannot convert from " + fileType.toString() + " to " + \
+                               _readType.toString());
+
+  std::unique_ptr<ColumnReader> buildConvertReader(const Type& fileType, StripeStreams& stripe,
+                                                   bool _useTightNumericVector) {
+    if (!_useTightNumericVector) {
+      throw SchemaEvolutionError("Schema Evolution only support tight numeric vector");

Review Comment:
   It would be good to provide clear guidance on which config to set to enable this.



##########
c++/src/ConvertColumnReader.cc:
##########
@@ -0,0 +1,415 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "ConvertColumnReader.hh"
+
+namespace orc {
+
+  // Assume that we are using tight numeric vector batch
+  using BooleanVectorBatch = ByteVectorBatch;
+
+  ConvertColumnReader::ConvertColumnReader(const Type& _readType, const Type& fileType,
+                                           StripeStreams& stripe)
+      : ColumnReader(_readType, stripe), readType(_readType) {
+    reader = buildReader(fileType, stripe, true, false);
+    data = fileType.createRowBatch(0, memoryPool, false, true);
+  }
+
+  void ConvertColumnReader::next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull) {
+    reader->next(*data, numValues, notNull);
+    rowBatch.resize(data->capacity);
+    rowBatch.numElements = data->numElements;
+    rowBatch.hasNulls = data->hasNulls;
+    if (!rowBatch.hasNulls) {
+      memset(rowBatch.notNull.data(), 1, data->notNull.size());
+    } else {
+      memcpy(rowBatch.notNull.data(), data->notNull.data(), data->notNull.size());
+    }
+  }
+
+  uint64_t ConvertColumnReader::skip(uint64_t numValues) {
+    return reader->skip(numValues);
+  }
+
+  void ConvertColumnReader::seekToRowGroup(
+      std::unordered_map<uint64_t, PositionProvider>& positions) {
+    reader->seekToRowGroup(positions);
+  }
+
+  static inline bool canFitInLong(double value) {
+    constexpr double MIN_LONG_AS_DOUBLE = -0x1p63;
+    constexpr double MAX_LONG_AS_DOUBLE_PLUS_ONE = 0x1p63;
+    return ((MIN_LONG_AS_DOUBLE - value < 1.0) && (value < MAX_LONG_AS_DOUBLE_PLUS_ONE));
+  }
+
+  static inline void setNull(ColumnVectorBatch& dstBatch, uint64_t idx) {
+    dstBatch.notNull.data()[idx] = 0;
+    dstBatch.hasNulls = true;
+  }
+
+  // return false if overflow
+  template <typename ReadType>
+  static bool downCastToInteger(ReadType& dstValue, int64_t inputLong) {
+    dstValue = static_cast<ReadType>(inputLong);
+    if (std::is_same<ReadType, int64_t>::value) {

Review Comment:
   ```suggestion
       if constexpr (std::is_same<ReadType, int64_t>::value) {
   ```



##########
c++/include/orc/Reader.hh:
##########
@@ -336,6 +336,16 @@ namespace orc {
      * @return if not set, the default is false
      */
     bool getUseTightNumericVector() const;
+
+    /**
+     * Set read type for schema evolution
+     */
+    RowReaderOptions& setReadType(std::shared_ptr<Type>& type);

Review Comment:
   In addition, we need to check if the read type matches selected columns. Otherwise we may have problems.



##########
c++/src/ConvertColumnReader.cc:
##########
@@ -0,0 +1,415 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "ConvertColumnReader.hh"
+
+namespace orc {
+
+  // Assume that we are using tight numeric vector batch
+  using BooleanVectorBatch = ByteVectorBatch;
+
+  ConvertColumnReader::ConvertColumnReader(const Type& _readType, const Type& fileType,
+                                           StripeStreams& stripe)
+      : ColumnReader(_readType, stripe), readType(_readType) {
+    reader = buildReader(fileType, stripe, true, false);
+    data = fileType.createRowBatch(0, memoryPool, false, true);
+  }
+
+  void ConvertColumnReader::next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull) {
+    reader->next(*data, numValues, notNull);
+    rowBatch.resize(data->capacity);
+    rowBatch.numElements = data->numElements;
+    rowBatch.hasNulls = data->hasNulls;
+    if (!rowBatch.hasNulls) {
+      memset(rowBatch.notNull.data(), 1, data->notNull.size());
+    } else {
+      memcpy(rowBatch.notNull.data(), data->notNull.data(), data->notNull.size());
+    }
+  }
+
+  uint64_t ConvertColumnReader::skip(uint64_t numValues) {
+    return reader->skip(numValues);
+  }
+
+  void ConvertColumnReader::seekToRowGroup(
+      std::unordered_map<uint64_t, PositionProvider>& positions) {
+    reader->seekToRowGroup(positions);
+  }
+
+  static inline bool canFitInLong(double value) {
+    constexpr double MIN_LONG_AS_DOUBLE = -0x1p63;
+    constexpr double MAX_LONG_AS_DOUBLE_PLUS_ONE = 0x1p63;
+    return ((MIN_LONG_AS_DOUBLE - value < 1.0) && (value < MAX_LONG_AS_DOUBLE_PLUS_ONE));
+  }
+
+  static inline void setNull(ColumnVectorBatch& dstBatch, uint64_t idx) {
+    dstBatch.notNull.data()[idx] = 0;
+    dstBatch.hasNulls = true;
+  }
+
+  // return false if overflow
+  template <typename ReadType>
+  static bool downCastToInteger(ReadType& dstValue, int64_t inputLong) {
+    dstValue = static_cast<ReadType>(inputLong);
+    if (std::is_same<ReadType, int64_t>::value) {
+      return true;
+    }
+    if (static_cast<int64_t>(dstValue) != inputLong) {
+      return false;
+    }
+    return true;
+  }
+
+  // set null if overflow

Review Comment:
   Should we add a mode to support nullify or throw when overflow is detected? Different engines may expect different behaviors.



##########
c++/src/ConvertColumnReader.cc:
##########
@@ -0,0 +1,415 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "ConvertColumnReader.hh"
+
+namespace orc {
+
+  // Assume that we are using tight numeric vector batch
+  using BooleanVectorBatch = ByteVectorBatch;
+
+  ConvertColumnReader::ConvertColumnReader(const Type& _readType, const Type& fileType,
+                                           StripeStreams& stripe)
+      : ColumnReader(_readType, stripe), readType(_readType) {
+    reader = buildReader(fileType, stripe, true, false);

Review Comment:
   It is a good practice add a comment like `/*param_name=*/true` to get better readability.



##########
c++/src/ConvertColumnReader.hh:
##########
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef ORC_CONVERT_COLUMN_READER_HH
+#define ORC_CONVERT_COLUMN_READER_HH
+
+#include "ColumnReader.hh"
+#include "SchemaEvolution.hh"
+
+namespace orc {
+
+  class ConvertColumnReader : public ColumnReader {
+   public:
+    ConvertColumnReader(const Type& readType, const Type& fileType, StripeStreams& stripe);
+
+    // override next() to implement convert logic, and inherit nextEncoded(),
+    // nextLazy() and nextLazyEncoded() from ColumnReader. override them only
+    // required.

Review Comment:
   The comment seems not match the code.



##########
c++/include/orc/Reader.hh:
##########
@@ -336,6 +336,16 @@ namespace orc {
      * @return if not set, the default is false
      */
     bool getUseTightNumericVector() const;
+
+    /**
+     * Set read type for schema evolution
+     */
+    RowReaderOptions& setReadType(std::shared_ptr<Type>& type);

Review Comment:
   We can safely use std::move() in the implementation.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@orc.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org