You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@kylin.apache.org by "刘钊 (Jira)" <ji...@apache.org> on 2022/06/23 04:10:00 UTC

[jira] [Created] (KYLIN-5200) Kylin4 RAW Schema written to Parquet and read from Parquet are inconsistent

刘钊 created KYLIN-5200:
-------------------------

             Summary: Kylin4 RAW Schema written to Parquet and read from Parquet are inconsistent
                 Key: KYLIN-5200
                 URL: https://issues.apache.org/jira/browse/KYLIN-5200
             Project: Kylin
          Issue Type: Bug
          Components: Metadata
    Affects Versions: v4.0.1
            Reporter: 刘钊


I created a cube on kylin version 4.0.1. One of the measures is defined as raw. When I query after building, I find that there are inconsistencies between parquet schema and spark schema. When building cube, the raw measure written to parquet is processed with spark max, and the datatype of Max is child Datatype, in my cube, child Datatype is decimal (19,4). However, when I query through SQL, raw is uniformly specified as binarytype in tablescanpaln. Therefore, I wonder if the structtype of raw in tablescanpaln also uses child dataType ?



when build ,Raw type is child.dataType
@see org.apache.kylin.engine.spark.job.CuboidAggregator

{code:java}
measure.expression.toUpperCase(Locale.ROOT) match {
        case "MAX" =>
          max(columns.head).as(id.toString)
        case "MIN" =>
          min(columns.head).as(id.toString)
        case "SUM" =>
          sum(columns.head).as(id.toString)
        case "COUNT" =>
          if (reuseLayout) {
            sum(columns.head).as(id.toString)
          } else {
            count(columns.head).as(id.toString)
          }
        case "COUNT_DISTINCT" =>
          // for test
          if (isSparkSql) {
            countDistinct(columns.head).as(id.toString)
          } else {
            val cdAggregate = getCountDistinctAggregate(columns, measure.returnType, reuseLayout)
            new Column(cdAggregate.toAggregateExpression()).as(id.toString)
          }
        case "TOP_N" =>
          // Uses new TopN aggregate function
          // located in kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/udaf/TopN.scala
          val schema = StructType(measure.pra.map { col =>
            val dateType = col.dataType
            if (col == measure) {
              StructField(s"MEASURE_${col.columnName}", dateType)
            } else {
              StructField(s"DIMENSION_${col.columnName}", dateType)
            }
          })

          if (reuseLayout) {
            new Column(ReuseTopN(measure.returnType.precision, schema, columns.head.expr)
              .toAggregateExpression()).as(id.toString)
          } else {
            new Column(EncodeTopN(measure.returnType.precision, schema, columns.head.expr, columns.drop(1).map(_.expr))
              .toAggregateExpression()).as(id.toString)
          }
        case "PERCENTILE_APPROX" =>
          val udfName = UdfManager.register(measure.returnType.toKylinDataType, measure.expression, null, !reuseLayout)
          if (!reuseLayout) {
            callUDF(udfName, columns.head.cast(StringType)).as(id.toString)
          } else {
            callUDF(udfName, columns.head).as(id.toString)
          }
        case _ =>
          max(columns.head).as(id.toString) // Raw matcher here,but max dataType is child.dataType
      }
    }.toSeq
{code}


But when query,Raw StructType is BinaryType.
@see org.apache.kylin.query.runtime.plans.TableScanPlan ,org.apache.spark.sql.utils.SparkTypeUtil

{code:java}
def toSparkType(dataTp: DataType, isSum: Boolean = false): org.apache.spark.sql.types.DataType = {
    dataTp.getName match {
      // org.apache.spark.sql.catalyst.expressions.aggregate.Sum#resultType
      case "decimal" =>
        if (isSum) {
          val i = dataTp.getPrecision + 10
          DecimalType(Math.min(DecimalType.MAX_PRECISION, i), dataTp.getScale)
        }
        else DecimalType(dataTp.getPrecision, dataTp.getScale)
      case "date" => DateType
      case "time" => DateType
      case "timestamp" => TimestampType
      case "datetime" => DateType
      case "tinyint" => if (isSum) LongType else ByteType
      case "smallint" => if (isSum) LongType else ShortType
      case "integer" => if (isSum) LongType else IntegerType
      case "int4" => if (isSum) LongType else IntegerType
      case "bigint" => LongType
      case "long8" => LongType
      case "float" => if (isSum) DoubleType else FloatType
      case "double" => DoubleType
      case tp if tp.startsWith("varchar") => StringType
      case tp if tp.startsWith("char") => StringType
      case "dim_dc" => LongType
      case "boolean" => BooleanType
      case tp if tp.startsWith("hllc") => BinaryType
      case tp if tp.startsWith("bitmap") => BinaryType
      case tp if tp.startsWith("extendedcolumn") => BinaryType
      case tp if tp.startsWith("percentile") => BinaryType
      case tp if tp.startsWith("raw") => BinaryType
      case _ => throw new IllegalArgumentException(dataTp.toString)
    }
  }
{code}





--
This message was sent by Atlassian Jira
(v8.20.7#820007)