Posted to commits@orc.apache.org by xi...@apache.org on 2023/03/14 06:01:00 UTC

[orc] branch main updated: ORC-1376: [C++] Scaffolding of schema evolution (#1427)

This is an automated email from the ASF dual-hosted git repository.

xinzhang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/orc.git


The following commit(s) were added to refs/heads/main by this push:
     new 011bf28b4 ORC-1376: [C++] Scaffolding of schema evolution (#1427)
011bf28b4 is described below

commit 011bf28b413c9391579701cf475483c8261fe797
Author: ffacs <34...@users.noreply.github.com>
AuthorDate: Tue Mar 14 14:00:51 2023 +0800

    ORC-1376: [C++] Scaffolding of schema evolution (#1427)
    
    ### What changes were proposed in this pull request?
    This PR adds a SchemaEvolution class which checks the validity and PPD (predicate pushdown) safety of a type conversion. The actual value conversions will be implemented later.
    
    ### Why are the changes needed?
    To support schema evolution in the C++ reader.
    
    ### How was this patch tested?
    Unit tests (UT).
    
    This closes #1427
    
    Co-authored-by: wuyuanping <wu...@alibaba-inc.com>
    Co-authored-by: ffacs <ff...@gmail.com>
---
 c++/include/orc/Exceptions.hh |   9 ++
 c++/include/orc/Type.hh       |   6 +
 c++/src/CMakeLists.txt        |   1 +
 c++/src/Exceptions.cc         |  17 +++
 c++/src/Reader.hh             |   1 +
 c++/src/SchemaEvolution.cc    | 256 ++++++++++++++++++++++++++++++++++++++++++
 c++/src/SchemaEvolution.hh    |  46 ++++++++
 c++/src/TypeImpl.cc           |  14 +++
 c++/src/TypeImpl.hh           |   1 +
 c++/src/sargs/SargsApplier.cc |   6 +-
 c++/src/sargs/SargsApplier.hh |   6 +-
 c++/test/TestSargsApplier.cc  |   3 +-
 12 files changed, 363 insertions(+), 3 deletions(-)
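
Illustrative sketch (not part of this commit): the change lets a caller ask for a "read type" that differs from the schema stored in the file. SchemaEvolution walks both type trees by column id, validates every requested conversion, and records which columns remain safe for predicate pushdown (PPD). A minimal usage sketch, assuming the internal header c++/src/SchemaEvolution.hh is on the include path and using hypothetical schemas; the actual value conversion is left for follow-up work, as the commit message notes:

    #include <iostream>
    #include <memory>
    #include "SchemaEvolution.hh"  // internal header added by this commit
    #include "orc/Type.hh"

    void sketchSchemaEvolution() {
      // Schema as written in the file vs. the schema the caller wants to read.
      std::unique_ptr<orc::Type> fileType =
          orc::Type::buildTypeFromString("struct<id:int,name:varchar(10)>");
      std::shared_ptr<orc::Type> readType =
          orc::Type::buildTypeFromString("struct<id:bigint,name:string>");

      // The constructor validates every column conversion up front; an
      // unsupported pairing throws orc::SchemaEvolutionError.
      orc::SchemaEvolution evolution(readType, fileType.get());

      // Per-column queries the reader (and SargsApplier) can ask.
      std::cout << std::boolalpha
                << evolution.needConvert(*fileType->getSubtype(0)) << "\n"  // int -> bigint
                << evolution.isSafePPDConversion(1) << "\n";  // column id of "id" (hypothetical)
    }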

diff --git a/c++/include/orc/Exceptions.hh b/c++/include/orc/Exceptions.hh
index 7da8fbee4..0536dbd16 100644
--- a/c++/include/orc/Exceptions.hh
+++ b/c++/include/orc/Exceptions.hh
@@ -58,6 +58,15 @@ namespace orc {
    private:
     InvalidArgument& operator=(const InvalidArgument&);
   };
+
+  class SchemaEvolutionError : public std::logic_error {
+   public:
+    explicit SchemaEvolutionError(const std::string& what_arg);
+    explicit SchemaEvolutionError(const char* what_arg);
+    virtual ~SchemaEvolutionError() noexcept override;
+    SchemaEvolutionError(const SchemaEvolutionError&);
+    SchemaEvolutionError& operator=(const SchemaEvolutionError&) = delete;
+  };
 }  // namespace orc
 
 #endif
diff --git a/c++/include/orc/Type.hh b/c++/include/orc/Type.hh
index 59b523f8a..c8ada75c8 100644
--- a/c++/include/orc/Type.hh
+++ b/c++/include/orc/Type.hh
@@ -65,6 +65,12 @@ namespace orc {
     virtual std::vector<std::string> getAttributeKeys() const = 0;
     virtual std::string getAttributeValue(const std::string& key) const = 0;
     virtual std::string toString() const = 0;
+    /**
+     * Get the Type with the given column ID
+     * @param colId the column ID
+     * @return the type corresponding to the column Id, nullptr if not exists
+     */
+    virtual const Type* getTypeByColumnId(uint64_t colId) const = 0;
 
     /**
      * Create a row batch for this type.
diff --git a/c++/src/CMakeLists.txt b/c++/src/CMakeLists.txt
index 048207953..909eed52f 100644
--- a/c++/src/CMakeLists.txt
+++ b/c++/src/CMakeLists.txt
@@ -179,6 +179,7 @@ set(SOURCE_FILES
   RleDecoderV2.cc
   RleEncoderV2.cc
   RLE.cc
+  SchemaEvolution.cc
   Statistics.cc
   StripeStream.cc
   Timezone.cc
diff --git a/c++/src/Exceptions.cc b/c++/src/Exceptions.cc
index e822aeeb7..23703ff32 100644
--- a/c++/src/Exceptions.cc
+++ b/c++/src/Exceptions.cc
@@ -67,4 +67,21 @@ namespace orc {
   InvalidArgument::~InvalidArgument() noexcept {
     // PASS
   }
+
+  SchemaEvolutionError::SchemaEvolutionError(const std::string& what_arg) : logic_error(what_arg) {
+    // PASS
+  }
+
+  SchemaEvolutionError::SchemaEvolutionError(const char* what_arg) : logic_error(what_arg) {
+    // PASS
+  }
+
+  SchemaEvolutionError::SchemaEvolutionError(const SchemaEvolutionError& error)
+      : logic_error(error) {
+    // PASS
+  }
+
+  SchemaEvolutionError::~SchemaEvolutionError() noexcept {
+    // PASS
+  }
 }  // namespace orc
diff --git a/c++/src/Reader.hh b/c++/src/Reader.hh
index ef04edd6c..ea6db3aad 100644
--- a/c++/src/Reader.hh
+++ b/c++/src/Reader.hh
@@ -26,6 +26,7 @@
 
 #include "ColumnReader.hh"
 #include "RLE.hh"
+#include "SchemaEvolution.hh"
 #include "TypeImpl.hh"
 #include "sargs/SargsApplier.hh"
 
diff --git a/c++/src/SchemaEvolution.cc b/c++/src/SchemaEvolution.cc
new file mode 100644
index 000000000..f519b8e4b
--- /dev/null
+++ b/c++/src/SchemaEvolution.cc
@@ -0,0 +1,256 @@
+#include "SchemaEvolution.hh"
+#include "orc/Exceptions.hh"
+
+namespace orc {
+
+  SchemaEvolution::SchemaEvolution(const std::shared_ptr<Type>& _readType, const Type* fileType)
+      : readType(_readType) {
+    if (readType) {
+      buildConversion(readType.get(), fileType);
+    } else {
+      for (uint64_t i = 0; i <= fileType->getMaximumColumnId(); ++i) {
+        safePPDConversionMap.insert(i);
+      }
+    }
+  }
+
+  const Type* SchemaEvolution::getReadType(const Type& fileType) const {
+    auto ret = readTypeMap.find(fileType.getColumnId());
+    return ret == readTypeMap.cend() ? &fileType : ret->second;
+  }
+
+  inline void invalidConversion(const Type* readType, const Type* fileType) {
+    throw SchemaEvolutionError("Cannot convert from " + fileType->toString() + " to " +
+                               readType->toString());
+  }
+
+  struct EnumClassHash {
+    template <typename T>
+    std::size_t operator()(T t) const {
+      return static_cast<std::size_t>(t);
+    }
+  };
+
+  // map from file type to read type. it does not contain identity mapping.
+  using TypeSet = std::unordered_set<TypeKind, EnumClassHash>;
+  using ConvertMap = std::unordered_map<TypeKind, TypeSet, EnumClassHash>;
+
+  inline bool supportConversion(const Type& readType, const Type& fileType) {
+    static const ConvertMap& SUPPORTED_CONVERSIONS = *new ConvertMap{
+        // support nothing now
+    };
+    auto iter = SUPPORTED_CONVERSIONS.find(fileType.getKind());
+    if (iter == SUPPORTED_CONVERSIONS.cend()) {
+      return false;
+    }
+    return iter->second.find(readType.getKind()) != iter->second.cend();
+  }
+
+  struct ConversionCheckResult {
+    bool isValid;
+    bool needConvert;
+  };
+
+  ConversionCheckResult checkConversion(const Type& readType, const Type& fileType) {
+    ConversionCheckResult ret = {false, false};
+    if (readType.getKind() == fileType.getKind()) {
+      ret.isValid = true;
+      if (fileType.getKind() == CHAR || fileType.getKind() == VARCHAR) {
+        ret.needConvert = readType.getMaximumLength() < fileType.getMaximumLength();
+      } else if (fileType.getKind() == DECIMAL) {
+        ret.needConvert = readType.getPrecision() != fileType.getPrecision() ||
+                          readType.getScale() != fileType.getScale();
+      }
+    } else {
+      switch (fileType.getKind()) {
+        case BOOLEAN:
+        case BYTE:
+        case SHORT:
+        case INT:
+        case LONG:
+        case FLOAT:
+        case DOUBLE:
+        case DECIMAL: {
+          ret.isValid = ret.needConvert =
+              (readType.getKind() != DATE && readType.getKind() != BINARY);
+          break;
+        }
+        case STRING: {
+          ret.isValid = ret.needConvert = true;
+          break;
+        }
+        case CHAR:
+        case VARCHAR: {
+          ret.isValid = true;
+          if (readType.getKind() == STRING) {
+            ret.needConvert = false;
+          } else if (readType.getKind() == CHAR || readType.getKind() == VARCHAR) {
+            ret.needConvert = readType.getMaximumLength() < fileType.getMaximumLength();
+          } else {
+            ret.needConvert = true;
+          }
+          break;
+        }
+        case TIMESTAMP:
+        case TIMESTAMP_INSTANT: {
+          if (readType.getKind() == TIMESTAMP || readType.getKind() == TIMESTAMP_INSTANT) {
+            ret = {true, false};
+          } else {
+            ret.isValid = ret.needConvert = (readType.getKind() != BINARY);
+          }
+          break;
+        }
+        case DATE: {
+          ret.isValid = ret.needConvert =
+              readType.getKind() == STRING || readType.getKind() == CHAR ||
+              readType.getKind() == VARCHAR || readType.getKind() == TIMESTAMP ||
+              readType.getKind() == TIMESTAMP_INSTANT;
+          break;
+        }
+        case BINARY: {
+          ret.isValid = ret.needConvert = readType.getKind() == STRING ||
+                                          readType.getKind() == CHAR ||
+                                          readType.getKind() == VARCHAR;
+          break;
+        }
+        case STRUCT:
+        case LIST:
+        case MAP:
+        case UNION: {
+          ret.isValid = ret.needConvert = false;
+          break;
+        }
+        default:
+          break;
+      }
+    }
+    return ret;
+  }
+
+  void SchemaEvolution::buildConversion(const Type* _readType, const Type* fileType) {
+    if (fileType == nullptr) {
+      throw SchemaEvolutionError("File does not have " + _readType->toString());
+    }
+
+    auto [valid, convert] = checkConversion(*_readType, *fileType);
+    if (!valid) {
+      invalidConversion(_readType, fileType);
+    }
+    readTypeMap.emplace(_readType->getColumnId(), convert ? _readType : fileType);
+
+    // check whether PPD conversion is safe
+    buildSafePPDConversionMap(_readType, fileType);
+
+    for (uint64_t i = 0; i < _readType->getSubtypeCount(); ++i) {
+      auto subType = _readType->getSubtype(i);
+      if (subType) {
+        // null subType means that this is a sub column of map/list type
+        // and it does not exist in the file. simply skip it.
+        buildConversion(subType, fileType->getTypeByColumnId(subType->getColumnId()));
+      }
+    }
+  }
+
+  bool SchemaEvolution::needConvert(const Type& fileType) const {
+    auto _readType = getReadType(fileType);
+    if (_readType == &fileType) {
+      return false;
+    }
+    // it does not check valid here as verified by buildConversion()
+    return checkConversion(*_readType, fileType).needConvert;
+  }
+
+  inline bool isPrimitive(const Type* type) {
+    auto kind = type->getKind();
+    return kind != STRUCT && kind != MAP && kind != LIST && kind != UNION;
+  }
+
+  void SchemaEvolution::buildSafePPDConversionMap(const Type* _readType, const Type* fileType) {
+    if (_readType == nullptr || !isPrimitive(_readType) || fileType == nullptr ||
+        !isPrimitive(fileType)) {
+      return;
+    }
+
+    bool isSafe = false;
+    if (_readType == fileType) {
+      // short cut for same type
+      isSafe = true;
+    } else if (_readType->getKind() == DECIMAL && fileType->getKind() == DECIMAL) {
+      // for decimals alone do equality check to not mess up with precision change
+      if (fileType->getPrecision() == readType->getPrecision() &&
+          fileType->getScale() == readType->getScale()) {
+        isSafe = true;
+      }
+    } else {
+      // only integer and string evolutions are safe
+      // byte -> short -> int -> long
+      // string <-> char <-> varchar
+      // NOTE: Float to double evolution is not safe as floats are stored as
+      // doubles in ORC's internal index, but when doing predicate evaluation
+      // for queries like "select * from orc_float where f = 74.72" the constant
+      // on the filter is converted from string -> double so the precisions will
+      // be different and the comparison will fail.
+      // Soon, we should convert all sargs that compare equality between floats
+      // or doubles to range predicates.
+      // Similarly string -> char and varchar -> char and vice versa is impossible
+      // as ORC stores char with padded spaces in its internal index.
+      switch (fileType->getKind()) {
+        case BYTE: {
+          if (readType->getKind() == SHORT || readType->getKind() == INT ||
+              readType->getKind() == LONG) {
+            isSafe = true;
+          }
+          break;
+        }
+        case SHORT: {
+          if (readType->getKind() == INT || readType->getKind() == LONG) {
+            isSafe = true;
+          }
+          break;
+        }
+        case INT: {
+          if (readType->getKind() == LONG) {
+            isSafe = true;
+          }
+          break;
+        }
+        case STRING: {
+          if (readType->getKind() == VARCHAR) {
+            isSafe = true;
+          }
+          break;
+        }
+        case VARCHAR: {
+          if (readType->getKind() == STRING) {
+            isSafe = true;
+          }
+          break;
+        }
+        case BOOLEAN:
+        case LONG:
+        case FLOAT:
+        case DOUBLE:
+        case BINARY:
+        case TIMESTAMP:
+        case LIST:
+        case MAP:
+        case STRUCT:
+        case UNION:
+        case DECIMAL:
+        case DATE:
+        case CHAR:
+        case TIMESTAMP_INSTANT:
+          break;
+      }
+    }
+
+    if (isSafe) {
+      safePPDConversionMap.insert(fileType->getColumnId());
+    }
+  }
+
+  bool SchemaEvolution::isSafePPDConversion(uint64_t columnId) const {
+    return safePPDConversionMap.find(columnId) != safePPDConversionMap.cend();
+  }
+
+}  // namespace orc
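
Illustrative sketch (not part of this commit): as the checkConversion() logic above shows, pairings outside the allowed set, for example date to int, are rejected as soon as the mapping is built, so a mismatched read schema fails fast instead of surfacing mid-scan. A minimal sketch of that behaviour with hypothetical schemas:

    #include <iostream>
    #include <memory>
    #include "SchemaEvolution.hh"
    #include "orc/Exceptions.hh"
    #include "orc/Type.hh"

    void sketchInvalidConversion() {
      std::shared_ptr<orc::Type> readType =
          orc::Type::buildTypeFromString("struct<d:int>");
      std::unique_ptr<orc::Type> fileType =
          orc::Type::buildTypeFromString("struct<d:date>");
      try {
        // date -> int is not allowed by checkConversion(), so buildConversion()
        // throws while the constructor walks the leaf column.
        orc::SchemaEvolution evolution(readType, fileType.get());
      } catch (const orc::SchemaEvolutionError& err) {
        std::cerr << err.what() << "\n";  // roughly: "Cannot convert from date to int"
      }
    }
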
diff --git a/c++/src/SchemaEvolution.hh b/c++/src/SchemaEvolution.hh
new file mode 100644
index 000000000..57ad55b13
--- /dev/null
+++ b/c++/src/SchemaEvolution.hh
@@ -0,0 +1,46 @@
+#ifndef ORC_SCHEMA_EVOLUTION_HH
+#define ORC_SCHEMA_EVOLUTION_HH
+
+#include "orc/Type.hh"
+
+#include <unordered_map>
+#include <unordered_set>
+
+namespace orc {
+
+  /**
+   * Utility class to compare read type and file type to match their columns
+   * and check type conversion.
+   */
+  class SchemaEvolution {
+   public:
+    SchemaEvolution(const std::shared_ptr<Type>& readType, const Type* fileType);
+
+    // get read type by column id from file type. or return the file type if
+    // read type is not provided (i.e. no schema evolution requested).
+    const Type* getReadType(const Type& fileType) const;
+
+    // check if we need to convert file type to read type for primitive type.
+    bool needConvert(const Type& fileType) const;
+
+    // check if the PPD conversion is safe
+    bool isSafePPDConversion(uint64_t columnId) const;
+
+    // return selected read type
+    const Type* getReadType() const {
+      return readType.get();
+    }
+
+   private:
+    void buildConversion(const Type* readType, const Type* fileType);
+    void buildSafePPDConversionMap(const Type* readType, const Type* fileType);
+
+   private:
+    const std::shared_ptr<Type> readType;
+    std::unordered_map<uint64_t, const Type*> readTypeMap;
+    std::unordered_set<uint64_t> safePPDConversionMap;
+  };
+
+}  // namespace orc
+
+#endif  // ORC_SCHEMA_EVOLUTION_HH
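
Illustrative sketch (not part of this commit): when no read type is supplied, which is the case the updated TestSargsApplier.cc exercises below, the class degenerates to an identity mapping: getReadType() echoes the file type and every file column is registered as PPD-safe. A minimal sketch with a hypothetical schema:

    #include <cassert>
    #include <memory>
    #include "SchemaEvolution.hh"
    #include "orc/Type.hh"

    void sketchNoReadType() {
      std::unique_ptr<orc::Type> fileType =
          orc::Type::buildTypeFromString("struct<a:int,b:string>");
      // A null read type means "no schema evolution requested".
      orc::SchemaEvolution identity(nullptr, fileType.get());
      assert(identity.getReadType(*fileType) == fileType.get());
      assert(identity.isSafePPDConversion(fileType->getMaximumColumnId()));
    }
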
diff --git a/c++/src/TypeImpl.cc b/c++/src/TypeImpl.cc
index c80c5fa6e..4e0f7f268 100644
--- a/c++/src/TypeImpl.cc
+++ b/c++/src/TypeImpl.cc
@@ -818,4 +818,18 @@ namespace orc {
     return std::make_pair(parseCategory(category, input, pos, nextPos), endPos);
   }
 
+  const Type* TypeImpl::getTypeByColumnId(uint64_t colIdx) const {
+    if (getColumnId() == colIdx) {
+      return this;
+    }
+
+    for (uint64_t i = 0; i != getSubtypeCount(); ++i) {
+      const Type* ret = getSubtype(i)->getTypeByColumnId(colIdx);
+      if (ret != nullptr) {
+        return ret;
+      }
+    }
+    return nullptr;
+  }
+
 }  // namespace orc
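
Illustrative sketch (not part of this commit): the new Type::getTypeByColumnId is a plain pre-order search over the type tree that returns nullptr when the id is absent; that is what lets buildConversion() line up read columns with file columns by id. A small sketch, assuming ORC's usual pre-order column numbering for the hypothetical schema below:

    #include <cassert>
    #include <memory>
    #include "orc/Type.hh"

    void sketchGetTypeByColumnId() {
      // Pre-order ids: 0 = root struct, 1 = a, 2 = b, 3 = b.c
      std::unique_ptr<orc::Type> root =
          orc::Type::buildTypeFromString("struct<a:int,b:struct<c:string>>");
      const orc::Type* nested = root->getTypeByColumnId(3);
      assert(nested != nullptr && nested->getKind() == orc::STRING);
      // Ids that do not exist in the tree return nullptr rather than throwing.
      assert(root->getTypeByColumnId(42) == nullptr);
    }
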
diff --git a/c++/src/TypeImpl.hh b/c++/src/TypeImpl.hh
index b76f13642..6d0743793 100644
--- a/c++/src/TypeImpl.hh
+++ b/c++/src/TypeImpl.hh
@@ -88,6 +88,7 @@ namespace orc {
 
     std::string toString() const override;
 
+    const Type* getTypeByColumnId(uint64_t colIdx) const override;
     Type* addStructField(const std::string& fieldName, std::unique_ptr<Type> fieldType) override;
     Type* addUnionChild(std::unique_ptr<Type> fieldType) override;
 
diff --git a/c++/src/sargs/SargsApplier.cc b/c++/src/sargs/SargsApplier.cc
index 1d054c415..2cc3a7cf4 100644
--- a/c++/src/sargs/SargsApplier.cc
+++ b/c++/src/sargs/SargsApplier.cc
@@ -39,9 +39,10 @@ namespace orc {
 
   SargsApplier::SargsApplier(const Type& type, const SearchArgument* searchArgument,
                              uint64_t rowIndexStride, WriterVersion writerVersion,
-                             ReaderMetrics* metrics)
+                             ReaderMetrics* metrics, const SchemaEvolution* schemaEvolution)
       : mType(type),
         mSearchArgument(searchArgument),
+        mSchemaEvolution(schemaEvolution),
         mRowIndexStride(rowIndexStride),
         mWriterVersion(writerVersion),
         mHasEvaluatedFileStats(false),
@@ -88,6 +89,9 @@ namespace orc {
         if (columnIdx == INVALID_COLUMN_ID || rowIndexIter == rowIndexes.cend()) {
           // this column does not exist in current file
           leafValues[pred] = TruthValue::YES_NO_NULL;
+        } else if (mSchemaEvolution && !mSchemaEvolution->isSafePPDConversion(columnIdx)) {
+          // cannot evaluate predicate when ppd is not safe
+          leafValues[pred] = TruthValue::YES_NO_NULL;
         } else {
           // get column statistics
           const proto::ColumnStatistics& statistics =
diff --git a/c++/src/sargs/SargsApplier.hh b/c++/src/sargs/SargsApplier.hh
index e527093e4..73703dcf6 100644
--- a/c++/src/sargs/SargsApplier.hh
+++ b/c++/src/sargs/SargsApplier.hh
@@ -27,6 +27,8 @@
 
 #include "sargs/SearchArgument.hh"
 
+#include "SchemaEvolution.hh"
+
 #include <unordered_map>
 
 namespace orc {
@@ -34,7 +36,8 @@ namespace orc {
   class SargsApplier {
    public:
     SargsApplier(const Type& type, const SearchArgument* searchArgument, uint64_t rowIndexStride,
-                 WriterVersion writerVersion, ReaderMetrics* metrics);
+                 WriterVersion writerVersion, ReaderMetrics* metrics,
+                 const SchemaEvolution* schemaEvolution = nullptr);
 
     /**
      * Evaluate search argument on file statistics
@@ -124,6 +127,7 @@ namespace orc {
    private:
     const Type& mType;
     const SearchArgument* mSearchArgument;
+    const SchemaEvolution* mSchemaEvolution;
     uint64_t mRowIndexStride;
     WriterVersion mWriterVersion;
     // column ids for each predicate leaf in the search argument
diff --git a/c++/test/TestSargsApplier.cc b/c++/test/TestSargsApplier.cc
index dcd19de0d..ebf8f0a64 100644
--- a/c++/test/TestSargsApplier.cc
+++ b/c++/test/TestSargsApplier.cc
@@ -92,7 +92,8 @@ namespace orc {
 
     // evaluate row group index
     ReaderMetrics metrics;
-    SargsApplier applier(*type, sarg.get(), 1000, WriterVersion_ORC_135, &metrics);
+    SchemaEvolution se(nullptr, type.get());
+    SargsApplier applier(*type, sarg.get(), 1000, WriterVersion_ORC_135, &metrics, &se);
     EXPECT_TRUE(applier.pickRowGroups(4000, rowIndexes, {}));
     const auto& nextSkippedRows = applier.getNextSkippedRows();
     EXPECT_EQ(4, nextSkippedRows.size());
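
Illustrative note (not part of this commit): the SargsApplier change is source-compatible because the new schemaEvolution parameter defaults to nullptr; when a SchemaEvolution is supplied, any predicate on a column whose conversion is not PPD-safe is answered with TruthValue::YES_NO_NULL, so its row groups are never skipped incorrectly. A sketch of the two call forms, reusing the locals from the test above:

    // Old call sites keep compiling; schemaEvolution defaults to nullptr.
    orc::SargsApplier legacy(*type, sarg.get(), 1000, orc::WriterVersion_ORC_135, &metrics);

    // New call form, mirroring the updated test: columns with unsafe conversions
    // are kept rather than filtered out during row-group selection.
    orc::SchemaEvolution se(nullptr, type.get());
    orc::SargsApplier applier(*type, sarg.get(), 1000, orc::WriterVersion_ORC_135, &metrics, &se);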