Posted to issues@kylin.apache.org by "刘钊 (Jira)" <ji...@apache.org> on 2022/06/23 04:10:00 UTC
[jira] [Created] (KYLIN-5200) Kylin4 RAW Schema written to Parquet and read from Parquet are inconsistent
刘钊 created KYLIN-5200:
-------------------------
Summary: Kylin4 RAW Schema written to Parquet and read from Parquet are inconsistent
Key: KYLIN-5200
URL: https://issues.apache.org/jira/browse/KYLIN-5200
Project: Kylin
Issue Type: Bug
Components: Metadata
Affects Versions: v4.0.1
Reporter: 刘钊
I created a cube on Kylin 4.0.1 in which one of the measures is defined as RAW. When I query after building, I find that the Parquet schema and the Spark schema are inconsistent. When the cube is built, the RAW measure written to Parquet is aggregated with Spark's max, and the data type of max is child.dataType; in my cube, child.dataType is decimal(19,4). However, when I query through SQL, RAW is uniformly mapped to BinaryType in TableScanPlan. Should the StructType of RAW in TableScanPlan therefore also use child.dataType?
At build time, the RAW type is child.dataType.
@see org.apache.kylin.engine.spark.job.CuboidAggregator
{code:java}
  measure.expression.toUpperCase(Locale.ROOT) match {
    case "MAX" =>
      max(columns.head).as(id.toString)
    case "MIN" =>
      min(columns.head).as(id.toString)
    case "SUM" =>
      sum(columns.head).as(id.toString)
    case "COUNT" =>
      if (reuseLayout) {
        sum(columns.head).as(id.toString)
      } else {
        count(columns.head).as(id.toString)
      }
    case "COUNT_DISTINCT" =>
      // for test
      if (isSparkSql) {
        countDistinct(columns.head).as(id.toString)
      } else {
        val cdAggregate = getCountDistinctAggregate(columns, measure.returnType, reuseLayout)
        new Column(cdAggregate.toAggregateExpression()).as(id.toString)
      }
    case "TOP_N" =>
      // Uses new TopN aggregate function
      // located in kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/udaf/TopN.scala
      val schema = StructType(measure.pra.map { col =>
        val dateType = col.dataType
        if (col == measure) {
          StructField(s"MEASURE_${col.columnName}", dateType)
        } else {
          StructField(s"DIMENSION_${col.columnName}", dateType)
        }
      })
      if (reuseLayout) {
        new Column(ReuseTopN(measure.returnType.precision, schema, columns.head.expr)
          .toAggregateExpression()).as(id.toString)
      } else {
        new Column(EncodeTopN(measure.returnType.precision, schema, columns.head.expr, columns.drop(1).map(_.expr))
          .toAggregateExpression()).as(id.toString)
      }
    case "PERCENTILE_APPROX" =>
      val udfName = UdfManager.register(measure.returnType.toKylinDataType, measure.expression, null, !reuseLayout)
      if (!reuseLayout) {
        callUDF(udfName, columns.head.cast(StringType)).as(id.toString)
      } else {
        callUDF(udfName, columns.head).as(id.toString)
      }
    case _ =>
      // RAW falls through to this case, but the dataType of max is child.dataType
      max(columns.head).as(id.toString)
  }
}.toSeq
{code}
But at query time, the RAW StructType is BinaryType.
@see org.apache.kylin.query.runtime.plans.TableScanPlan, org.apache.spark.sql.utils.SparkTypeUtil
{code:java}
def toSparkType(dataTp: DataType, isSum: Boolean = false): org.apache.spark.sql.types.DataType = {
  dataTp.getName match {
    // org.apache.spark.sql.catalyst.expressions.aggregate.Sum#resultType
    case "decimal" =>
      if (isSum) {
        val i = dataTp.getPrecision + 10
        DecimalType(Math.min(DecimalType.MAX_PRECISION, i), dataTp.getScale)
      }
      else DecimalType(dataTp.getPrecision, dataTp.getScale)
    case "date" => DateType
    case "time" => DateType
    case "timestamp" => TimestampType
    case "datetime" => DateType
    case "tinyint" => if (isSum) LongType else ByteType
    case "smallint" => if (isSum) LongType else ShortType
    case "integer" => if (isSum) LongType else IntegerType
    case "int4" => if (isSum) LongType else IntegerType
    case "bigint" => LongType
    case "long8" => LongType
    case "float" => if (isSum) DoubleType else FloatType
    case "double" => DoubleType
    case tp if tp.startsWith("varchar") => StringType
    case tp if tp.startsWith("char") => StringType
    case "dim_dc" => LongType
    case "boolean" => BooleanType
    case tp if tp.startsWith("hllc") => BinaryType
    case tp if tp.startsWith("bitmap") => BinaryType
    case tp if tp.startsWith("extendedcolumn") => BinaryType
    case tp if tp.startsWith("percentile") => BinaryType
    case tp if tp.startsWith("raw") => BinaryType
    case _ => throw new IllegalArgumentException(dataTp.toString)
  }
}
{code}
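To make the mismatch concrete, here is a minimal, self-contained sketch in plain Scala. It does not use Kylin or Spark classes: SqlType, Measure, RawSchemaSketch and the three functions are hypothetical stand-ins invented for illustration, and readTypeProposed only models the idea raised in the question above. It is not an actual Kylin patch.

```scala
// Simplified stand-ins for Spark SQL types (NOT org.apache.spark.sql.types).
sealed trait SqlType
case object BinaryType extends SqlType
final case class DecimalType(precision: Int, scale: Int) extends SqlType

// Hypothetical model of a measure: the name of its Kylin return type
// and the data type of its input (child) column.
final case class Measure(returnTypeName: String, childType: SqlType)

object RawSchemaSketch {
  // Build side: RAW falls through to max(column), and max keeps the
  // data type of its input column, so that type is written to Parquet.
  def writeType(m: Measure): SqlType = m.childType

  // Query side as reported: a type name starting with "raw" maps to BinaryType.
  // Other type names are out of scope for this sketch.
  def readTypeCurrent(m: Measure): SqlType = m.returnTypeName match {
    case tp if tp.startsWith("raw") => BinaryType
    case other => throw new IllegalArgumentException(other)
  }

  // Idea from the question (an assumption, not a confirmed fix):
  // resolve RAW to the child column's type so read and write agree.
  def readTypeProposed(m: Measure): SqlType =
    if (m.returnTypeName.startsWith("raw")) m.childType else readTypeCurrent(m)
}
```

With a RAW measure over a decimal(19,4) column, writeType yields DecimalType(19, 4) while readTypeCurrent yields BinaryType, which is the inconsistency described in this issue; readTypeProposed agrees with writeType.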