Posted to reviews@spark.apache.org by chenghao-intel <gi...@git.apache.org> on 2014/08/08 07:43:11 UTC

[GitHub] spark pull request: [WIP][SPARK-2406][SQL] Initial support for usi...

Github user chenghao-intel commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1819#discussion_r15978899
  
    --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala ---
    @@ -32,6 +38,113 @@ private[hive] trait HiveStrategies {
     
       val hiveContext: HiveContext
     
    +  /**
    +   * :: Experimental ::
    +   * Finds table scans that would use the Hive SerDe and replaces them with our own native parquet
    +   * table scan operator.
    +   *
    +   * TODO: Much of this logic is duplicated in HiveTableScan.  Ideally we would do some refactoring
    +   * but since this is after the code freeze for 1.1, all logic is here to minimize disruption.
    +   */
    +  @Experimental
    +  object ParquetConversion extends Strategy {
    +    implicit class LogicalPlanHacks(s: SchemaRDD) {
    +      def lowerCase =
    +        new SchemaRDD(s.sqlContext, LowerCaseSchema(s.logicalPlan))
    +    }
    +
    +    implicit class PhysicalPlanHacks(s: SparkPlan) {
    +      def fakeOutput(newOutput: Seq[Attribute]) = OutputFaker(newOutput, s)
    +    }
    +
    +    def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
    +      case PhysicalOperation(projectList, predicates, relation: MetastoreRelation)
    +          if relation.tableDesc.getSerdeClassName.contains("Parquet") &&
    +             hiveContext.convertMetastoreParquet =>
    +
    +        // Filter out all predicates that only deal with partition keys
    +        val partitionKeyIds = relation.partitionKeys.map(_.exprId).toSet
    +        val (pruningPredicates, otherPredicates) = predicates.partition {
    +          _.references.map(_.exprId).subsetOf(partitionKeyIds)
    +        }
    +
    +        // We are going to throw the predicates and projection back at the whole optimization
    +        // sequence so let's unresolve all the attributes, allowing them to be rebound to the
    +        // matching parquet attributes.
    +        val unresolvedOtherPredicates = otherPredicates.map(_ transform {
    +          case a: AttributeReference => UnresolvedAttribute(a.name)
    +        }).reduceOption(And).getOrElse(Literal(true))
    +
    +        val unresolvedProjection = projectList.map(_ transform {
    +          // Handle non-partitioning columns
    +          case a: AttributeReference if !partitionKeyIds.contains(a.exprId) => UnresolvedAttribute(a.name)
    +        })
    +
    +        if (relation.hiveQlTable.isPartitioned) {
    +          val rawPredicate = pruningPredicates.reduceOption(And).getOrElse(Literal(true))
    +          // Translate the predicate so that it automatically casts the input values to the correct
    +          // data types during evaluation
    +          val castedPredicate = rawPredicate transform {
    +            case a: AttributeReference =>
    +              val idx = relation.partitionKeys.indexWhere(a.exprId == _.exprId)
    +              val key = relation.partitionKeys(idx)
    +              Cast(BoundReference(idx, StringType, nullable = true), key.dataType)
    +          }
    +
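    +          // Reuse a single mutable row as the predicate input buffer;
    +          // each partition's values are copied into it before evaluation.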
    +          val inputData = new GenericMutableRow(relation.partitionKeys.size)
    +          val pruningCondition =
    +            if (codegenEnabled) {
    +              GeneratePredicate(castedPredicate)
    +            } else {
    +              InterpretedPredicate(castedPredicate)
    +            }
    +
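    +          // Keep only the partitions whose values satisfy the pruning predicate.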
    +          val partitions = relation.hiveQlPartitions.filter { part =>
    +            val partitionValues = part.getValues
    +            var i = 0
    +            while (i < partitionValues.size()) {
    +              inputData(i) = partitionValues(i)
    +              i += 1
    +            }
    +            pruningCondition(inputData)
    +          }
    +
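    +          // Build an independent scan plan for each surviving partition
    +          // (planned in parallel) and union the results.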
    +          org.apache.spark.sql.execution.Union(
    +            partitions.par.map { p =>
    +              val partValues = p.getValues()
    +              val internalProjection = unresolvedProjection.map(_ transform {
    +                // Handle partitioning columns
    +                case a: AttributeReference if partitionKeyIds.contains(a.exprId) =>
    +                  val idx = relation.partitionKeys.indexWhere(a.exprId == _.exprId)
    +                  val key = relation.partitionKeys(idx)
    +
    +                  Alias(Cast(Literal(partValues.get(idx), StringType), key.dataType), a.name)()
    +              })
    +
    +              hiveContext
    --- End diff --
    
    Will that cause performance issues if there are lots of partitions?
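
    For concreteness, here is a minimal sketch of the plan shape in question.
    The names (SketchScan, SketchUnion, plan) are hypothetical stand-ins, not
    the actual PR code; the point is only that with N surviving partitions the
    parallel map above materializes N child plans under a single Union, so plan
    size and planning work grow linearly with the partition count:

        // Hypothetical model of the fan-out, not Spark's real operators.
        object PartitionFanoutSketch {
          final case class SketchScan(partition: String)
          final case class SketchUnion(children: Seq[SketchScan])

          // Mirrors the strategy above: prune first, then build one scan per
          // surviving partition and collect them under a single union node.
          def plan(partitions: Seq[String], prune: String => Boolean): SketchUnion =
            SketchUnion(partitions.filter(prune).map(SketchScan(_)))

          def main(args: Array[String]): Unit = {
            val partitions = (1 to 100000).map(i => s"ds=$i")
            // A predicate that prunes nothing: every partition becomes its own child.
            val union = plan(partitions, _ => true)
            println(s"Union fan-out: ${union.children.size} scan operators")
          }
        }

    With tens of thousands of partitions, both the parallel planning pass and
    the resulting wide Union could add noticeable overhead.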

