Posted to issues@kylin.apache.org by "Liu Zhao (Jira)" <ji...@apache.org> on 2022/12/30 09:01:00 UTC

[jira] (KYLIN-5200) Kylin4 RAW Schema written to Parquet and read from Parquet are inconsistent

    [ https://issues.apache.org/jira/browse/KYLIN-5200 ]


    Liu Zhao deleted comment on KYLIN-5200:
    ---------------------------------

was (Author: JIRAUSER288373):
[~xxyu], hi, does Apache Kylin have a technical exchange group? Could you add me to it? Thanks.
WeChat: LZ_AHHF

> Kylin4 RAW Schema written to Parquet and read from Parquet are inconsistent
> ---------------------------------------------------------------------------
>
>                 Key: KYLIN-5200
>                 URL: https://issues.apache.org/jira/browse/KYLIN-5200
>             Project: Kylin
>          Issue Type: Bug
>          Components: Metadata
>    Affects Versions: v4.0.1
>            Reporter: Liu Zhao
>            Assignee: Liu Zhao
>            Priority: Major
>
> I created a cube on Kylin 4.0.1 with one measure defined as RAW. When I query after building, I find that the Parquet schema and the Spark schema are inconsistent. During the cube build, the RAW measure written to Parquet is processed with Spark's max, and max keeps the child dataType; in my cube the child dataType is decimal(19,4). However, when I query through SQL, RAW is uniformly mapped to BinaryType in TableScanPlan. Should the StructType for RAW in TableScanPlan also use the child dataType?
> When building, the RAW measure's type is child.dataType.
> @see org.apache.kylin.engine.spark.job.CuboidAggregator
> {code:java}
> measure.expression.toUpperCase(Locale.ROOT) match {
>         case "MAX" =>
>           max(columns.head).as(id.toString)
>         case "MIN" =>
>           min(columns.head).as(id.toString)
>         case "SUM" =>
>           sum(columns.head).as(id.toString)
>         case "COUNT" =>
>           if (reuseLayout) {
>             sum(columns.head).as(id.toString)
>           } else {
>             count(columns.head).as(id.toString)
>           }
>         case "COUNT_DISTINCT" =>
>           // for test
>           if (isSparkSql) {
>             countDistinct(columns.head).as(id.toString)
>           } else {
>             val cdAggregate = getCountDistinctAggregate(columns, measure.returnType, reuseLayout)
>             new Column(cdAggregate.toAggregateExpression()).as(id.toString)
>           }
>         case "TOP_N" =>
>           // Uses new TopN aggregate function
>           // located in kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/udaf/TopN.scala
>           val schema = StructType(measure.pra.map { col =>
>             val dateType = col.dataType
>             if (col == measure) {
>               StructField(s"MEASURE_${col.columnName}", dateType)
>             } else {
>               StructField(s"DIMENSION_${col.columnName}", dateType)
>             }
>           })
>           if (reuseLayout) {
>             new Column(ReuseTopN(measure.returnType.precision, schema, columns.head.expr)
>               .toAggregateExpression()).as(id.toString)
>           } else {
>             new Column(EncodeTopN(measure.returnType.precision, schema, columns.head.expr, columns.drop(1).map(_.expr))
>               .toAggregateExpression()).as(id.toString)
>           }
>         case "PERCENTILE_APPROX" =>
>           val udfName = UdfManager.register(measure.returnType.toKylinDataType, measure.expression, null, !reuseLayout)
>           if (!reuseLayout) {
>             callUDF(udfName, columns.head.cast(StringType)).as(id.toString)
>           } else {
>             callUDF(udfName, columns.head).as(id.toString)
>           }
>         case _ =>
>           max(columns.head).as(id.toString) // RAW matches here, but max's dataType is child.dataType
>       }
>     }.toSeq
> {code}
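> The snippet above is the build path: RAW has no explicit case, so it falls into the default branch and is aggregated with max, which keeps the input type. Below is a minimal sketch (not Kylin code; the column name, path, and SparkSession setup are made up) showing that max preserves child.dataType, so the Parquet file written from the aggregated DataFrame carries decimal(19,4):
> {code:java}
> import org.apache.spark.sql.SparkSession
> import org.apache.spark.sql.functions.max
> import org.apache.spark.sql.types.DecimalType
>
> val spark = SparkSession.builder().master("local[1]").getOrCreate()
> import spark.implicits._
>
> // Hypothetical RAW measure column cast to the cube's child dataType.
> val df = Seq("1.2345", "7.0000").toDF("raw_col")
>   .select($"raw_col".cast(DecimalType(19, 4)).as("100001"))
>
> // The default branch aggregates with max, which keeps the decimal type.
> val aggregated = df.groupBy().agg(max($"100001").as("100001"))
> aggregated.printSchema()
> // root
> //  |-- 100001: decimal(19,4) (nullable = true)
>
> // So the layout written to Parquet stores this measure as decimal(19,4).
> aggregated.write.mode("overwrite").parquet("/tmp/raw_measure_demo")
> {code}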
> But when querying, the RAW StructType is BinaryType.
> @see org.apache.kylin.query.runtime.plans.TableScanPlan, org.apache.spark.sql.utils.SparkTypeUtil
> {code:java}
> def toSparkType(dataTp: DataType, isSum: Boolean = false): org.apache.spark.sql.types.DataType = {
>     dataTp.getName match {
>       // org.apache.spark.sql.catalyst.expressions.aggregate.Sum#resultType
>       case "decimal" =>
>         if (isSum) {
>           val i = dataTp.getPrecision + 10
>           DecimalType(Math.min(DecimalType.MAX_PRECISION, i), dataTp.getScale)
>         }
>         else DecimalType(dataTp.getPrecision, dataTp.getScale)
>       case "date" => DateType
>       case "time" => DateType
>       case "timestamp" => TimestampType
>       case "datetime" => DateType
>       case "tinyint" => if (isSum) LongType else ByteType
>       case "smallint" => if (isSum) LongType else ShortType
>       case "integer" => if (isSum) LongType else IntegerType
>       case "int4" => if (isSum) LongType else IntegerType
>       case "bigint" => LongType
>       case "long8" => LongType
>       case "float" => if (isSum) DoubleType else FloatType
>       case "double" => DoubleType
>       case tp if tp.startsWith("varchar") => StringType
>       case tp if tp.startsWith("char") => StringType
>       case "dim_dc" => LongType
>       case "boolean" => BooleanType
>       case tp if tp.startsWith("hllc") => BinaryType
>       case tp if tp.startsWith("bitmap") => BinaryType
>       case tp if tp.startsWith("extendedcolumn") => BinaryType
>       case tp if tp.startsWith("percentile") => BinaryType
>       case tp if tp.startsWith("raw") => BinaryType
>       case _ => throw new IllegalArgumentException(dataTp.toString)
>     }
>   }
> {code}
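> To illustrate the mismatch end to end, here is a hedged sketch (again not Kylin code; it reuses the file and column name written in the sketch above) that reads the decimal(19,4) Parquet file with a schema declaring the column as BinaryType, as toSparkType does for "raw". Depending on the Spark version this typically fails with a Parquet decoding / column conversion error rather than returning usable bytes:
> {code:java}
> import org.apache.spark.sql.types.{BinaryType, StructField, StructType}
>
> // Query-side schema as produced by toSparkType for a RAW measure.
> val querySchema = StructType(Seq(StructField("100001", BinaryType)))
>
> // Reading the decimal(19,4) Parquet column with a BinaryType schema
> // usually throws (e.g. a SchemaColumnConvertNotSupportedException from
> // the vectorized reader), which is the inconsistency described above.
> spark.read.schema(querySchema).parquet("/tmp/raw_measure_demo").show()
> {code}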


