You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@orc.apache.org by xi...@apache.org on 2023/03/14 06:01:00 UTC
[orc] branch main updated: ORC-1376: [C++] Scaffolding of schema evolution (#1427)
This is an automated email from the ASF dual-hosted git repository.
xinzhang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/orc.git
The following commit(s) were added to refs/heads/main by this push:
new 011bf28b4 ORC-1376: [C++] Scaffolding of schema evolution (#1427)
011bf28b4 is described below
commit 011bf28b413c9391579701cf475483c8261fe797
Author: ffacs <34...@users.noreply.github.com>
AuthorDate: Tue Mar 14 14:00:51 2023 +0800
ORC-1376: [C++] Scaffolding of schema evolution (#1427)
### What changes were proposed in this pull request?
This PR adds a SchemaEvolution class which checks the validity and PPD (predicate push-down) safety of a type conversion. The conversions themselves will be implemented later.
### Why are the changes needed?
To support schema evolution in c++
### How was this patch tested?
UT
This closes #1427
Co-authored-by: wuyuanping <wu...@alibaba-inc.com>
Co-authored-by: ffacs <ff...@gmail.com>
---
c++/include/orc/Exceptions.hh | 9 ++
c++/include/orc/Type.hh | 6 +
c++/src/CMakeLists.txt | 1 +
c++/src/Exceptions.cc | 17 +++
c++/src/Reader.hh | 1 +
c++/src/SchemaEvolution.cc | 256 ++++++++++++++++++++++++++++++++++++++++++
c++/src/SchemaEvolution.hh | 46 ++++++++
c++/src/TypeImpl.cc | 14 +++
c++/src/TypeImpl.hh | 1 +
c++/src/sargs/SargsApplier.cc | 6 +-
c++/src/sargs/SargsApplier.hh | 6 +-
c++/test/TestSargsApplier.cc | 3 +-
12 files changed, 363 insertions(+), 3 deletions(-)
diff --git a/c++/include/orc/Exceptions.hh b/c++/include/orc/Exceptions.hh
index 7da8fbee4..0536dbd16 100644
--- a/c++/include/orc/Exceptions.hh
+++ b/c++/include/orc/Exceptions.hh
@@ -58,6 +58,15 @@ namespace orc {
private:
InvalidArgument& operator=(const InvalidArgument&);
};
+
+  /**
+   * Error raised by schema evolution when a requested read type cannot be
+   * matched to, or converted from, the type stored in the file.
+   */
+  class SchemaEvolutionError : public std::logic_error {
+   public:
+    explicit SchemaEvolutionError(const char* what_arg);
+    explicit SchemaEvolutionError(const std::string& what_arg);
+    SchemaEvolutionError(const SchemaEvolutionError&);
+    SchemaEvolutionError& operator=(const SchemaEvolutionError&) = delete;
+    // `override` already makes the destructor virtual.
+    ~SchemaEvolutionError() noexcept override;
+  };
} // namespace orc
#endif
diff --git a/c++/include/orc/Type.hh b/c++/include/orc/Type.hh
index 59b523f8a..c8ada75c8 100644
--- a/c++/include/orc/Type.hh
+++ b/c++/include/orc/Type.hh
@@ -65,6 +65,12 @@ namespace orc {
virtual std::vector<std::string> getAttributeKeys() const = 0;
virtual std::string getAttributeValue(const std::string& key) const = 0;
virtual std::string toString() const = 0;
+ /**
+ * Get the Type with the given column ID, searching this type and,
+ * recursively, its subtypes.
+ * @param colId the column ID
+ * @return the type corresponding to the column ID, or nullptr if no type in
+ * this type's subtree has that ID
+ */
+ virtual const Type* getTypeByColumnId(uint64_t colId) const = 0;
/**
* Create a row batch for this type.
diff --git a/c++/src/CMakeLists.txt b/c++/src/CMakeLists.txt
index 048207953..909eed52f 100644
--- a/c++/src/CMakeLists.txt
+++ b/c++/src/CMakeLists.txt
@@ -179,6 +179,7 @@ set(SOURCE_FILES
RleDecoderV2.cc
RleEncoderV2.cc
RLE.cc
+ SchemaEvolution.cc
Statistics.cc
StripeStream.cc
Timezone.cc
diff --git a/c++/src/Exceptions.cc b/c++/src/Exceptions.cc
index e822aeeb7..23703ff32 100644
--- a/c++/src/Exceptions.cc
+++ b/c++/src/Exceptions.cc
@@ -67,4 +67,21 @@ namespace orc {
InvalidArgument::~InvalidArgument() noexcept {
// PASS
}
+
+  // Both message constructors simply forward the text to std::logic_error.
+  SchemaEvolutionError::SchemaEvolutionError(const std::string& what_arg)
+      : std::logic_error(what_arg) {}
+
+  SchemaEvolutionError::SchemaEvolutionError(const char* what_arg)
+      : std::logic_error(what_arg) {}
+
+  // Member-wise copy (i.e. copy of the std::logic_error base).
+  SchemaEvolutionError::SchemaEvolutionError(const SchemaEvolutionError&) = default;
+
+  SchemaEvolutionError::~SchemaEvolutionError() noexcept = default;
} // namespace orc
diff --git a/c++/src/Reader.hh b/c++/src/Reader.hh
index ef04edd6c..ea6db3aad 100644
--- a/c++/src/Reader.hh
+++ b/c++/src/Reader.hh
@@ -26,6 +26,7 @@
#include "ColumnReader.hh"
#include "RLE.hh"
+#include "SchemaEvolution.hh"
#include "TypeImpl.hh"
#include "sargs/SargsApplier.hh"
diff --git a/c++/src/SchemaEvolution.cc b/c++/src/SchemaEvolution.cc
new file mode 100644
index 000000000..f519b8e4b
--- /dev/null
+++ b/c++/src/SchemaEvolution.cc
@@ -0,0 +1,256 @@
+#include "SchemaEvolution.hh"
+#include "orc/Exceptions.hh"
+
+namespace orc {
+
+  SchemaEvolution::SchemaEvolution(const std::shared_ptr<Type>& _readType, const Type* fileType)
+      : readType(_readType) {
+    if (!readType) {
+      // No read schema requested: every file column, from id 0 up to the
+      // file's maximum column id, is read as stored and marked PPD-safe.
+      const uint64_t lastColumnId = fileType->getMaximumColumnId();
+      for (uint64_t columnId = 0; columnId <= lastColumnId; ++columnId) {
+        safePPDConversionMap.insert(columnId);
+      }
+      return;
+    }
+    buildConversion(readType.get(), fileType);
+  }
+
+  // Type recorded for fileType's column id, or fileType itself when none was recorded.
+  const Type* SchemaEvolution::getReadType(const Type& fileType) const {
+    const auto found = readTypeMap.find(fileType.getColumnId());
+    if (found != readTypeMap.cend()) {
+      return found->second;
+    }
+    return &fileType;
+  }
+
+  // Throw a SchemaEvolutionError stating that fileType cannot be read as
+  // readType. Always throws, hence [[noreturn]]. `static` gives this
+  // file-local helper internal linkage so it cannot collide (ODR) with a
+  // same-named helper in another translation unit.
+  [[noreturn]] static void invalidConversion(const Type* readType, const Type* fileType) {
+    throw SchemaEvolutionError("Cannot convert from " + fileType->toString() + " to " +
+                               readType->toString());
+  }
+
+  // Hasher for enum keys: the hash is the enumerator's value cast to std::size_t.
+  struct EnumClassHash {
+    template <typename EnumT>
+    std::size_t operator()(EnumT key) const {
+      const auto hashed = static_cast<std::size_t>(key);
+      return hashed;
+    }
+  };
+
+ // ConvertMap maps a file TypeKind to the set (TypeSet) of read TypeKinds it
+ // may be converted to; supportConversion() looks conversions up in it.
+ // Identity (same-kind) conversions are not listed. EnumClassHash hashes the
+ // TypeKind keys.
+ using TypeSet = std::unordered_set<TypeKind, EnumClassHash>;
+ using ConvertMap = std::unordered_map<TypeKind, TypeSet, EnumClassHash>;
+
+  // True when SUPPORTED_CONVERSIONS lists readType's kind as a target for
+  // fileType's kind. The table is currently empty, so this returns false.
+  inline bool supportConversion(const Type& readType, const Type& fileType) {
+    // Heap-allocated once and never deleted, so no destructor runs for it at exit.
+    static const ConvertMap& SUPPORTED_CONVERSIONS = *new ConvertMap{
+        // support nothing now
+    };
+    const auto entry = SUPPORTED_CONVERSIONS.find(fileType.getKind());
+    return entry != SUPPORTED_CONVERSIONS.cend() &&
+           entry->second.count(readType.getKind()) > 0;
+  }
+
+ // Result of checkConversion(): whether a file type may be read as a given
+ // read type, and whether the values must be converted to do so.
+ // Member order matters: the result is aggregate-initialized as
+ // {isValid, needConvert} and unpacked with structured bindings.
+ struct ConversionCheckResult {
+ // the file type can be read as the read type
+ bool isValid;
+ // reading requires converting the stored values
+ bool needConvert;
+ };
+
+  // Decide whether a column stored as `fileType` may be read as `readType`
+  // (isValid) and whether its values must be converted to do so (needConvert).
+  // Compound types (struct/list/map/union) are only valid against the same
+  // kind; their children are checked separately by buildConversion().
+  ConversionCheckResult checkConversion(const Type& readType, const Type& fileType) {
+    ConversionCheckResult ret = {false, false};
+    if (readType.getKind() == fileType.getKind()) {
+      ret.isValid = true;
+      if (fileType.getKind() == CHAR || fileType.getKind() == VARCHAR) {
+        // only a narrower read type changes the values (truncation)
+        ret.needConvert = readType.getMaximumLength() < fileType.getMaximumLength();
+      } else if (fileType.getKind() == DECIMAL) {
+        ret.needConvert = readType.getPrecision() != fileType.getPrecision() ||
+                          readType.getScale() != fileType.getScale();
+      }
+      return ret;
+    }
+
+    // Kinds differ. No value can be converted into a compound type, so reject
+    // such read types up front instead of reporting them as valid conversions.
+    const TypeKind readKind = readType.getKind();
+    if (readKind == STRUCT || readKind == LIST || readKind == MAP || readKind == UNION) {
+      return ret;
+    }
+
+    switch (fileType.getKind()) {
+      case BOOLEAN:
+      case BYTE:
+      case SHORT:
+      case INT:
+      case LONG:
+      case FLOAT:
+      case DOUBLE:
+      case DECIMAL: {
+        ret.isValid = ret.needConvert = (readKind != DATE && readKind != BINARY);
+        break;
+      }
+      case STRING: {
+        ret.isValid = ret.needConvert = true;
+        break;
+      }
+      case CHAR:
+      case VARCHAR: {
+        ret.isValid = true;
+        if (readKind == STRING) {
+          ret.needConvert = false;
+        } else if (readKind == CHAR || readKind == VARCHAR) {
+          ret.needConvert = readType.getMaximumLength() < fileType.getMaximumLength();
+        } else {
+          ret.needConvert = true;
+        }
+        break;
+      }
+      case TIMESTAMP:
+      case TIMESTAMP_INSTANT: {
+        if (readKind == TIMESTAMP || readKind == TIMESTAMP_INSTANT) {
+          ret = {true, false};
+        } else {
+          ret.isValid = ret.needConvert = (readKind != BINARY);
+        }
+        break;
+      }
+      case DATE: {
+        ret.isValid = ret.needConvert = readKind == STRING || readKind == CHAR ||
+                                        readKind == VARCHAR || readKind == TIMESTAMP ||
+                                        readKind == TIMESTAMP_INSTANT;
+        break;
+      }
+      case BINARY: {
+        ret.isValid = ret.needConvert =
+            readKind == STRING || readKind == CHAR || readKind == VARCHAR;
+        break;
+      }
+      case STRUCT:
+      case LIST:
+      case MAP:
+      case UNION: {
+        // compound file types only match a read type of the same kind
+        ret.isValid = ret.needConvert = false;
+        break;
+      }
+      default:
+        break;
+    }
+    return ret;
+  }
+
+  // Validate `_readType` against `fileType`, record the type to read for its
+  // column, record PPD safety, then recurse into the read type's children.
+  void SchemaEvolution::buildConversion(const Type* _readType, const Type* fileType) {
+    // A read column with no counterpart in the file cannot be matched.
+    if (!fileType) {
+      throw SchemaEvolutionError("File does not have " + _readType->toString());
+    }
+
+    const ConversionCheckResult check = checkConversion(*_readType, *fileType);
+    if (!check.isValid) {
+      invalidConversion(_readType, fileType);
+    }
+    // The read type is recorded only when a conversion is required;
+    // otherwise the column is read as its file type.
+    const Type* mapped = check.needConvert ? _readType : fileType;
+    readTypeMap.emplace(_readType->getColumnId(), mapped);
+
+    // check whether PPD conversion is safe
+    buildSafePPDConversionMap(_readType, fileType);
+
+    // Pair each read child with the type of the same column id inside fileType.
+    const uint64_t childCount = _readType->getSubtypeCount();
+    for (uint64_t idx = 0; idx < childCount; ++idx) {
+      const Type* readChild = _readType->getSubtype(idx);
+      if (readChild == nullptr) {
+        // null subType means that this is a sub column of map/list type
+        // and it does not exist in the file. simply skip it.
+        continue;
+      }
+      buildConversion(readChild, fileType->getTypeByColumnId(readChild->getColumnId()));
+    }
+  }
+
+  bool SchemaEvolution::needConvert(const Type& fileType) const {
+    const Type* target = getReadType(fileType);
+    // A column mapped to itself never converts. Validity was already enforced
+    // by buildConversion(), so only the conversion flag matters here.
+    return target != &fileType && checkConversion(*target, fileType).needConvert;
+  }
+
+  // False for the compound kinds (struct, map, list, union); true otherwise.
+  inline bool isPrimitive(const Type* type) {
+    switch (type->getKind()) {
+      case STRUCT:
+      case MAP:
+      case LIST:
+      case UNION:
+        return false;
+      default:
+        return true;
+    }
+  }
+
+  // Record fileType's column id in safePPDConversionMap when predicates can
+  // still be evaluated against the file's statistics after reading that column
+  // as `_readType`. Missing or compound types are never recorded.
+  void SchemaEvolution::buildSafePPDConversionMap(const Type* _readType, const Type* fileType) {
+    if (_readType == nullptr || !isPrimitive(_readType) || fileType == nullptr ||
+        !isPrimitive(fileType)) {
+      return;
+    }
+
+    // Every check below must use the `_readType` parameter (the column being
+    // matched), NOT the `readType` member, which is the root of the whole read
+    // schema (normally a STRUCT).
+    const TypeKind readKind = _readType->getKind();
+    const TypeKind fileKind = fileType->getKind();
+
+    bool isSafe = false;
+    if (readKind == fileKind) {
+      // Same kind: values are read exactly as stored unless checkConversion()
+      // reports a conversion. For decimals that means precision or scale
+      // differ (so decimals still require an exact match); for CHAR/VARCHAR it
+      // means truncation. Comparing kinds rather than pointers matters because
+      // the read and file types come from different trees.
+      isSafe = !checkConversion(*_readType, *fileType).needConvert;
+    } else {
+      // only integer and string evolutions are safe
+      // byte -> short -> int -> long
+      // string <-> varchar
+      // NOTE: Float to double evolution is not safe as floats are stored as
+      // doubles in ORC's internal index, but when doing predicate evaluation
+      // for queries like "select * from orc_float where f = 74.72" the constant
+      // on the filter is converted from string -> double so the precisions will
+      // be different and the comparison will fail.
+      // Soon, we should convert all sargs that compare equality between floats
+      // or doubles to range predicates.
+      // Similarly string -> char and varchar -> char and vice versa is impossible
+      // as ORC stores char with padded spaces in its internal index.
+      switch (fileKind) {
+        case BYTE: {
+          isSafe = readKind == SHORT || readKind == INT || readKind == LONG;
+          break;
+        }
+        case SHORT: {
+          isSafe = readKind == INT || readKind == LONG;
+          break;
+        }
+        case INT: {
+          isSafe = readKind == LONG;
+          break;
+        }
+        case STRING: {
+          isSafe = readKind == VARCHAR;
+          break;
+        }
+        case VARCHAR: {
+          isSafe = readKind == STRING;
+          break;
+        }
+        case BOOLEAN:
+        case LONG:
+        case FLOAT:
+        case DOUBLE:
+        case BINARY:
+        case TIMESTAMP:
+        case LIST:
+        case MAP:
+        case STRUCT:
+        case UNION:
+        case DECIMAL:
+        case DATE:
+        case CHAR:
+        case TIMESTAMP_INSTANT:
+          break;
+      }
+    }
+
+    if (isSafe) {
+      safePPDConversionMap.insert(fileType->getColumnId());
+    }
+  }
+
+  // True when the constructor or buildSafePPDConversionMap() recorded columnId.
+  bool SchemaEvolution::isSafePPDConversion(uint64_t columnId) const {
+    return safePPDConversionMap.count(columnId) != 0;
+  }
+
+} // namespace orc
diff --git a/c++/src/SchemaEvolution.hh b/c++/src/SchemaEvolution.hh
new file mode 100644
index 000000000..57ad55b13
--- /dev/null
+++ b/c++/src/SchemaEvolution.hh
@@ -0,0 +1,46 @@
+#ifndef ORC_SCHEMA_EVOLUTION_HH
+#define ORC_SCHEMA_EVOLUTION_HH
+
+#include "orc/Type.hh"
+
+#include <unordered_map>
+#include <unordered_set>
+
+namespace orc {
+
+  /**
+   * Matches the columns of a requested read type against the file type,
+   * records which type each column is read as, and records for which file
+   * columns predicate push-down (PPD) remains safe.
+   */
+  class SchemaEvolution {
+   public:
+    /**
+     * @param readType requested read schema; may be null, in which case the
+     *        file schema is read unchanged.
+     * @param fileType schema stored in the file.
+     * @throws SchemaEvolutionError if readType cannot be matched to fileType.
+     */
+    SchemaEvolution(const std::shared_ptr<Type>& readType, const Type* fileType);
+
+    /// Type recorded for fileType's column id, or fileType itself when no
+    /// entry exists (e.g. no read type was requested).
+    const Type* getReadType(const Type& fileType) const;
+
+    /// Whether reading this (primitive) file column requires a value conversion.
+    bool needConvert(const Type& fileType) const;
+
+    /// Whether predicates on the file column with this id may use its statistics.
+    bool isSafePPDConversion(uint64_t columnId) const;
+
+    /// The read type given to the constructor (may be null).
+    const Type* getReadType() const { return readType.get(); }
+
+   private:
+    void buildConversion(const Type* readType, const Type* fileType);
+    void buildSafePPDConversionMap(const Type* readType, const Type* fileType);
+
+    const std::shared_ptr<Type> readType;
+    std::unordered_map<uint64_t, const Type*> readTypeMap;
+    std::unordered_set<uint64_t> safePPDConversionMap;
+  };
+
+} // namespace orc
+
+#endif // ORC_SCHEMA_EVOLUTION_HH
diff --git a/c++/src/TypeImpl.cc b/c++/src/TypeImpl.cc
index c80c5fa6e..4e0f7f268 100644
--- a/c++/src/TypeImpl.cc
+++ b/c++/src/TypeImpl.cc
@@ -818,4 +818,18 @@ namespace orc {
return std::make_pair(parseCategory(category, input, pos, nextPos), endPos);
}
+  // Return this type or the descendant whose column id is colIdx, or nullptr
+  // if colIdx lies outside this type's subtree.
+  const Type* TypeImpl::getTypeByColumnId(uint64_t colIdx) const {
+    // ORC assigns column ids in pre-order (see the ORC specification), so a
+    // subtree owns exactly the contiguous range [getColumnId(),
+    // getMaximumColumnId()]. Pruning by that range avoids visiting every node.
+    if (colIdx < getColumnId() || colIdx > getMaximumColumnId()) {
+      return nullptr;
+    }
+    if (colIdx == getColumnId()) {
+      return this;
+    }
+    for (uint64_t i = 0; i != getSubtypeCount(); ++i) {
+      const Type* child = getSubtype(i);
+      // Children appear in increasing id order, so the first child whose range
+      // ends at or after colIdx is the only one that can contain it.
+      if (child != nullptr && colIdx <= child->getMaximumColumnId()) {
+        return child->getTypeByColumnId(colIdx);
+      }
+    }
+    return nullptr;
+  }
+
} // namespace orc
diff --git a/c++/src/TypeImpl.hh b/c++/src/TypeImpl.hh
index b76f13642..6d0743793 100644
--- a/c++/src/TypeImpl.hh
+++ b/c++/src/TypeImpl.hh
@@ -88,6 +88,7 @@ namespace orc {
std::string toString() const override;
+ // Returns this type or the descendant whose column id is colIdx; nullptr if not in this subtree.
+ const Type* getTypeByColumnId(uint64_t colIdx) const override;
Type* addStructField(const std::string& fieldName, std::unique_ptr<Type> fieldType) override;
Type* addUnionChild(std::unique_ptr<Type> fieldType) override;
diff --git a/c++/src/sargs/SargsApplier.cc b/c++/src/sargs/SargsApplier.cc
index 1d054c415..2cc3a7cf4 100644
--- a/c++/src/sargs/SargsApplier.cc
+++ b/c++/src/sargs/SargsApplier.cc
@@ -39,9 +39,10 @@ namespace orc {
SargsApplier::SargsApplier(const Type& type, const SearchArgument* searchArgument,
uint64_t rowIndexStride, WriterVersion writerVersion,
- ReaderMetrics* metrics)
+ ReaderMetrics* metrics, const SchemaEvolution* schemaEvolution)
: mType(type),
mSearchArgument(searchArgument),
+ mSchemaEvolution(schemaEvolution),
mRowIndexStride(rowIndexStride),
mWriterVersion(writerVersion),
mHasEvaluatedFileStats(false),
@@ -88,6 +89,9 @@ namespace orc {
if (columnIdx == INVALID_COLUMN_ID || rowIndexIter == rowIndexes.cend()) {
// this column does not exist in current file
leafValues[pred] = TruthValue::YES_NO_NULL;
+ } else if (mSchemaEvolution && !mSchemaEvolution->isSafePPDConversion(columnIdx)) {
+ // cannot evaluate predicate when ppd is not safe
+ leafValues[pred] = TruthValue::YES_NO_NULL;
} else {
// get column statistics
const proto::ColumnStatistics& statistics =
diff --git a/c++/src/sargs/SargsApplier.hh b/c++/src/sargs/SargsApplier.hh
index e527093e4..73703dcf6 100644
--- a/c++/src/sargs/SargsApplier.hh
+++ b/c++/src/sargs/SargsApplier.hh
@@ -27,6 +27,8 @@
#include "sargs/SearchArgument.hh"
+#include "SchemaEvolution.hh"
+
#include <unordered_map>
namespace orc {
@@ -34,7 +36,8 @@ namespace orc {
class SargsApplier {
public:
SargsApplier(const Type& type, const SearchArgument* searchArgument, uint64_t rowIndexStride,
- WriterVersion writerVersion, ReaderMetrics* metrics);
+ WriterVersion writerVersion, ReaderMetrics* metrics,
+ const SchemaEvolution* schemaEvolution = nullptr);
/**
* Evaluate search argument on file statistics
@@ -124,6 +127,7 @@ namespace orc {
private:
const Type& mType;
const SearchArgument* mSearchArgument;
+ const SchemaEvolution* mSchemaEvolution;
uint64_t mRowIndexStride;
WriterVersion mWriterVersion;
// column ids for each predicate leaf in the search argument
diff --git a/c++/test/TestSargsApplier.cc b/c++/test/TestSargsApplier.cc
index dcd19de0d..ebf8f0a64 100644
--- a/c++/test/TestSargsApplier.cc
+++ b/c++/test/TestSargsApplier.cc
@@ -92,7 +92,8 @@ namespace orc {
// evaluate row group index
ReaderMetrics metrics;
- SargsApplier applier(*type, sarg.get(), 1000, WriterVersion_ORC_135, &metrics);
+ SchemaEvolution se(nullptr, type.get());
+ SargsApplier applier(*type, sarg.get(), 1000, WriterVersion_ORC_135, &metrics, &se);
EXPECT_TRUE(applier.pickRowGroups(4000, rowIndexes, {}));
const auto& nextSkippedRows = applier.getNextSkippedRows();
EXPECT_EQ(4, nextSkippedRows.size());