You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by guowei2 <gi...@git.apache.org> on 2014/10/27 07:05:37 UTC

[GitHub] spark pull request: Spark 1442 step 1

GitHub user guowei2 opened a pull request:

    https://github.com/apache/spark/pull/2953

    Spark 1442 step 1

    

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/guowei2/spark SPARK-1442-STEP-1

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/2953.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2953
    
----
commit 19af53f5b2b55a1c38ed8879c562abc346c4da25
Author: guowei2 <gu...@asiainfo.com>
Date:   2014-10-24T09:55:47Z

    window function

commit 060d42656536d20771525fbe93c28929c440542c
Author: guowei2 <gu...@asiainfo.com>
Date:   2014-10-27T05:29:35Z

    window function

commit cfa0e2a105f3fc6e4b61433a1ba8c246399978b8
Author: guowei2 <gu...@asiainfo.com>
Date:   2014-10-27T06:03:17Z

    window function

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-1442] [SQL] window function implement

Posted by wangxiaojing <gi...@git.apache.org>.
Github user wangxiaojing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2953#discussion_r19527196
  
    --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala ---
    @@ -845,6 +858,198 @@ private[hive] object HiveQl {
           throw new NotImplementedError(s"No parse rules for:\n ${dumpTree(a).toString} ")
       }
     
    +  // store the window def of current sql
    +  //use thread id as key to avoid mistake when muti sqls parse at the same time
    +  protected val windowDefMap = new ConcurrentHashMap[Long,Map[String, Seq[ASTNode]]]()
    +
    +  // store the window spec of current sql
    +  //use thread id as key to avoid mistake when muti sqls parse at the same time
    +  protected val windowPartitionsMap = new ConcurrentHashMap[Long, ArrayBuffer[Node]]()
    +
    +  protected def initWindow() = {
    +    windowDefMap.put(Thread.currentThread().getId, Map[String, Seq[ASTNode]]())
    +    windowPartitionsMap.put(Thread.currentThread().getId, new ArrayBuffer[Node]())
    +  }
    +  protected def checkWindowDef(windowClause: Option[Node]) = {
    +
    +    var winDefs = windowDefMap.get(Thread.currentThread().getId)
    +
    +    windowClause match {
    +      case Some(window) => window.getChildren.foreach {
    +        case Token("TOK_WINDOWDEF", Token(alias, Nil) :: Token("TOK_WINDOWSPEC", ws) :: Nil) => {
    +          winDefs += alias -> ws
    +        }
    +      }
    +      case None => //do nothing
    +    }
    +
    +    windowDefMap.put(Thread.currentThread().getId, winDefs)
    +  }
    +
    +  protected def translateWindowSpec(windowSpec: Seq[ASTNode]): Seq[ASTNode]= {
    +
    +    windowSpec match {
    +      case Token(alias, Nil) :: Nil => translateWindowSpec(getWindowSpec(alias))
    +      case Token(alias, Nil) :: range => {
    +        val (partitionClause :: rowsRange :: valueRange :: Nil) = getClauses(
    +          Seq(
    +            "TOK_PARTITIONINGSPEC",
    +            "TOK_WINDOWRANGE",
    +            "TOK_WINDOWVALUES"),
    +          translateWindowSpec(getWindowSpec(alias)))
    +        partitionClause match {
    +          case Some(partition) => partition.asInstanceOf[ASTNode] :: range
    +          case None => range
    +        }
    +      }
    +      case e => e
    +    }
    +  }
    +
    +  protected def getWindowSpec(alias: String): Seq[ASTNode]= {
    +    windowDefMap.get(Thread.currentThread().getId).getOrElse(
    +      alias, sys.error("no window def for " + alias))
    +  }
    +
    +  protected def addWindowPartitions(partition: Node) = {
    +
    +    var winPartitions = windowPartitionsMap.get(Thread.currentThread().getId)
    +    winPartitions += partition
    +    windowPartitionsMap.put(Thread.currentThread().getId, winPartitions)
    +  }
    +
    +  protected def getWindowPartitions(): Seq[Node]= {
    +    windowPartitionsMap.get(Thread.currentThread().getId).toSeq
    +  }
    +
    +  protected def checkWindowPartitions(): Option[Seq[ASTNode]] = {
    +
    +    val partitionUnits = new ArrayBuffer[Seq[ASTNode]]()
    +
    +    getWindowPartitions.map {
    +      case Token("TOK_PARTITIONINGSPEC", partition)  => Some(partition)
    +      case _ => None
    +    }.foreach {
    +      case Some(partition) => {
    +        if (partitionUnits.isEmpty) partitionUnits += partition
    +        else {
    +          //only add different window partitions
    +          try {
    +            partition zip partitionUnits.head foreach {
    +              case (l,r) => l checkEquals r
    +            }
    +          } catch {
    +            case re: RuntimeException => partitionUnits += partition
    +          }
    +        }
    +      }
    +      case None => //do nothing
    +    }
    +
    +    //check whether all window partitions are same, we just support same window partition now
    --- End diff --
    
    Space after //


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-1442] [SQL] window function implement

Posted by wangxiaojing <gi...@git.apache.org>.
Github user wangxiaojing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2953#discussion_r19527153
  
    --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala ---
    @@ -845,6 +858,198 @@ private[hive] object HiveQl {
           throw new NotImplementedError(s"No parse rules for:\n ${dumpTree(a).toString} ")
       }
     
    +  // store the window def of current sql
    +  //use thread id as key to avoid mistake when muti sqls parse at the same time
    --- End diff --
    
    Space after //


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-1442] [SQL] window function implement

Posted by wangxiaojing <gi...@git.apache.org>.
Github user wangxiaojing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2953#discussion_r19527181
  
    --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala ---
    @@ -845,6 +858,198 @@ private[hive] object HiveQl {
           throw new NotImplementedError(s"No parse rules for:\n ${dumpTree(a).toString} ")
       }
     
    +  // store the window def of current sql
    +  //use thread id as key to avoid mistake when muti sqls parse at the same time
    +  protected val windowDefMap = new ConcurrentHashMap[Long,Map[String, Seq[ASTNode]]]()
    +
    +  // store the window spec of current sql
    +  //use thread id as key to avoid mistake when muti sqls parse at the same time
    +  protected val windowPartitionsMap = new ConcurrentHashMap[Long, ArrayBuffer[Node]]()
    +
    +  protected def initWindow() = {
    +    windowDefMap.put(Thread.currentThread().getId, Map[String, Seq[ASTNode]]())
    +    windowPartitionsMap.put(Thread.currentThread().getId, new ArrayBuffer[Node]())
    +  }
    +  protected def checkWindowDef(windowClause: Option[Node]) = {
    +
    +    var winDefs = windowDefMap.get(Thread.currentThread().getId)
    +
    +    windowClause match {
    +      case Some(window) => window.getChildren.foreach {
    +        case Token("TOK_WINDOWDEF", Token(alias, Nil) :: Token("TOK_WINDOWSPEC", ws) :: Nil) => {
    +          winDefs += alias -> ws
    +        }
    +      }
    +      case None => //do nothing
    +    }
    +
    +    windowDefMap.put(Thread.currentThread().getId, winDefs)
    +  }
    +
    +  protected def translateWindowSpec(windowSpec: Seq[ASTNode]): Seq[ASTNode]= {
    +
    +    windowSpec match {
    +      case Token(alias, Nil) :: Nil => translateWindowSpec(getWindowSpec(alias))
    +      case Token(alias, Nil) :: range => {
    +        val (partitionClause :: rowsRange :: valueRange :: Nil) = getClauses(
    +          Seq(
    +            "TOK_PARTITIONINGSPEC",
    +            "TOK_WINDOWRANGE",
    +            "TOK_WINDOWVALUES"),
    +          translateWindowSpec(getWindowSpec(alias)))
    +        partitionClause match {
    +          case Some(partition) => partition.asInstanceOf[ASTNode] :: range
    +          case None => range
    +        }
    +      }
    +      case e => e
    +    }
    +  }
    +
    +  protected def getWindowSpec(alias: String): Seq[ASTNode]= {
    +    windowDefMap.get(Thread.currentThread().getId).getOrElse(
    +      alias, sys.error("no window def for " + alias))
    +  }
    +
    +  protected def addWindowPartitions(partition: Node) = {
    +
    +    var winPartitions = windowPartitionsMap.get(Thread.currentThread().getId)
    +    winPartitions += partition
    +    windowPartitionsMap.put(Thread.currentThread().getId, winPartitions)
    +  }
    +
    +  protected def getWindowPartitions(): Seq[Node]= {
    +    windowPartitionsMap.get(Thread.currentThread().getId).toSeq
    +  }
    +
    +  protected def checkWindowPartitions(): Option[Seq[ASTNode]] = {
    +
    +    val partitionUnits = new ArrayBuffer[Seq[ASTNode]]()
    +
    +    getWindowPartitions.map {
    +      case Token("TOK_PARTITIONINGSPEC", partition)  => Some(partition)
    +      case _ => None
    +    }.foreach {
    +      case Some(partition) => {
    +        if (partitionUnits.isEmpty) partitionUnits += partition
    +        else {
    +          //only add different window partitions
    --- End diff --
    
    Space after //


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-1442] [SQL] window function implement

Posted by guowei2 <gi...@git.apache.org>.
Github user guowei2 commented on the pull request:

    https://github.com/apache/spark/pull/2953#issuecomment-60553573
  
    aims of step1 ( this pr):
    1、support parse sql with complex window define 
    2、support most of aggregate-functions with window-spec
    3、support window range 
    
    aims of step2:
    support the rest of un-support features as below:
     
     1. not support with multi-different window partitions ,but support with multi-same window partitions
     2. not support with both window partition and group by
     3. not support lead, lag (default lead and lag function lookup in HiveFunctionRegistry are GenericUDF, we need GenericUDAF)
     4. not support rank, dense_rank 
     3. not support sql parse with TOK_PTBLFUNCTION


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-1442] [SQL] window function implement

Posted by wangxiaojing <gi...@git.apache.org>.
Github user wangxiaojing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2953#discussion_r19527873
  
    --- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveWindowFunctionSuite.scala ---
    @@ -0,0 +1,314 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.hive.execution
    +
    +import org.apache.spark.sql.hive._
    +import org.apache.spark.sql.hive.test.TestHive
    +import org.apache.spark.sql.hive.test.TestHive._
    +import org.apache.spark.sql.{Row, SchemaRDD}
    +
    +class HiveWindowFunctionSuite extends HiveComparisonTest {
    +
    +  override def beforeAll() {
    +    sql("DROP TABLE IF EXISTS part").collect()
    +
    +    sql("""
    +        |CREATE TABLE part(
    +        |    p_partkey INT,
    +        |    p_name STRING,
    +        |    p_mfgr STRING,
    +        |    p_brand STRING,
    +        |    p_type STRING,
    +        |    p_size INT,
    +        |    p_container STRING,
    +        |    p_retailprice DOUBLE,
    +        |    p_comment STRING
    +        |)
    +      """.stripMargin).collect()
    +
    +    //remove duplicate data in part_tiny.txt for hive bug
    --- End diff --
    
    Space after //


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-1442] [SQL] window function implement

Posted by wangxiaojing <gi...@git.apache.org>.
Github user wangxiaojing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2953#discussion_r19527805
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/WindowFunction.scala ---
    @@ -0,0 +1,353 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution
    +
    +import java.util.HashMap
    +
    +import org.apache.spark.annotation.DeveloperApi
    +import org.apache.spark.sql.catalyst.expressions._
    +import org.apache.spark.sql.catalyst.plans.physical.AllTuples
    +import org.apache.spark.sql.catalyst.plans.physical.ClusteredDistribution
    +import org.apache.spark.sql.catalyst.errors._
    +import scala.collection.mutable.ArrayBuffer
    +import org.apache.spark.util.collection.CompactBuffer
    +import org.apache.spark.sql.catalyst.plans.physical.ClusteredDistribution
    +import org.apache.spark.sql.catalyst.expressions.AttributeReference
    +import org.apache.spark.sql.catalyst.expressions.InterpretedMutableProjection
    +import org.apache.spark.sql.catalyst.expressions.Alias
    +import org.apache.spark.sql.catalyst.types._
    +import org.apache.spark.sql.catalyst.dsl.plans._
    +import org.apache.spark.sql.catalyst.dsl.expressions._
    +import org.apache.spark.sql.catalyst.plans.logical.SortPartitions
    +
    +
    +/**
    + * :: DeveloperApi ::
    + * Groups input data by `partitionExpressions` and computes the `computeExpressions` for each
    + * group.
    + * @param partitionExpressions expressions that are evaluated to determine partition.
    + * @param functionExpressions expressions that are computed for each partition.
    + * @param child the input data source.
    + */
    +@DeveloperApi
    +case class WindowFunction(
    +  partitionExpressions: Seq[Expression],
    +  functionExpressions: Seq[NamedExpression],
    +  child: SparkPlan)
    +  extends UnaryNode {
    +
    +  override def requiredChildDistribution =
    +    if (partitionExpressions == Nil) {
    +      AllTuples :: Nil
    +    } else {
    +      ClusteredDistribution(partitionExpressions) :: Nil
    +    }
    +
    +  // HACK: Generators don't correctly preserve their output through serializations so we grab
    +  // out child's output attributes statically here.
    +  private[this] val childOutput = child.output
    +
    +  override def output = functionExpressions.map(_.toAttribute)
    +
    +  /** A list of functions that need to be computed for each partition. */
    +  private[this] val computeExpressions = new ArrayBuffer[AggregateExpression]
    +
    +  private[this] val otherExpressions = new ArrayBuffer[NamedExpression]
    +
    +  functionExpressions.foreach { sel =>
    +    sel.collect {
    +      case func: AggregateExpression => computeExpressions += func
    +      case other: NamedExpression if (!other.isInstanceOf[Alias]) => otherExpressions += other
    +    }
    +  }
    +
    +  private[this] val functionAttributes = computeExpressions.map { func =>
    +    func -> AttributeReference(s"funcResult:$func", func.dataType, func.nullable)()}
    +
    +  /** The schema of the result of all evaluations */
    +  private[this] val resultAttributes =
    +    otherExpressions.map(_.toAttribute) ++ functionAttributes.map(_._2)
    +
    +  private[this] val resultMap =
    +    (otherExpressions.map { other => other -> other.toAttribute } ++ functionAttributes
    +    ).toMap
    +
    +
    +  private[this] val resultExpressions = functionExpressions.map { sel =>
    +    sel.transform {
    +      case e: Expression if resultMap.contains(e) => resultMap(e)
    +    }
    +  }
    +
    +  private[this] val sortExpressions =
    +    if (child.isInstanceOf[SortPartitions]) {
    +      child.asInstanceOf[SortPartitions].sortExpressions
    +    }
    +    else if (child.isInstanceOf[Sort]) {
    +      child.asInstanceOf[Sort].sortOrder
    +    }
    +    else null
    +
    +  /** Creates a new function buffer for a partition. */
    +  private[this] def newFunctionBuffer(): Array[AggregateFunction] = {
    +    val buffer = new Array[AggregateFunction](computeExpressions.length)
    +    var i = 0
    +    while (i < computeExpressions.length) {
    +      val baseExpr = BindReferences.bindReference(computeExpressions(i), childOutput)
    +      baseExpr.windowRange = computeExpressions(i).windowRange
    +      buffer(i) = baseExpr.newInstance()
    +      i += 1
    +    }
    +    buffer
    +  }
    +
    +  private[this] def computeFunctions(rows: CompactBuffer[Row]): Array[Iterator[Any]] = {
    +    val aggrFunctions = newFunctionBuffer()
    +    val functionResults = new Array[Iterator[Any]](aggrFunctions.length)
    +    var i = 0
    +    while (i < aggrFunctions.length) {
    +      val aggrFunction = aggrFunctions(i)
    +      val base = aggrFunction.base
    +      if (base.windowRange == null) {
    +        if (sortExpressions != null) {
    +          if (aggrFunction.dataType.isInstanceOf[ArrayType]) {
    +            rows.foreach(aggrFunction.update)
    +            functionResults(i) = aggrFunction.eval(EmptyRow).asInstanceOf[Seq[Any]].iterator
    +          } else {
    +            functionResults(i) = rows.map(row => {
    +              aggrFunction.update(row)
    +              aggrFunction.eval(EmptyRow)
    +            }).iterator
    +          }
    +        } else {
    +          rows.foreach(aggrFunction.update)
    +          functionResults(i) = aggrFunction.eval(EmptyRow) match {
    +            case r: Seq[_] => r.iterator
    +            case other => (0 to rows.size - 1).map(r => other).iterator
    +          }
    +        }
    +
    +      } else {
    +        functionResults(i) =
    +          if (base.windowRange.windowType == "ROWS_RANGE") rowsWindowFunction(base, rows).iterator
    +          else valueWindowFunction(base, rows).iterator
    +      }
    +      i += 1
    +    }
    +    functionResults
    +  }
    +
    +  private[this] def rowsWindowFunction(base: AggregateExpression,
    +    rows: CompactBuffer[Row]): CompactBuffer[Any] = {
    +
    +    val rangeResults = new CompactBuffer[Any]()
    +    var rowIndex = 0
    +    while (rowIndex < rows.size) {
    +
    +      val windowRange = base.windowRange
    +      var start =
    +        if (windowRange.preceding == Int.MaxValue) 0
    +        else rowIndex - windowRange.preceding
    +      if (start < 0) start = 0
    +      var end =
    +        if (windowRange.following == Int.MaxValue) {
    +          rows.size - 1
    +        } else {
    +          rowIndex + windowRange.following
    +        }
    +      if (end > rows.size - 1) end = rows.size - 1
    +
    +      //new aggregate function
    --- End diff --
    
    Space after //


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-1442] [SQL] window function implement

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on the pull request:

    https://github.com/apache/spark/pull/2953#issuecomment-60713308
  
    Can you write a short high level design doc for this change and attach it to JIRA?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-1442] [SQL] window function implement

Posted by wangxiaojing <gi...@git.apache.org>.
Github user wangxiaojing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2953#discussion_r19527141
  
    --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala ---
    @@ -845,6 +858,198 @@ private[hive] object HiveQl {
           throw new NotImplementedError(s"No parse rules for:\n ${dumpTree(a).toString} ")
       }
     
    +  // store the window def of current sql
    +  //use thread id as key to avoid mistake when muti sqls parse at the same time
    +  protected val windowDefMap = new ConcurrentHashMap[Long,Map[String, Seq[ASTNode]]]()
    +
    +  // store the window spec of current sql
    +  //use thread id as key to avoid mistake when muti sqls parse at the same time
    --- End diff --
    
    Space after //


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-1442] [SQL] window function implement

Posted by wangxiaojing <gi...@git.apache.org>.
Github user wangxiaojing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2953#discussion_r19527811
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/WindowFunction.scala ---
    @@ -0,0 +1,353 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution
    +
    +import java.util.HashMap
    +
    +import org.apache.spark.annotation.DeveloperApi
    +import org.apache.spark.sql.catalyst.expressions._
    +import org.apache.spark.sql.catalyst.plans.physical.AllTuples
    +import org.apache.spark.sql.catalyst.plans.physical.ClusteredDistribution
    +import org.apache.spark.sql.catalyst.errors._
    +import scala.collection.mutable.ArrayBuffer
    +import org.apache.spark.util.collection.CompactBuffer
    +import org.apache.spark.sql.catalyst.plans.physical.ClusteredDistribution
    +import org.apache.spark.sql.catalyst.expressions.AttributeReference
    +import org.apache.spark.sql.catalyst.expressions.InterpretedMutableProjection
    +import org.apache.spark.sql.catalyst.expressions.Alias
    +import org.apache.spark.sql.catalyst.types._
    +import org.apache.spark.sql.catalyst.dsl.plans._
    +import org.apache.spark.sql.catalyst.dsl.expressions._
    +import org.apache.spark.sql.catalyst.plans.logical.SortPartitions
    +
    +
    +/**
    + * :: DeveloperApi ::
    + * Groups input data by `partitionExpressions` and computes the `computeExpressions` for each
    + * group.
    + * @param partitionExpressions expressions that are evaluated to determine partition.
    + * @param functionExpressions expressions that are computed for each partition.
    + * @param child the input data source.
    + */
    +@DeveloperApi
    +case class WindowFunction(
    +  partitionExpressions: Seq[Expression],
    +  functionExpressions: Seq[NamedExpression],
    +  child: SparkPlan)
    +  extends UnaryNode {
    +
    +  override def requiredChildDistribution =
    +    if (partitionExpressions == Nil) {
    +      AllTuples :: Nil
    +    } else {
    +      ClusteredDistribution(partitionExpressions) :: Nil
    +    }
    +
    +  // HACK: Generators don't correctly preserve their output through serializations so we grab
    +  // out child's output attributes statically here.
    +  private[this] val childOutput = child.output
    +
    +  override def output = functionExpressions.map(_.toAttribute)
    +
    +  /** A list of functions that need to be computed for each partition. */
    +  private[this] val computeExpressions = new ArrayBuffer[AggregateExpression]
    +
    +  private[this] val otherExpressions = new ArrayBuffer[NamedExpression]
    +
    +  functionExpressions.foreach { sel =>
    +    sel.collect {
    +      case func: AggregateExpression => computeExpressions += func
    +      case other: NamedExpression if (!other.isInstanceOf[Alias]) => otherExpressions += other
    +    }
    +  }
    +
    +  private[this] val functionAttributes = computeExpressions.map { func =>
    +    func -> AttributeReference(s"funcResult:$func", func.dataType, func.nullable)()}
    +
    +  /** The schema of the result of all evaluations */
    +  private[this] val resultAttributes =
    +    otherExpressions.map(_.toAttribute) ++ functionAttributes.map(_._2)
    +
    +  private[this] val resultMap =
    +    (otherExpressions.map { other => other -> other.toAttribute } ++ functionAttributes
    +    ).toMap
    +
    +
    +  private[this] val resultExpressions = functionExpressions.map { sel =>
    +    sel.transform {
    +      case e: Expression if resultMap.contains(e) => resultMap(e)
    +    }
    +  }
    +
    +  private[this] val sortExpressions =
    +    if (child.isInstanceOf[SortPartitions]) {
    +      child.asInstanceOf[SortPartitions].sortExpressions
    +    }
    +    else if (child.isInstanceOf[Sort]) {
    +      child.asInstanceOf[Sort].sortOrder
    +    }
    +    else null
    +
    +  /** Creates a new function buffer for a partition. */
    +  private[this] def newFunctionBuffer(): Array[AggregateFunction] = {
    +    val buffer = new Array[AggregateFunction](computeExpressions.length)
    +    var i = 0
    +    while (i < computeExpressions.length) {
    +      val baseExpr = BindReferences.bindReference(computeExpressions(i), childOutput)
    +      baseExpr.windowRange = computeExpressions(i).windowRange
    +      buffer(i) = baseExpr.newInstance()
    +      i += 1
    +    }
    +    buffer
    +  }
    +
    +  private[this] def computeFunctions(rows: CompactBuffer[Row]): Array[Iterator[Any]] = {
    +    val aggrFunctions = newFunctionBuffer()
    +    val functionResults = new Array[Iterator[Any]](aggrFunctions.length)
    +    var i = 0
    +    while (i < aggrFunctions.length) {
    +      val aggrFunction = aggrFunctions(i)
    +      val base = aggrFunction.base
    +      if (base.windowRange == null) {
    +        if (sortExpressions != null) {
    +          if (aggrFunction.dataType.isInstanceOf[ArrayType]) {
    +            rows.foreach(aggrFunction.update)
    +            functionResults(i) = aggrFunction.eval(EmptyRow).asInstanceOf[Seq[Any]].iterator
    +          } else {
    +            functionResults(i) = rows.map(row => {
    +              aggrFunction.update(row)
    +              aggrFunction.eval(EmptyRow)
    +            }).iterator
    +          }
    +        } else {
    +          rows.foreach(aggrFunction.update)
    +          functionResults(i) = aggrFunction.eval(EmptyRow) match {
    +            case r: Seq[_] => r.iterator
    +            case other => (0 to rows.size - 1).map(r => other).iterator
    +          }
    +        }
    +
    +      } else {
    +        functionResults(i) =
    +          if (base.windowRange.windowType == "ROWS_RANGE") rowsWindowFunction(base, rows).iterator
    +          else valueWindowFunction(base, rows).iterator
    +      }
    +      i += 1
    +    }
    +    functionResults
    +  }
    +
    +  private[this] def rowsWindowFunction(base: AggregateExpression,
    +    rows: CompactBuffer[Row]): CompactBuffer[Any] = {
    +
    +    val rangeResults = new CompactBuffer[Any]()
    +    var rowIndex = 0
    +    while (rowIndex < rows.size) {
    +
    +      val windowRange = base.windowRange
    +      var start =
    +        if (windowRange.preceding == Int.MaxValue) 0
    +        else rowIndex - windowRange.preceding
    +      if (start < 0) start = 0
    +      var end =
    +        if (windowRange.following == Int.MaxValue) {
    +          rows.size - 1
    +        } else {
    +          rowIndex + windowRange.following
    +        }
    +      if (end > rows.size - 1) end = rows.size - 1
    +
    +      //new aggregate function
    +      val aggr = base.newInstance()
    +      (start to end).foreach(i => aggr.update(rows(i)))
    +
    +      rangeResults += aggr.eval(EmptyRow)
    +      rowIndex += 1
    +    }
    +    rangeResults
    +  }
    +
    +  private[this] def valueWindowFunction(base: AggregateExpression,
    +    rows: CompactBuffer[Row]): CompactBuffer[Any] = {
    +
    +    val windowRange = base.windowRange
    +
    +    // rande only support 1 order
    +    val sortExpression = BindReferences.bindReference(sortExpressions.head, childOutput)
    +
    +    val preceding = sortExpression.child.dataType match {
    +      case IntegerType => Literal(windowRange.preceding)
    +      case LongType => Literal(windowRange.preceding.toLong)
    +      case DoubleType => Literal(windowRange.preceding.toDouble)
    +      case FloatType => Literal(windowRange.preceding.toFloat)
    +      case ShortType => Literal(windowRange.preceding.toShort)
    +      case DecimalType => Literal(BigDecimal(windowRange.preceding))
    +      case _=> throw new Exception(s"not support dataType ")
    +    }
    +    val following = sortExpression.child.dataType match {
    +      case IntegerType => Literal(windowRange.following)
    +      case LongType => Literal(windowRange.following.toLong)
    +      case DoubleType => Literal(windowRange.following.toDouble)
    +      case FloatType => Literal(windowRange.following.toFloat)
    +      case ShortType => Literal(windowRange.following.toShort)
    +      case DecimalType => Literal(BigDecimal(windowRange.following))
    +      case _=> throw new Exception(s"not support dataType ")
    +    }
    +
    +    val rangeResults = new CompactBuffer[Any]()
    +    var rowIndex = 0
    +    while (rowIndex < rows.size) {
    +      val currentRow = rows(rowIndex)
    +      val precedingExpr =
    +        if (sortExpression.direction == Ascending) {
    +          Literal(sortExpression.child.eval(currentRow)) - sortExpression.child <= preceding
    +        } else {
    +          sortExpression.child - Literal(sortExpression.child.eval(currentRow)) <= preceding
    +        }
    +
    +
    +      val followingExpr =
    +        if (sortExpression.direction == Ascending) {
    +          sortExpression.child - Literal(sortExpression.child.eval(currentRow)) <= following
    +        } else {
    +          Literal(sortExpression.child.eval(currentRow)) - sortExpression.child <= following
    +        }
    +
    +      var precedingIndex = 0
    +      var followingIndex = rows.size - 1
    +      if (sortExpression != null) {
    +
    +        if (windowRange.preceding != Int.MaxValue) precedingIndex = rowIndex
    +        while (precedingIndex > 0 &&
    +          precedingExpr.eval(rows(precedingIndex - 1)).asInstanceOf[Boolean]) {
    +          precedingIndex -= 1
    +        }
    +
    +        if (windowRange.following != Int.MaxValue) followingIndex = rowIndex
    +        while (followingIndex < rows.size - 1 &&
    +          followingExpr.eval(rows(followingIndex + 1)).asInstanceOf[Boolean]) {
    +          followingIndex += 1
    +        }
    +      }
    +      //new aggregate function
    --- End diff --
    
    Space after //


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-1442] [SQL] window function implement

Posted by guowei2 <gi...@git.apache.org>.
Github user guowei2 commented on the pull request:

    https://github.com/apache/spark/pull/2953#issuecomment-61205930
  
    @rxin ,I've attached a doc to JIRA, is it OK?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-1442] [SQL] window function implement

Posted by wangxiaojing <gi...@git.apache.org>.
Github user wangxiaojing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2953#discussion_r19527190
  
    --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala ---
    @@ -845,6 +858,198 @@ private[hive] object HiveQl {
           throw new NotImplementedError(s"No parse rules for:\n ${dumpTree(a).toString} ")
       }
     
    +  // store the window def of current sql
    +  //use thread id as key to avoid mistake when muti sqls parse at the same time
    +  protected val windowDefMap = new ConcurrentHashMap[Long,Map[String, Seq[ASTNode]]]()
    +
    +  // store the window spec of current sql
    +  //use thread id as key to avoid mistake when muti sqls parse at the same time
    +  protected val windowPartitionsMap = new ConcurrentHashMap[Long, ArrayBuffer[Node]]()
    +
    +  protected def initWindow() = {
    +    windowDefMap.put(Thread.currentThread().getId, Map[String, Seq[ASTNode]]())
    +    windowPartitionsMap.put(Thread.currentThread().getId, new ArrayBuffer[Node]())
    +  }
    +  protected def checkWindowDef(windowClause: Option[Node]) = {
    +
    +    var winDefs = windowDefMap.get(Thread.currentThread().getId)
    +
    +    windowClause match {
    +      case Some(window) => window.getChildren.foreach {
    +        case Token("TOK_WINDOWDEF", Token(alias, Nil) :: Token("TOK_WINDOWSPEC", ws) :: Nil) => {
    +          winDefs += alias -> ws
    +        }
    +      }
    +      case None => //do nothing
    +    }
    +
    +    windowDefMap.put(Thread.currentThread().getId, winDefs)
    +  }
    +
    +  protected def translateWindowSpec(windowSpec: Seq[ASTNode]): Seq[ASTNode]= {
    +
    +    windowSpec match {
    +      case Token(alias, Nil) :: Nil => translateWindowSpec(getWindowSpec(alias))
    +      case Token(alias, Nil) :: range => {
    +        val (partitionClause :: rowsRange :: valueRange :: Nil) = getClauses(
    +          Seq(
    +            "TOK_PARTITIONINGSPEC",
    +            "TOK_WINDOWRANGE",
    +            "TOK_WINDOWVALUES"),
    +          translateWindowSpec(getWindowSpec(alias)))
    +        partitionClause match {
    +          case Some(partition) => partition.asInstanceOf[ASTNode] :: range
    +          case None => range
    +        }
    +      }
    +      case e => e
    +    }
    +  }
    +
    +  protected def getWindowSpec(alias: String): Seq[ASTNode]= {
    +    windowDefMap.get(Thread.currentThread().getId).getOrElse(
    +      alias, sys.error("no window def for " + alias))
    +  }
    +
    +  protected def addWindowPartitions(partition: Node) = {
    +
    +    var winPartitions = windowPartitionsMap.get(Thread.currentThread().getId)
    +    winPartitions += partition
    +    windowPartitionsMap.put(Thread.currentThread().getId, winPartitions)
    +  }
    +
    +  protected def getWindowPartitions(): Seq[Node]= {
    +    windowPartitionsMap.get(Thread.currentThread().getId).toSeq
    +  }
    +
    +  protected def checkWindowPartitions(): Option[Seq[ASTNode]] = {
    +
    +    val partitionUnits = new ArrayBuffer[Seq[ASTNode]]()
    +
    +    getWindowPartitions.map {
    +      case Token("TOK_PARTITIONINGSPEC", partition)  => Some(partition)
    +      case _ => None
    +    }.foreach {
    +      case Some(partition) => {
    +        if (partitionUnits.isEmpty) partitionUnits += partition
    +        else {
    +          //only add different window partitions
    +          try {
    +            partition zip partitionUnits.head foreach {
    +              case (l,r) => l checkEquals r
    +            }
    +          } catch {
    +            case re: RuntimeException => partitionUnits += partition
    +          }
    +        }
    +      }
    +      case None => //do nothing
    --- End diff --
    
    Space after //


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-1442] [SQL] window function implement

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/2953#issuecomment-60553550
  
    Can one of the admins verify this patch?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-1442] [SQL] window function implement

Posted by wangxiaojing <gi...@git.apache.org>.
Github user wangxiaojing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2953#discussion_r19527163
  
    --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala ---
    @@ -845,6 +858,198 @@ private[hive] object HiveQl {
           throw new NotImplementedError(s"No parse rules for:\n ${dumpTree(a).toString} ")
       }
     
    +  // store the window def of current sql
    +  //use thread id as key to avoid mistake when muti sqls parse at the same time
    +  protected val windowDefMap = new ConcurrentHashMap[Long,Map[String, Seq[ASTNode]]]()
    +
    +  // store the window spec of current sql
    +  //use thread id as key to avoid mistake when muti sqls parse at the same time
    +  protected val windowPartitionsMap = new ConcurrentHashMap[Long, ArrayBuffer[Node]]()
    +
    +  protected def initWindow() = {
    +    windowDefMap.put(Thread.currentThread().getId, Map[String, Seq[ASTNode]]())
    +    windowPartitionsMap.put(Thread.currentThread().getId, new ArrayBuffer[Node]())
    +  }
    +  protected def checkWindowDef(windowClause: Option[Node]) = {
    +
    +    var winDefs = windowDefMap.get(Thread.currentThread().getId)
    +
    +    windowClause match {
    +      case Some(window) => window.getChildren.foreach {
    +        case Token("TOK_WINDOWDEF", Token(alias, Nil) :: Token("TOK_WINDOWSPEC", ws) :: Nil) => {
    +          winDefs += alias -> ws
    +        }
    +      }
    +      case None => //do nothing
    --- End diff --
    
    Space after //


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-1442] [SQL] window function implement

Posted by guowei2 <gi...@git.apache.org>.
Github user guowei2 closed the pull request at:

    https://github.com/apache/spark/pull/2953


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org