Posted to reviews@spark.apache.org by ueshin <gi...@git.apache.org> on 2018/08/02 03:12:45 UTC

[GitHub] spark pull request #21954: [SPARK-23908][SQL] Add transform function.

GitHub user ueshin opened a pull request:

    https://github.com/apache/spark/pull/21954

    [SPARK-23908][SQL] Add transform function.

    ## What changes were proposed in this pull request?
    
    This PR adds a `transform` function, which transforms the elements of an array using a given lambda function.
    Optionally, the lambda function can take the index of each element as a second argument.
    
    ```sql
    > SELECT transform(array(1, 2, 3), x -> x + 1);
     array(2, 3, 4)
    > SELECT transform(array(1, 2, 3), (x, i) -> x + i);
     array(1, 3, 5)
    ```
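    
    For illustration, the same function could be invoked from Scala through the SQL interface. A minimal sketch, assuming a running `SparkSession` named `spark` (the session name is an assumption, not part of this patch):
    
    ```scala
    // Minimal sketch, assuming a SparkSession named `spark` is available.
    val df = spark.sql("SELECT transform(array(1, 2, 3), x -> x + 1) AS xs")
    df.show() // expected to print [2, 3, 4], matching the example above
    ```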
    
    ## How was this patch tested?
    
    Added tests.


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/ueshin/apache-spark issues/SPARK-23908/transform

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/21954.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #21954
    
----
commit e860249c96854daefccc580ca52e3fad7d1acf67
Author: Takuya UESHIN <ue...@...>
Date:   2018-08-01T03:46:00Z

    Add `LambdaFunction` and its parser.

commit 06825b6a3ae6b3085c4b4e5c010dffa75e988801
Author: Takuya UESHIN <ue...@...>
Date:   2018-08-01T05:52:40Z

    Add `ResolveHigherOrderFunctions`.

commit 17ab2ffc73a664ba2b00d49f0835faff055274b0
Author: Takuya UESHIN <ue...@...>
Date:   2018-08-01T05:59:32Z

    Add `ArrayTransform`.

commit 95a06b4fe3f660c617a6a53bf0cccc5a0a62306b
Author: Takuya UESHIN <ue...@...>
Date:   2018-08-01T08:06:01Z

    Test in sql/core.

commit 4448d0b7a085c1613dc0dd52009b6d50388ec605
Author: Takuya UESHIN <ue...@...>
Date:   2018-08-01T09:20:14Z

    Add negative cases.

commit abc685f86ee205f2a64b065c98a65fc2d36bfd75
Author: Takuya UESHIN <ue...@...>
Date:   2018-08-01T09:46:00Z

    Add sql file.

commit ee450c5ef3f99d3bbf8dbbd05273bc63005bbccb
Author: Takuya UESHIN <ue...@...>
Date:   2018-08-01T10:15:44Z

    Replace lambda variables in the function with the ones in the arguments, to make sure the variables in the function are the same instances as those in the arguments.

----


---



[GitHub] spark issue #21954: [SPARK-23908][SQL] Add transform function.

Posted by ueshin <gi...@git.apache.org>.
Github user ueshin commented on the issue:

    https://github.com/apache/spark/pull/21954
  
    Jenkins, retest this please.


---



[GitHub] spark issue #21954: [SPARK-23908][SQL] Add transform function.

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21954
  
    **[Test build #93950 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/93950/testReport)** for PR 21954 at commit [`c3bf6a0`](https://github.com/apache/spark/commit/c3bf6a0059a151ba23cf32c842e31ced3b28726c).


---



[GitHub] spark issue #21954: [SPARK-23908][SQL] Add transform function.

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21954
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #21954: [SPARK-23908][SQL] Add transform function.

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21954
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/1671/
    Test PASSed.


---



[GitHub] spark pull request #21954: [SPARK-23908][SQL] Add transform function.

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21954#discussion_r208268129
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala ---
    @@ -0,0 +1,212 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.catalyst.expressions
    +
    +import java.util.concurrent.atomic.AtomicReference
    +
    +import org.apache.spark.sql.catalyst.InternalRow
    +import org.apache.spark.sql.catalyst.expressions.codegen._
    +import org.apache.spark.sql.catalyst.expressions.codegen.Block._
    +import org.apache.spark.sql.catalyst.util.{ArrayData, GenericArrayData}
    +import org.apache.spark.sql.types._
    +
    +/**
    + * A named lambda variable.
    + */
    +case class NamedLambdaVariable(
    +    name: String,
    +    dataType: DataType,
    +    nullable: Boolean,
    +    value: AtomicReference[Any] = new AtomicReference(),
    +    exprId: ExprId = NamedExpression.newExprId)
    +  extends LeafExpression
    +  with NamedExpression
    +  with CodegenFallback {
    +
    +  override def qualifier: Option[String] = None
    +
    +  override def newInstance(): NamedExpression =
    +    copy(value = new AtomicReference(), exprId = NamedExpression.newExprId)
    +
    +  override def toAttribute: Attribute = {
    +    AttributeReference(name, dataType, nullable, Metadata.empty)(exprId, None)
    +  }
    +
    +  override def eval(input: InternalRow): Any = value.get
    +
    +  override def toString: String = s"lambda $name#${exprId.id}$typeSuffix"
    +
    +  override def simpleString: String = s"lambda $name#${exprId.id}: ${dataType.simpleString}"
    +}
    +
    +/**
    + * A lambda function and its arguments. A lambda function can be hidden when a user wants to
    + * process a completely independent expression in a [[HigherOrderFunction]]; the lambda function
    + * and its variables are then only used for internal bookkeeping within the higher order function.
    + */
    +case class LambdaFunction(
    +    function: Expression,
    +    arguments: Seq[NamedExpression],
    +    hidden: Boolean = false)
    +  extends Expression with CodegenFallback {
    +
    +  override def children: Seq[Expression] = function +: arguments
    +  override def dataType: DataType = function.dataType
    +  override def nullable: Boolean = function.nullable
    +
    +  lazy val bound: Boolean = arguments.forall(_.resolved)
    +
    +  override def eval(input: InternalRow): Any = function.eval(input)
    +}
    +
    +/**
    + * A higher order function takes one or more (lambda) functions and applies them to some objects.
    + * The function produces a number of variables that can be consumed by some lambda function.
    + */
    +trait HigherOrderFunction extends Expression {
    +
    +  override def children: Seq[Expression] = inputs ++ functions
    +
    +  /**
    +   * Inputs to the higher order function.
    +   */
    +  def inputs: Seq[Expression]
    +
    +  /**
    +   * All inputs have been resolved. This means that the types and nullability of (most of) the
    +   * lambda function arguments are known, and that we can start binding the lambda functions.
    +   */
    +  lazy val inputResolved: Boolean = inputs.forall(_.resolved)
    +
    +  /**
    +   * Functions applied by the higher order function.
    +   */
    +  def functions: Seq[Expression]
    +
    +  /**
    +   * All inputs must be resolved and all functions must be resolved lambda functions.
    +   */
    +  override lazy val resolved: Boolean = inputResolved && functions.forall {
    +    case l: LambdaFunction => l.resolved
    +    case _ => false
    +  }
    +
    +  /**
    +   * Bind the lambda functions to the [[HigherOrderFunction]] using the given bind function. The
    +   * bind function takes the potential lambda and its (partial) arguments and converts this into
    +   * a bound lambda function.
    +   */
    +  def bind(f: (Expression, Seq[(DataType, Boolean)]) => LambdaFunction): HigherOrderFunction
    +
    +  @transient lazy val functionsForEval: Seq[Expression] = functions.map {
    +    case LambdaFunction(function, arguments, hidden) =>
    +      val argumentMap = arguments.map { arg => arg.exprId -> arg }.toMap
    +      function.transformUp {
    +        case variable: NamedLambdaVariable if argumentMap.contains(variable.exprId) =>
    +          argumentMap(variable.exprId)
    +      }
    +  }
    +}
    +
    +trait ArrayBasedHigherOrderFunction extends HigherOrderFunction with ExpectsInputTypes {
    +
    +  def input: Expression
    +
    +  override def inputs: Seq[Expression] = input :: Nil
    +
    +  def function: Expression
    +
    +  override def functions: Seq[Expression] = function :: Nil
    +
    +  def expectingFunctionType: AbstractDataType = AnyDataType
    +
    +  override def inputTypes: Seq[AbstractDataType] = Seq(ArrayType, expectingFunctionType)
    +
    +  @transient lazy val functionForEval: Expression = functionsForEval.head
    +}
    +
    +/**
    + * Transform elements in an array using the transform function. This is similar to
    + * a `map` in functional programming.
    + */
    +@ExpressionDescription(
    +  usage = "_FUNC_(expr, func) - Transforms elements in an array using the function.",
    +  examples = """
    +    Examples:
    +      > SELECT _FUNC_(array(1, 2, 3), x -> x + 1);
    +       array(2, 3, 4)
    +      > SELECT _FUNC_(array(1, 2, 3), (x, i) -> x + i);
    +       array(1, 3, 5)
    +  """,
    +  since = "2.4.0")
    +case class ArrayTransform(
    +    input: Expression,
    +    function: Expression)
    +  extends ArrayBasedHigherOrderFunction with CodegenFallback {
    +
    +  override def nullable: Boolean = input.nullable
    +
    +  override def dataType: ArrayType = ArrayType(function.dataType, function.nullable)
    +
    +  override def bind(f: (Expression, Seq[(DataType, Boolean)]) => LambdaFunction): ArrayTransform = {
    +    val (elementType, containsNull) = input.dataType match {
    +      case ArrayType(elementType, containsNull) => (elementType, containsNull)
    +      case _ =>
    +        val ArrayType(elementType, containsNull) = ArrayType.defaultConcreteType
    --- End diff --
    
    Then shall we fail the analysis before going into `bind`?


---



[GitHub] spark pull request #21954: [SPARK-23908][SQL] Add transform function.

Posted by hvanhovell <gi...@git.apache.org>.
Github user hvanhovell commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21954#discussion_r207171941
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala ---
    @@ -0,0 +1,212 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.catalyst.expressions
    +
    +import java.util.concurrent.atomic.AtomicReference
    +
    +import org.apache.spark.sql.catalyst.InternalRow
    +import org.apache.spark.sql.catalyst.expressions.codegen._
    +import org.apache.spark.sql.catalyst.expressions.codegen.Block._
    +import org.apache.spark.sql.catalyst.util.{ArrayData, GenericArrayData}
    +import org.apache.spark.sql.types._
    +
    +/**
    + * A named lambda variable.
    + */
    +case class NamedLambdaVariable(
    +    name: String,
    +    dataType: DataType,
    +    nullable: Boolean,
    +    value: AtomicReference[Any] = new AtomicReference(),
    --- End diff --
    
    Yeah, that makes sense. Let's leave it for now.


---



[GitHub] spark issue #21954: [SPARK-23908][SQL] Add transform function.

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21954
  
    Merged build finished. Test FAILed.


---



[GitHub] spark pull request #21954: [SPARK-23908][SQL] Add transform function.

Posted by hvanhovell <gi...@git.apache.org>.
Github user hvanhovell commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21954#discussion_r208273712
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/higherOrderFunctions.scala ---
    @@ -0,0 +1,166 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.catalyst.analysis
    +
    +import org.apache.spark.sql.catalyst.catalog.SessionCatalog
    +import org.apache.spark.sql.catalyst.expressions._
    +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
    +import org.apache.spark.sql.catalyst.rules.Rule
    +import org.apache.spark.sql.internal.SQLConf
    +import org.apache.spark.sql.types.DataType
    +
    +/**
    + * Resolve higher order functions from the catalog. This is different from regular function
    + * resolution because lambda functions can only be resolved after the function itself has been
    + * resolved, so we need to resolve a higher order function when all of its children are either
    + * resolved or lambda functions.
    + */
    +case class ResolveHigherOrderFunctions(catalog: SessionCatalog) extends Rule[LogicalPlan] {
    +
    +  override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperators {
    +    case q: LogicalPlan =>
    +      q.transformExpressions {
    +        case u @ UnresolvedFunction(fn, children, false)
    +            if hasLambdaAndResolvedArguments(children) =>
    +          withPosition(u) {
    +            catalog.lookupFunction(fn, children) match {
    +              case func: HigherOrderFunction => func
    +              case other => other.failAnalysis(
    +                "A lambda function should only be used in a higher order function. However, " +
    +                  s"its class is ${other.getClass.getCanonicalName}, which is not a " +
    +                  s"higher order function.")
    +            }
    +          }
    +      }
    +  }
    +
    +  /**
    +   * Check if the arguments of a function are either resolved or a lambda function.
    +   */
    +  private def hasLambdaAndResolvedArguments(expressions: Seq[Expression]): Boolean = {
    +    val (lambdas, others) = expressions.partition(_.isInstanceOf[LambdaFunction])
    +    lambdas.nonEmpty && others.forall(_.resolved)
    +  }
    +}
    +
    +/**
    + * Resolve the lambda variables exposed by higher order functions.
    + *
    + * This rule works in two steps:
    + * [1]. Bind the anonymous variables exposed by the higher order function to the lambda function's
    + *      arguments; this creates named and typed lambda variables. The argument names are checked
    + *      for duplicates and the number of arguments is checked during this step.
    + * [2]. Resolve the lambda variables used in the lambda function's expression tree. Note that
    + *      we allow the use of variables from outside the current lambda; these can either be
    + *      variables from a lambda function defined in an outer scope, or attributes produced by
    + *      the plan's child. If names are duplicated, the name defined in the innermost scope is
    + *      used.
    + */
    +case class ResolveLambdaVariables(conf: SQLConf) extends Rule[LogicalPlan] {
    +
    +  type LambdaVariableMap = Map[String, NamedExpression]
    +
    +  private val canonicalizer = {
    +    if (!conf.caseSensitiveAnalysis) {
    +      s: String => s.toLowerCase
    +    } else {
    +      s: String => s
    +    }
    +  }
    +
    +  override def apply(plan: LogicalPlan): LogicalPlan = {
    +    plan.resolveOperators {
    +      case q: LogicalPlan =>
    +        q.mapExpressions(resolve(_, Map.empty))
    +    }
    +  }
    +
    +  /**
    +   * Create a bound lambda function by binding the arguments of a lambda function to the given
    +   * partial arguments (dataType and nullability only). If the expression happens to be an already
    +   * bound lambda function then we assume it has been bound to the correct arguments and do
    +   * nothing. This function will produce a lambda function with hidden arguments when it is passed
    +   * an arbitrary expression.
    +   */
    +  private def createLambda(
    +      e: Expression,
    +      partialArguments: Seq[(DataType, Boolean)]): LambdaFunction = e match {
    --- End diff --
    
    They are partial because we only pass the dataType and nullable flag.
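    
    As a concrete illustration (hypothetical values, not from the patch): for `transform(array(1, 2, 3), x -> x + 1)`, `bind` would be handed only the element type and nullability of the input array, something like:
    
    ```scala
    import org.apache.spark.sql.types._
    
    // Hypothetical shape of the "partial arguments": only a (DataType, Boolean)
    // pair per lambda argument -- no names, since the names come from the
    // lambda function itself during binding.
    val partialArguments: Seq[(DataType, Boolean)] = Seq((IntegerType, false))
    ```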


---



[GitHub] spark pull request #21954: [SPARK-23908][SQL] Add transform function.

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21954#discussion_r208282782
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/higherOrderFunctions.scala ---
    @@ -0,0 +1,166 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.catalyst.analysis
    +
    +import org.apache.spark.sql.catalyst.catalog.SessionCatalog
    +import org.apache.spark.sql.catalyst.expressions._
    +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
    +import org.apache.spark.sql.catalyst.rules.Rule
    +import org.apache.spark.sql.internal.SQLConf
    +import org.apache.spark.sql.types.DataType
    +
    +/**
    + * Resolve higher order functions from the catalog. This is different from regular function
    + * resolution because lambda functions can only be resolved after the function itself has been
    + * resolved, so we need to resolve a higher order function when all of its children are either
    + * resolved or lambda functions.
    + */
    +case class ResolveHigherOrderFunctions(catalog: SessionCatalog) extends Rule[LogicalPlan] {
    +
    +  override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperators {
    +    case q: LogicalPlan =>
    +      q.transformExpressions {
    +        case u @ UnresolvedFunction(fn, children, false)
    +            if hasLambdaAndResolvedArguments(children) =>
    +          withPosition(u) {
    +            catalog.lookupFunction(fn, children) match {
    +              case func: HigherOrderFunction => func
    +              case other => other.failAnalysis(
    +                "A lambda function should only be used in a higher order function. However, " +
    +                  s"its class is ${other.getClass.getCanonicalName}, which is not a " +
    +                  s"higher order function.")
    +            }
    +          }
    +      }
    +  }
    +
    +  /**
    +   * Check if the arguments of a function are either resolved or a lambda function.
    +   */
    +  private def hasLambdaAndResolvedArguments(expressions: Seq[Expression]): Boolean = {
    +    val (lambdas, others) = expressions.partition(_.isInstanceOf[LambdaFunction])
    +    lambdas.nonEmpty && others.forall(_.resolved)
    +  }
    +}
    +
    +/**
    + * Resolve the lambda variables exposed by higher order functions.
    + *
    + * This rule works in two steps:
    + * [1]. Bind the anonymous variables exposed by the higher order function to the lambda function's
    + *      arguments; this creates named and typed lambda variables. The argument names are checked
    + *      for duplicates and the number of arguments is checked during this step.
    + * [2]. Resolve the lambda variables used in the lambda function's expression tree. Note that
    + *      we allow the use of variables from outside the current lambda; these can either be
    + *      variables from a lambda function defined in an outer scope, or attributes produced by
    + *      the plan's child. If names are duplicated, the name defined in the innermost scope is
    + *      used.
    + */
    +case class ResolveLambdaVariables(conf: SQLConf) extends Rule[LogicalPlan] {
    +
    +  type LambdaVariableMap = Map[String, NamedExpression]
    +
    +  private val canonicalizer = {
    +    if (!conf.caseSensitiveAnalysis) {
    +      s: String => s.toLowerCase
    +    } else {
    +      s: String => s
    +    }
    +  }
    +
    +  override def apply(plan: LogicalPlan): LogicalPlan = {
    +    plan.resolveOperators {
    +      case q: LogicalPlan =>
    +        q.mapExpressions(resolve(_, Map.empty))
    +    }
    +  }
    +
    +  /**
    +   * Create a bound lambda function by binding the arguments of a lambda function to the given
    +   * partial arguments (dataType and nullability only). If the expression happens to be an already
    +   * bound lambda function then we assume it has been bound to the correct arguments and do
    +   * nothing. This function will produce a lambda function with hidden arguments when it is passed
    +   * an arbitrary expression.
    +   */
    +  private def createLambda(
    +      e: Expression,
    +      partialArguments: Seq[(DataType, Boolean)]): LambdaFunction = e match {
    --- End diff --
    
    how about `argInfo`?


---



[GitHub] spark issue #21954: [SPARK-23908][SQL] Add transform function.

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21954
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #21954: [SPARK-23908][SQL] Add transform function.

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21954
  
    **[Test build #94002 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94002/testReport)** for PR 21954 at commit [`c3bf6a0`](https://github.com/apache/spark/commit/c3bf6a0059a151ba23cf32c842e31ced3b28726c).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `case class ResolveHigherOrderFunctions(catalog: SessionCatalog) extends Rule[LogicalPlan] `
      * `case class ResolveLambdaVariables(conf: SQLConf) extends Rule[LogicalPlan] `
      * `case class NamedLambdaVariable(`
      * `case class LambdaFunction(`
      * `trait HigherOrderFunction extends Expression `
      * `trait ArrayBasedHigherOrderFunction extends HigherOrderFunction with ExpectsInputTypes `
      * `case class ArrayTransform(`


---



[GitHub] spark pull request #21954: [SPARK-23908][SQL] Add transform function.

Posted by hvanhovell <gi...@git.apache.org>.
Github user hvanhovell commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21954#discussion_r207145478
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala ---
    @@ -0,0 +1,212 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.catalyst.expressions
    +
    +import java.util.concurrent.atomic.AtomicReference
    +
    +import org.apache.spark.sql.catalyst.InternalRow
    +import org.apache.spark.sql.catalyst.expressions.codegen._
    +import org.apache.spark.sql.catalyst.expressions.codegen.Block._
    +import org.apache.spark.sql.catalyst.util.{ArrayData, GenericArrayData}
    +import org.apache.spark.sql.types._
    +
    +/**
    + * A named lambda variable.
    + */
    +case class NamedLambdaVariable(
    +    name: String,
    +    dataType: DataType,
    +    nullable: Boolean,
    +    value: AtomicReference[Any] = new AtomicReference(),
    --- End diff --
    
    You are only using the `AtomicReference` as a container, right?
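    
    (For context, the set-then-eval pattern under discussion looks roughly like the following; this is a standalone sketch of the idea, not the exact patch code.)
    
    ```scala
    import java.util.concurrent.atomic.AtomicReference
    
    // Standalone sketch: the lambda variable is a mutable slot that the higher
    // order function fills in before evaluating the lambda body per element.
    val slot = new AtomicReference[Any]()
    val results = Seq(1, 2, 3).map { e =>
      slot.set(e)                     // bind the variable to the current element
      slot.get.asInstanceOf[Int] + 1  // evaluate the lambda body (x -> x + 1)
    }
    assert(results == Seq(2, 3, 4))
    ```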


---



[GitHub] spark issue #21954: [SPARK-23908][SQL] Add transform function.

Posted by gatorsmile <gi...@git.apache.org>.
Github user gatorsmile commented on the issue:

    https://github.com/apache/spark/pull/21954
  
    retest this please


---



[GitHub] spark pull request #21954: [SPARK-23908][SQL] Add transform function.

Posted by hvanhovell <gi...@git.apache.org>.
Github user hvanhovell commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21954#discussion_r207158636
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala ---
    @@ -0,0 +1,212 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.catalyst.expressions
    +
    +import java.util.concurrent.atomic.AtomicReference
    +
    +import org.apache.spark.sql.catalyst.InternalRow
    +import org.apache.spark.sql.catalyst.expressions.codegen._
    +import org.apache.spark.sql.catalyst.expressions.codegen.Block._
    +import org.apache.spark.sql.catalyst.util.{ArrayData, GenericArrayData}
    +import org.apache.spark.sql.types._
    +
    +/**
    + * A named lambda variable.
    + */
    +case class NamedLambdaVariable(
    +    name: String,
    +    dataType: DataType,
    +    nullable: Boolean,
    +    value: AtomicReference[Any] = new AtomicReference(),
    --- End diff --
    
    You did? Could you elaborate? There shouldn't be any concurrent access here.


---



[GitHub] spark pull request #21954: [SPARK-23908][SQL] Add transform function.

Posted by ueshin <gi...@git.apache.org>.
Github user ueshin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21954#discussion_r208282941
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/higherOrderFunctions.scala ---
    @@ -0,0 +1,166 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.catalyst.analysis
    +
    +import org.apache.spark.sql.catalyst.catalog.SessionCatalog
    +import org.apache.spark.sql.catalyst.expressions._
    +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
    +import org.apache.spark.sql.catalyst.rules.Rule
    +import org.apache.spark.sql.internal.SQLConf
    +import org.apache.spark.sql.types.DataType
    +
    +/**
    + * Resolve higher order functions from the catalog. This is different from regular function
    + * resolution because lambda functions can only be resolved after the function itself has been
    + * resolved, so we need to resolve a higher order function when all of its children are either
    + * resolved or lambda functions.
    + */
    +case class ResolveHigherOrderFunctions(catalog: SessionCatalog) extends Rule[LogicalPlan] {
    +
    +  override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperators {
    +    case q: LogicalPlan =>
    +      q.transformExpressions {
    +        case u @ UnresolvedFunction(fn, children, false)
    +            if hasLambdaAndResolvedArguments(children) =>
    +          withPosition(u) {
    +            catalog.lookupFunction(fn, children) match {
    +              case func: HigherOrderFunction => func
    +              case other => other.failAnalysis(
    +                "A lambda function should only be used in a higher order function. However, " +
    +                  s"its class is ${other.getClass.getCanonicalName}, which is not a " +
    +                  s"higher order function.")
    +            }
    +          }
    +      }
    +  }
    +
    +  /**
    +   * Check if the arguments of a function are either resolved or a lambda function.
    +   */
    +  private def hasLambdaAndResolvedArguments(expressions: Seq[Expression]): Boolean = {
    +    val (lambdas, others) = expressions.partition(_.isInstanceOf[LambdaFunction])
    +    lambdas.nonEmpty && others.forall(_.resolved)
    +  }
    +}
    +
    +/**
    + * Resolve the lambda variables exposed by higher order functions.
    + *
    + * This rule works in two steps:
    + * [1]. Bind the anonymous variables exposed by the higher order function to the lambda function's
    + *      arguments; this creates named and typed lambda variables. The argument names are checked
    + *      for duplicates and the number of arguments is checked during this step.
    + * [2]. Resolve the lambda variables used in the lambda function's expression tree. Note that
    + *      we allow the use of variables from outside the current lambda; these can either be
    + *      variables from a lambda function defined in an outer scope, or attributes produced by
    + *      the plan's child. If names are duplicated, the name defined in the innermost scope is
    + *      used.
    + */
    +case class ResolveLambdaVariables(conf: SQLConf) extends Rule[LogicalPlan] {
    +
    +  type LambdaVariableMap = Map[String, NamedExpression]
    +
    +  private val canonicalizer = {
    +    if (!conf.caseSensitiveAnalysis) {
    +      s: String => s.toLowerCase
    +    } else {
    +      s: String => s
    +    }
    +  }
    +
    +  override def apply(plan: LogicalPlan): LogicalPlan = {
    +    plan.resolveOperators {
    +      case q: LogicalPlan =>
    +        q.mapExpressions(resolve(_, Map.empty))
    +    }
    +  }
    +
    +  /**
    +   * Create a bound lambda function by binding the arguments of a lambda function to the given
    +   * partial arguments (dataType and nullability only). If the expression happens to be an already
    +   * bound lambda function then we assume it has been bound to the correct arguments and do
    +   * nothing. This function will produce a lambda function with hidden arguments when it is passed
    +   * an arbitrary expression.
    +   */
    +  private def createLambda(
    +      e: Expression,
    +      partialArguments: Seq[(DataType, Boolean)]): LambdaFunction = e match {
    +    case f: LambdaFunction if f.bound => f
    +
    +    case LambdaFunction(function, names, _) =>
    +      if (names.size != partialArguments.size) {
    +        e.failAnalysis(
    +          s"The number of lambda function arguments '${names.size}' does not " +
    +            "match the number of arguments expected by the higher order function " +
    +            s"'${partialArguments.size}'.")
    +      }
    +
    +      if (names.map(a => canonicalizer(a.name)).distinct.size < names.size) {
    +        e.failAnalysis(
    +          "Lambda function arguments should not have names that are semantically the same.")
    +      }
    +
    +      val arguments = partialArguments.zip(names).map {
    +        case ((dataType, nullable), ne) =>
    +          NamedLambdaVariable(ne.name, dataType, nullable)
    +      }
    +      LambdaFunction(function, arguments)
    +
    +    case _ =>
    +      // This expression does not consume any of the lambda's arguments (it is independent). We do
    +      // create a lambda function with default parameters because this is expected by the higher
    +      // order function. Note that we hide the lambda variables produced by this function in order
    +      // to prevent accidental naming collisions.
    +      val arguments = partialArguments.zipWithIndex.map {
    +        case ((dataType, nullable), i) =>
    +          NamedLambdaVariable(s"col$i", dataType, nullable)
    +      }
    +      LambdaFunction(e, arguments, hidden = true)
    +  }
    +
    +  /**
    +   * Resolve lambda variables in the expression subtree, using the passed lambda variable registry.
    +   */
    +  private def resolve(e: Expression, parentLambdaMap: LambdaVariableMap): Expression = e match {
    +    case _ if e.resolved => e
    +
    +    case h: HigherOrderFunction if h.inputResolved =>
    --- End diff --
    
    Let me think about it later.


---



[GitHub] spark pull request #21954: [SPARK-23908][SQL] Add transform function.

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21954#discussion_r208276367
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/higherOrderFunctions.scala ---
    @@ -0,0 +1,166 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.catalyst.analysis
    +
    +import org.apache.spark.sql.catalyst.catalog.SessionCatalog
    +import org.apache.spark.sql.catalyst.expressions._
    +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
    +import org.apache.spark.sql.catalyst.rules.Rule
    +import org.apache.spark.sql.internal.SQLConf
    +import org.apache.spark.sql.types.DataType
    +
    +/**
    + * Resolve higher order functions from the catalog. This is different from regular function
    + * resolution because lambda functions can only be resolved after the function itself has been
    + * resolved, so we need to resolve a higher order function when all of its children are either
    + * resolved or lambda functions.
    + */
    +case class ResolveHigherOrderFunctions(catalog: SessionCatalog) extends Rule[LogicalPlan] {
    +
    +  override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperators {
    +    case q: LogicalPlan =>
    +      q.transformExpressions {
    +        case u @ UnresolvedFunction(fn, children, false)
    +            if hasLambdaAndResolvedArguments(children) =>
    +          withPosition(u) {
    +            catalog.lookupFunction(fn, children) match {
    +              case func: HigherOrderFunction => func
    +              case other => other.failAnalysis(
    +                "A lambda function should only be used in a higher order function. However, " +
    +                  s"its class is ${other.getClass.getCanonicalName}, which is not a " +
    +                  s"higher order function.")
    +            }
    +          }
    +      }
    +  }
    +
    +  /**
    +   * Check if the arguments of a function are either resolved or a lambda function.
    +   */
    +  private def hasLambdaAndResolvedArguments(expressions: Seq[Expression]): Boolean = {
    +    val (lambdas, others) = expressions.partition(_.isInstanceOf[LambdaFunction])
    +    lambdas.nonEmpty && others.forall(_.resolved)
    +  }
    +}
    +
    +/**
    + * Resolve the lambda variables exposed by higher order functions.
    + *
    + * This rule works in two steps:
    + * [1]. Bind the anonymous variables exposed by the higher order function to the lambda function's
    + *      arguments; this creates named and typed lambda variables. The argument names are checked
    + *      for duplicates and the number of arguments is checked during this step.
    + * [2]. Resolve the lambda variables used in the lambda function's expression tree. Note that
    + *      we allow the use of variables from outside the current lambda; these can either be
    + *      variables from a lambda function defined in an outer scope, or attributes produced by
    + *      the plan's child. If names are duplicated, the name defined in the innermost scope is
    + *      used.
    + */
    +case class ResolveLambdaVariables(conf: SQLConf) extends Rule[LogicalPlan] {
    +
    +  type LambdaVariableMap = Map[String, NamedExpression]
    +
    +  private val canonicalizer = {
    +    if (!conf.caseSensitiveAnalysis) {
    +      s: String => s.toLowerCase
    +    } else {
    +      s: String => s
    +    }
    +  }
    +
    +  override def apply(plan: LogicalPlan): LogicalPlan = {
    +    plan.resolveOperators {
    +      case q: LogicalPlan =>
    +        q.mapExpressions(resolve(_, Map.empty))
    +    }
    +  }
    +
    +  /**
    +   * Create a bound lambda function by binding the arguments of a lambda function to the given
    +   * partial arguments (dataType and nullability only). If the expression happens to be an already
    +   * bound lambda function then we assume it has been bound to the correct arguments and do
    +   * nothing. This function will produce a lambda function with hidden arguments when it is passed
    +   * an arbitrary expression.
    +   */
    +  private def createLambda(
    +      e: Expression,
    +      partialArguments: Seq[(DataType, Boolean)]): LambdaFunction = e match {
    +    case f: LambdaFunction if f.bound => f
    +
    +    case LambdaFunction(function, names, _) =>
    +      if (names.size != partialArguments.size) {
    +        e.failAnalysis(
    +          s"The number of lambda function arguments '${names.size}' does not " +
    +            "match the number of arguments expected by the higher order function " +
    +            s"'${partialArguments.size}'.")
    +      }
    +
    +      if (names.map(a => canonicalizer(a.name)).distinct.size < names.size) {
    +        e.failAnalysis(
    +          "Lambda function arguments should not have names that are semantically the same.")
    +      }
    +
    +      val arguments = partialArguments.zip(names).map {
    +        case ((dataType, nullable), ne) =>
    +          NamedLambdaVariable(ne.name, dataType, nullable)
    +      }
    +      LambdaFunction(function, arguments)
    +
    +    case _ =>
    +      // This expression does not consume any of the lambda's arguments (it is independent). We do
    +      // create a lambda function with default parameters because this is expected by the higher
    +      // order function. Note that we hide the lambda variables produced by this function in order
    +      // to prevent accidental naming collisions.
    +      val arguments = partialArguments.zipWithIndex.map {
    +        case ((dataType, nullable), i) =>
    +          NamedLambdaVariable(s"col$i", dataType, nullable)
    +      }
    +      LambdaFunction(e, arguments, hidden = true)
    +  }
    +
    +  /**
    +   * Resolve lambda variables in the expression subtree, using the passed lambda variable registry.
    +   */
    +  private def resolve(e: Expression, parentLambdaMap: LambdaVariableMap): Expression = e match {
    +    case _ if e.resolved => e
    +
    +    case h: HigherOrderFunction if h.inputResolved =>
    --- End diff --
    
    Can we add some basic type checks here? Then we can fail fast if `ArrayTransform#input` is not an array type, and we don't need the hacky workaround in `ArrayTransform#bind`.
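    
    (A rough sketch of such a fail-fast check, written as a hypothetical standalone helper rather than the actual rule code:)
    
    ```scala
    import org.apache.spark.sql.catalyst.expressions.Expression
    import org.apache.spark.sql.types.ArrayType
    
    // Hypothetical helper (not part of the patch): verify the first argument
    // resolves to an array type before attempting to bind the lambda function.
    def checkArrayInput(input: Expression): Unit = input.dataType match {
      case _: ArrayType => // ok, safe to go into bind
      case other => throw new IllegalArgumentException(
        s"transform expects an array as its first argument, got ${other.simpleString}")
    }
    ```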


---



[GitHub] spark issue #21954: [SPARK-23908][SQL] Add transform function.

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21954
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/1633/
    Test PASSed.


---



[GitHub] spark issue #21954: [SPARK-23908][SQL] Add transform function.

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21954
  
    Merged build finished. Test FAILed.


---



[GitHub] spark issue #21954: [SPARK-23908][SQL] Add transform function.

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21954
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/93934/
    Test FAILed.


---



[GitHub] spark issue #21954: [SPARK-23908][SQL] Add transform function.

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21954
  
    **[Test build #94019 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94019/testReport)** for PR 21954 at commit [`c3bf6a0`](https://github.com/apache/spark/commit/c3bf6a0059a151ba23cf32c842e31ced3b28726c).


---



[GitHub] spark issue #21954: [SPARK-23908][SQL] Add transform function.

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21954
  
    **[Test build #93973 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/93973/testReport)** for PR 21954 at commit [`c3bf6a0`](https://github.com/apache/spark/commit/c3bf6a0059a151ba23cf32c842e31ced3b28726c).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `case class ResolveHigherOrderFunctions(catalog: SessionCatalog) extends Rule[LogicalPlan] `
      * `case class ResolveLambdaVariables(conf: SQLConf) extends Rule[LogicalPlan] `
      * `case class NamedLambdaVariable(`
      * `case class LambdaFunction(`
      * `trait HigherOrderFunction extends Expression `
      * `trait ArrayBasedHigherOrderFunction extends HigherOrderFunction with ExpectsInputTypes `
      * `case class ArrayTransform(`


---



[GitHub] spark pull request #21954: [SPARK-23908][SQL] Add transform function.

Posted by ueshin <gi...@git.apache.org>.
Github user ueshin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21954#discussion_r207162371
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala ---
    @@ -0,0 +1,212 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.catalyst.expressions
    +
    +import java.util.concurrent.atomic.AtomicReference
    +
    +import org.apache.spark.sql.catalyst.InternalRow
    +import org.apache.spark.sql.catalyst.expressions.codegen._
    +import org.apache.spark.sql.catalyst.expressions.codegen.Block._
    +import org.apache.spark.sql.catalyst.util.{ArrayData, GenericArrayData}
    +import org.apache.spark.sql.types._
    +
    +/**
    + * A named lambda variable.
    + */
    +case class NamedLambdaVariable(
    +    name: String,
    +    dataType: DataType,
    +    nullable: Boolean,
    +    value: AtomicReference[Any] = new AtomicReference(),
    --- End diff --
    
    Ah, maybe I should override `fastEquals` instead of using `AtomicReference`?


---



[GitHub] spark pull request #21954: [SPARK-23908][SQL] Add transform function.

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21954#discussion_r207098029
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala ---
    @@ -0,0 +1,325 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.catalyst.expressions
    +
    +import java.util.concurrent.atomic.AtomicReference
    +
    +import org.apache.spark.sql.catalyst.InternalRow
    +import org.apache.spark.sql.catalyst.expressions.codegen._
    +import org.apache.spark.sql.catalyst.expressions.codegen.Block._
    +import org.apache.spark.sql.catalyst.util.{ArrayData, GenericArrayData}
    +import org.apache.spark.sql.types._
    +
    +/**
    + * A named lambda variable.
    + */
    +case class NamedLambdaVariable(
    +    name: String,
    +    dataType: DataType,
    +    nullable: Boolean,
    +    value: AtomicReference[Any] = new AtomicReference(),
    +    exprId: ExprId = NamedExpression.newExprId)
    +  extends LeafExpression
    +  with NamedExpression {
    +
    +  override def qualifier: Option[String] = None
    +
    +  override def newInstance(): NamedExpression =
    +    copy(value = new AtomicReference(), exprId = NamedExpression.newExprId)
    +
    +  override def toAttribute: Attribute = {
    +    AttributeReference(name, dataType, nullable, Metadata.empty)(exprId, None)
    +  }
    +
    +  override def eval(input: InternalRow): Any = value.get
    +
    +  override def genCode(ctx: CodegenContext): ExprCode = {
    +    val suffix = "_lambda_variable_" + exprId.id
    +    ExprCode(
    +      if (nullable) JavaCode.isNullVariable(s"isNull_${name}$suffix") else FalseLiteral,
    +      JavaCode.variable(s"value_${name}$suffix", dataType))
    +  }
    +
    +  override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
    +    throw new IllegalStateException("NamedLambdaVariable.doGenCode should not be called.")
    +  }
    +
    +  override def toString: String = s"lambda $name#${exprId.id}$typeSuffix"
    +
    +  override def simpleString: String = s"lambda $name#${exprId.id}: ${dataType.simpleString}"
    +}
    +
    +/**
    + * A lambda function and its arguments. A lambda function can be hidden when a user wants to
    + * process a completely independent expression in a [[HigherOrderFunction]]; the lambda function
    + * and its variables are then only used for internal bookkeeping within the higher order function.
    + */
    +case class LambdaFunction(
    +    function: Expression,
    +    arguments: Seq[NamedExpression],
    +    hidden: Boolean = false)
    +  extends Expression {
    +
    +  override def children: Seq[Expression] = function +: arguments
    +  override def dataType: DataType = function.dataType
    +  override def nullable: Boolean = function.nullable
    +
    +  lazy val bound: Boolean = arguments.forall(_.resolved)
    +
    +  override def eval(input: InternalRow): Any = function.eval(input)
    +
    +  override def genCode(ctx: CodegenContext): ExprCode = {
    +    function.genCode(ctx)
    +  }
    +
    +  override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
    +    throw new IllegalStateException("LambdaFunction.doGenCode should not be called.")
    +  }
    +}
    +
    +/**
    + * A higher order function takes one or more (lambda) functions and applies these to some objects.
    + * The function produces a number of variables which can be consumed by some lambda function.
    + */
    +trait HigherOrderFunction extends Expression {
    +
    +  override def children: Seq[Expression] = inputs ++ functions
    +
    +  /**
    +   * Inputs to the higher order function.
    +   */
    +  def inputs: Seq[Expression]
    +
    +  /**
    +   * All inputs have been resolved. This means that the types and nullability of (most of) the
    +   * lambda function arguments are known, and that we can start binding the lambda functions.
    +   */
    +  lazy val inputResolved: Boolean = inputs.forall(_.resolved)
    +
    +  /**
    +   * Functions applied by the higher order function.
    +   */
    +  def functions: Seq[Expression]
    +
    +  /**
    +   * All inputs must be resolved and all functions must be resolved lambda functions.
    +   */
    +  override lazy val resolved: Boolean = inputResolved && functions.forall {
    +    case l: LambdaFunction => l.resolved
    +    case _ => false
    +  }
    +
    +  /**
    +   * Bind the lambda functions to the [[HigherOrderFunction]] using the given bind function. The
    +   * bind function takes the potential lambda and its (partial) arguments and converts this into
    +   * a bound lambda function.
    +   */
    +  def bind(f: (Expression, Seq[(DataType, Boolean)]) => LambdaFunction): HigherOrderFunction
    +
    +  @transient lazy val functionsForEval: Seq[Expression] = functions.map {
    +    case LambdaFunction(function, arguments, hidden) =>
    +      val argumentMap = arguments.map { arg => arg.exprId -> arg }.toMap
    +      function.transformUp {
    --- End diff --
    
    Why do we need to transform the `NamedLambdaVariable`s in `function` using `arguments` here? Aren't `arguments` also `NamedLambdaVariable`s, and don't we already resolve the expressions in `function` in `ResolveLambdaVariables`?


---



[GitHub] spark issue #21954: [SPARK-23908][SQL] Add transform function.

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21954
  
    **[Test build #93973 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/93973/testReport)** for PR 21954 at commit [`c3bf6a0`](https://github.com/apache/spark/commit/c3bf6a0059a151ba23cf32c842e31ced3b28726c).


---



[GitHub] spark pull request #21954: [SPARK-23908][SQL] Add transform function.

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21954#discussion_r208267632
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala ---
    @@ -0,0 +1,212 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.catalyst.expressions
    +
    +import java.util.concurrent.atomic.AtomicReference
    +
    +import org.apache.spark.sql.catalyst.InternalRow
    +import org.apache.spark.sql.catalyst.expressions.codegen._
    +import org.apache.spark.sql.catalyst.expressions.codegen.Block._
    +import org.apache.spark.sql.catalyst.util.{ArrayData, GenericArrayData}
    +import org.apache.spark.sql.types._
    +
    +/**
    + * A named lambda variable.
    + */
    +case class NamedLambdaVariable(
    +    name: String,
    +    dataType: DataType,
    +    nullable: Boolean,
    +    value: AtomicReference[Any] = new AtomicReference(),
    +    exprId: ExprId = NamedExpression.newExprId)
    +  extends LeafExpression
    +  with NamedExpression
    +  with CodegenFallback {
    +
    +  override def qualifier: Option[String] = None
    +
    +  override def newInstance(): NamedExpression =
    +    copy(value = new AtomicReference(), exprId = NamedExpression.newExprId)
    +
    +  override def toAttribute: Attribute = {
    +    AttributeReference(name, dataType, nullable, Metadata.empty)(exprId, None)
    +  }
    +
    +  override def eval(input: InternalRow): Any = value.get
    +
    +  override def toString: String = s"lambda $name#${exprId.id}$typeSuffix"
    +
    +  override def simpleString: String = s"lambda $name#${exprId.id}: ${dataType.simpleString}"
    +}
    +
    +/**
    + * A lambda function and its arguments. A lambda function can be hidden when a user wants to
    + * process a completely independent expression in a [[HigherOrderFunction]]; the lambda function
    + * and its variables are then only used for internal bookkeeping within the higher order function.
    + */
    +case class LambdaFunction(
    +    function: Expression,
    +    arguments: Seq[NamedExpression],
    +    hidden: Boolean = false)
    +  extends Expression with CodegenFallback {
    +
    +  override def children: Seq[Expression] = function +: arguments
    +  override def dataType: DataType = function.dataType
    +  override def nullable: Boolean = function.nullable
    +
    +  lazy val bound: Boolean = arguments.forall(_.resolved)
    +
    +  override def eval(input: InternalRow): Any = function.eval(input)
    +}
    +
    +/**
    + * A higher order function takes one or more (lambda) functions and applies these to some objects.
    + * The function produces a number of variables which can be consumed by some lambda function.
    + */
    +trait HigherOrderFunction extends Expression {
    +
    +  override def children: Seq[Expression] = inputs ++ functions
    +
    +  /**
    +   * Inputs to the higher order function.
    +   */
    +  def inputs: Seq[Expression]
    +
    +  /**
    +   * All inputs have been resolved. This means that the types and nullability of (most of) the
    +   * lambda function arguments are known, and that we can start binding the lambda functions.
    +   */
    +  lazy val inputResolved: Boolean = inputs.forall(_.resolved)
    +
    +  /**
    +   * Functions applied by the higher order function.
    +   */
    +  def functions: Seq[Expression]
    --- End diff --
    
    It seems `function` must be a `LambdaFunction`; why don't we enforce it at the type level?
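    
    Just to sketch what I mean (signatures only, reusing the names in this patch):
    
        // Hypothetical tightening: once every function child is known to be a
        // LambdaFunction, declare it as such so the compiler enforces it.
        def functions: Seq[LambdaFunction]
    
        // and, in ArrayBasedHigherOrderFunction:
        def function: LambdaFunction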


---



[GitHub] spark pull request #21954: [SPARK-23908][SQL] Add transform function.

Posted by ueshin <gi...@git.apache.org>.
Github user ueshin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21954#discussion_r207102738
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala ---
    @@ -0,0 +1,325 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.catalyst.expressions
    +
    +import java.util.concurrent.atomic.AtomicReference
    +
    +import org.apache.spark.sql.catalyst.InternalRow
    +import org.apache.spark.sql.catalyst.expressions.codegen._
    +import org.apache.spark.sql.catalyst.expressions.codegen.Block._
    +import org.apache.spark.sql.catalyst.util.{ArrayData, GenericArrayData}
    +import org.apache.spark.sql.types._
    +
    +/**
    + * A named lambda variable.
    + */
    +case class NamedLambdaVariable(
    +    name: String,
    +    dataType: DataType,
    +    nullable: Boolean,
    +    value: AtomicReference[Any] = new AtomicReference(),
    +    exprId: ExprId = NamedExpression.newExprId)
    +  extends LeafExpression
    +  with NamedExpression {
    +
    +  override def qualifier: Option[String] = None
    +
    +  override def newInstance(): NamedExpression =
    +    copy(value = new AtomicReference(), exprId = NamedExpression.newExprId)
    +
    +  override def toAttribute: Attribute = {
    +    AttributeReference(name, dataType, nullable, Metadata.empty)(exprId, None)
    +  }
    +
    +  override def eval(input: InternalRow): Any = value.get
    +
    +  override def genCode(ctx: CodegenContext): ExprCode = {
    +    val suffix = "_lambda_variable_" + exprId.id
    +    ExprCode(
    +      if (nullable) JavaCode.isNullVariable(s"isNull_${name}$suffix") else FalseLiteral,
    +      JavaCode.variable(s"value_${name}$suffix", dataType))
    +  }
    +
    +  override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
    +    throw new IllegalStateException("NamedLambdaVariable.doGenCode should not be called.")
    +  }
    +
    +  override def toString: String = s"lambda $name#${exprId.id}$typeSuffix"
    +
    +  override def simpleString: String = s"lambda $name#${exprId.id}: ${dataType.simpleString}"
    +}
    +
    +/**
    + * A lambda function and its arguments. A lambda function can be hidden when a user wants to
    + * process a completely independent expression in a [[HigherOrderFunction]]; the lambda function
    + * and its variables are then only used for internal bookkeeping within the higher order function.
    + */
    +case class LambdaFunction(
    +    function: Expression,
    +    arguments: Seq[NamedExpression],
    +    hidden: Boolean = false)
    +  extends Expression {
    +
    +  override def children: Seq[Expression] = function +: arguments
    +  override def dataType: DataType = function.dataType
    +  override def nullable: Boolean = function.nullable
    +
    +  lazy val bound: Boolean = arguments.forall(_.resolved)
    +
    +  override def eval(input: InternalRow): Any = function.eval(input)
    +
    +  override def genCode(ctx: CodegenContext): ExprCode = {
    +    function.genCode(ctx)
    +  }
    +
    +  override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
    +    throw new IllegalStateException("LambdaFunction.doGenCode should not be called.")
    +  }
    +}
    +
    +/**
    + * A higher order function takes one or more (lambda) functions and applies these to some objects.
    + * The function produces a number of variables which can be consumed by some lambda function.
    + */
    +trait HigherOrderFunction extends Expression {
    +
    +  override def children: Seq[Expression] = inputs ++ functions
    +
    +  /**
    +   * Inputs to the higher order function.
    +   */
    +  def inputs: Seq[Expression]
    +
    +  /**
    +   * All inputs have been resolved. This means that the types and nullability of (most of) the
    +   * lambda function arguments are known, and that we can start binding the lambda functions.
    +   */
    +  lazy val inputResolved: Boolean = inputs.forall(_.resolved)
    +
    +  /**
    +   * Functions applied by the higher order function.
    +   */
    +  def functions: Seq[Expression]
    +
    +  /**
    +   * All inputs must be resolved and all functions must be resolved lambda functions.
    +   */
    +  override lazy val resolved: Boolean = inputResolved && functions.forall {
    +    case l: LambdaFunction => l.resolved
    +    case _ => false
    +  }
    +
    +  /**
    +   * Bind the lambda functions to the [[HigherOrderFunction]] using the given bind function. The
    +   * bind function takes the potential lambda and its (partial) arguments and converts this into
    +   * a bound lambda function.
    +   */
    +  def bind(f: (Expression, Seq[(DataType, Boolean)]) => LambdaFunction): HigherOrderFunction
    +
    +  @transient lazy val functionsForEval: Seq[Expression] = functions.map {
    +    case LambdaFunction(function, arguments, hidden) =>
    +      val argumentMap = arguments.map { arg => arg.exprId -> arg }.toMap
    +      function.transformUp {
    --- End diff --
    
    I'm worried that the `NamedLambdaVariable` is instantiated separately during serialization or something. In that case, we might not be able to refer to the same instance and set the argument values correctly.
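    
    To illustrate the concern, the eval-time contract is roughly this (sketch only, simplified from `ArrayTransform.eval` in this patch): the higher-order function writes through the variable's `AtomicReference` and then evaluates the lambda body, so the variable inside the body must be the very same instance as the one held in `arguments`:
    
        // Sketch: bind the current element through the shared AtomicReference,
        // then evaluate the body, which reads elementVar.value.get.
        elementVar.value.set(arr.get(i, elementVar.dataType))
        result.update(i, functionForEval.eval(inputRow))
    
    If deserialization produced a fresh copy of the variable in the body but not of the one in `arguments`, the `set` above would write to a different `AtomicReference` than the one the body reads.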


---



[GitHub] spark issue #21954: [SPARK-23908][SQL] Add transform function.

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21954
  
    Merged build finished. Test FAILed.


---



[GitHub] spark pull request #21954: [SPARK-23908][SQL] Add transform function.

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/spark/pull/21954


---



[GitHub] spark issue #21954: [SPARK-23908][SQL] Add transform function.

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21954
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/93950/
    Test FAILed.


---



[GitHub] spark issue #21954: [SPARK-23908][SQL] Add transform function.

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21954
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/1603/
    Test PASSed.


---



[GitHub] spark issue #21954: [SPARK-23908][SQL] Add transform function.

Posted by ueshin <gi...@git.apache.org>.
Github user ueshin commented on the issue:

    https://github.com/apache/spark/pull/21954
  
    Jenkins, retest this please.


---



[GitHub] spark pull request #21954: [SPARK-23908][SQL] Add transform function.

Posted by ueshin <gi...@git.apache.org>.
Github user ueshin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21954#discussion_r207148555
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala ---
    @@ -0,0 +1,212 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.catalyst.expressions
    +
    +import java.util.concurrent.atomic.AtomicReference
    +
    +import org.apache.spark.sql.catalyst.InternalRow
    +import org.apache.spark.sql.catalyst.expressions.codegen._
    +import org.apache.spark.sql.catalyst.expressions.codegen.Block._
    +import org.apache.spark.sql.catalyst.util.{ArrayData, GenericArrayData}
    +import org.apache.spark.sql.types._
    +
    +/**
    + * A named lambda variable.
    + */
    +case class NamedLambdaVariable(
    +    name: String,
    +    dataType: DataType,
    +    nullable: Boolean,
    +    value: AtomicReference[Any] = new AtomicReference(),
    --- End diff --
    
    Actually, it's also needed when creating `functionsForEval`. I needed it for `transformUp` to work properly.


---



[GitHub] spark issue #21954: [SPARK-23908][SQL] Add transform function.

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21954
  
    Merged build finished. Test FAILed.


---



[GitHub] spark pull request #21954: [SPARK-23908][SQL] Add transform function.

Posted by hvanhovell <gi...@git.apache.org>.
Github user hvanhovell commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21954#discussion_r207141916
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala ---
    @@ -0,0 +1,212 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.catalyst.expressions
    +
    +import java.util.concurrent.atomic.AtomicReference
    +
    +import org.apache.spark.sql.catalyst.InternalRow
    +import org.apache.spark.sql.catalyst.expressions.codegen._
    +import org.apache.spark.sql.catalyst.expressions.codegen.Block._
    +import org.apache.spark.sql.catalyst.util.{ArrayData, GenericArrayData}
    +import org.apache.spark.sql.types._
    +
    +/**
    + * A named lambda variable.
    + */
    +case class NamedLambdaVariable(
    +    name: String,
    +    dataType: DataType,
    +    nullable: Boolean,
    +    value: AtomicReference[Any] = new AtomicReference(),
    +    exprId: ExprId = NamedExpression.newExprId)
    +  extends LeafExpression
    +  with NamedExpression
    +  with CodegenFallback {
    +
    +  override def qualifier: Option[String] = None
    +
    +  override def newInstance(): NamedExpression =
    +    copy(value = new AtomicReference(), exprId = NamedExpression.newExprId)
    +
    +  override def toAttribute: Attribute = {
    +    AttributeReference(name, dataType, nullable, Metadata.empty)(exprId, None)
    +  }
    +
    +  override def eval(input: InternalRow): Any = value.get
    +
    +  override def toString: String = s"lambda $name#${exprId.id}$typeSuffix"
    +
    +  override def simpleString: String = s"lambda $name#${exprId.id}: ${dataType.simpleString}"
    +}
    +
    +/**
    + * A lambda function and its arguments. A lambda function can be hidden when a user wants to
    + * process a completely independent expression in a [[HigherOrderFunction]]; the lambda function
    + * and its variables are then only used for internal bookkeeping within the higher order function.
    + */
    +case class LambdaFunction(
    +    function: Expression,
    +    arguments: Seq[NamedExpression],
    +    hidden: Boolean = false)
    +  extends Expression with CodegenFallback {
    +
    +  override def children: Seq[Expression] = function +: arguments
    +  override def dataType: DataType = function.dataType
    +  override def nullable: Boolean = function.nullable
    +
    +  lazy val bound: Boolean = arguments.forall(_.resolved)
    +
    +  override def eval(input: InternalRow): Any = function.eval(input)
    +}
    +
    +/**
    + * A higher order function takes one or more (lambda) functions and applies these to some objects.
    + * The function produces a number of variables which can be consumed by some lambda function.
    + */
    +trait HigherOrderFunction extends Expression {
    +
    +  override def children: Seq[Expression] = inputs ++ functions
    +
    +  /**
    +   * Inputs to the higher order function.
    +   */
    +  def inputs: Seq[Expression]
    +
    +  /**
    +   * All inputs have been resolved. This means that the types and nullability of (most of) the
    +   * lambda function arguments are known, and that we can start binding the lambda functions.
    +   */
    +  lazy val inputResolved: Boolean = inputs.forall(_.resolved)
    +
    +  /**
    +   * Functions applied by the higher order function.
    +   */
    +  def functions: Seq[Expression]
    +
    +  /**
    +   * All inputs must be resolved and all functions must be resolved lambda functions.
    +   */
    +  override lazy val resolved: Boolean = inputResolved && functions.forall {
    +    case l: LambdaFunction => l.resolved
    +    case _ => false
    +  }
    +
    +  /**
    +   * Bind the lambda functions to the [[HigherOrderFunction]] using the given bind function. The
    +   * bind function takes the potential lambda and its (partial) arguments and converts this into
    +   * a bound lambda function.
    +   */
    +  def bind(f: (Expression, Seq[(DataType, Boolean)]) => LambdaFunction): HigherOrderFunction
    +
    +  @transient lazy val functionsForEval: Seq[Expression] = functions.map {
    +    case LambdaFunction(function, arguments, hidden) =>
    +      val argumentMap = arguments.map { arg => arg.exprId -> arg }.toMap
    +      function.transformUp {
    +        case variable: NamedLambdaVariable if argumentMap.contains(variable.exprId) =>
    +          argumentMap(variable.exprId)
    +      }
    +  }
    +}
    +
    +trait ArrayBasedHigherOrderFunction extends HigherOrderFunction with ExpectsInputTypes {
    +
    +  def input: Expression
    +
    +  override def inputs: Seq[Expression] = input :: Nil
    +
    +  def function: Expression
    +
    +  override def functions: Seq[Expression] = function :: Nil
    +
    +  def expectingFunctionType: AbstractDataType = AnyDataType
    +
    +  override def inputTypes: Seq[AbstractDataType] = Seq(ArrayType, expectingFunctionType)
    +
    +  @transient lazy val functionForEval: Expression = functionsForEval.head
    +}
    +
    +/**
    + * Transform elements in an array using the transform function. This is similar to
    + * a `map` in functional programming.
    + */
    +@ExpressionDescription(
    +  usage = "_FUNC_(expr, func) - Transforms elements in an array using the function.",
    +  examples = """
    +    Examples:
    +      > SELECT _FUNC_(array(1, 2, 3), x -> x + 1);
    +       array(2, 3, 4)
    +      > SELECT _FUNC_(array(1, 2, 3), (x, i) -> x + i);
    +       array(1, 3, 5)
    +  """,
    +  since = "2.4.0")
    +case class ArrayTransform(
    +    input: Expression,
    +    function: Expression)
    +  extends ArrayBasedHigherOrderFunction with CodegenFallback {
    +
    +  override def nullable: Boolean = input.nullable
    +
    +  override def dataType: ArrayType = ArrayType(function.dataType, function.nullable)
    +
    +  override def bind(f: (Expression, Seq[(DataType, Boolean)]) => LambdaFunction): ArrayTransform = {
    +    val (elementType, containsNull) = input.dataType match {
    +      case ArrayType(elementType, containsNull) => (elementType, containsNull)
    +      case _ =>
    +        val ArrayType(elementType, containsNull) = ArrayType.defaultConcreteType
    --- End diff --
    
    When does this happen?


---



[GitHub] spark pull request #21954: [SPARK-23908][SQL] Add transform function.

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21954#discussion_r208277492
  
    --- Diff: sql/core/src/test/resources/sql-tests/inputs/higher-order-functions.sql ---
    @@ -0,0 +1,26 @@
    +create or replace temporary view nested as values
    +  (1, array(32, 97), array(array(12, 99), array(123, 42), array(1))),
    +  (2, array(77, -76), array(array(6, 96, 65), array(-1, -2))),
    +  (3, array(12), array(array(17)))
    +  as t(x, ys, zs);
    +
    +-- Only allow lambdas in higher order functions.
    +select upper(x -> x) as v;
    +
    +-- Identity transform an array
    +select transform(zs, z -> z) as v from nested;
    +
    +-- Transform an array
    +select transform(ys, y -> y * y) as v from nested;
    +
    +-- Transform an array with index
    +select transform(ys, (y, i) -> y + i) as v from nested;
    +
    +-- Transform an array with reference
    +select transform(zs, z -> concat(ys, z)) as v from nested;
    +
    +-- Transform an array to an array of 0's
    +select transform(ys, 0) as v from nested;
    +
    +-- Transform a null array
    +select transform(cast(null as array<int>), x -> x + 1) as v;
    --- End diff --
    
    shall we add a test for nested lambdas?
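    
    For instance, something along these lines (hypothetical query against the `nested` view defined at the top of this file):
    
        -- Transform a nested array with a nested lambda (hypothetical test)
        select transform(zs, z -> transform(z, zz -> zz * zz)) as v from nested;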


---



[GitHub] spark pull request #21954: [SPARK-23908][SQL] Add transform function.

Posted by ueshin <gi...@git.apache.org>.
Github user ueshin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21954#discussion_r208282200
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala ---
    @@ -0,0 +1,212 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.catalyst.expressions
    +
    +import java.util.concurrent.atomic.AtomicReference
    +
    +import org.apache.spark.sql.catalyst.InternalRow
    +import org.apache.spark.sql.catalyst.expressions.codegen._
    +import org.apache.spark.sql.catalyst.expressions.codegen.Block._
    +import org.apache.spark.sql.catalyst.util.{ArrayData, GenericArrayData}
    +import org.apache.spark.sql.types._
    +
    +/**
    + * A named lambda variable.
    + */
    +case class NamedLambdaVariable(
    +    name: String,
    +    dataType: DataType,
    +    nullable: Boolean,
    +    value: AtomicReference[Any] = new AtomicReference(),
    +    exprId: ExprId = NamedExpression.newExprId)
    +  extends LeafExpression
    +  with NamedExpression
    +  with CodegenFallback {
    +
    +  override def qualifier: Option[String] = None
    +
    +  override def newInstance(): NamedExpression =
    +    copy(value = new AtomicReference(), exprId = NamedExpression.newExprId)
    +
    +  override def toAttribute: Attribute = {
    +    AttributeReference(name, dataType, nullable, Metadata.empty)(exprId, None)
    +  }
    +
    +  override def eval(input: InternalRow): Any = value.get
    +
    +  override def toString: String = s"lambda $name#${exprId.id}$typeSuffix"
    +
    +  override def simpleString: String = s"lambda $name#${exprId.id}: ${dataType.simpleString}"
    +}
    +
    +/**
    + * A lambda function and its arguments. A lambda function can be hidden when a user wants to
    + * process a completely independent expression in a [[HigherOrderFunction]]; the lambda function
    + * and its variables are then only used for internal bookkeeping within the higher order function.
    + */
    +case class LambdaFunction(
    +    function: Expression,
    +    arguments: Seq[NamedExpression],
    +    hidden: Boolean = false)
    +  extends Expression with CodegenFallback {
    +
    +  override def children: Seq[Expression] = function +: arguments
    +  override def dataType: DataType = function.dataType
    +  override def nullable: Boolean = function.nullable
    +
    +  lazy val bound: Boolean = arguments.forall(_.resolved)
    +
    +  override def eval(input: InternalRow): Any = function.eval(input)
    +}
    +
    +/**
    + * A higher order function takes one or more (lambda) functions and applies these to some objects.
    + * The function produces a number of variables which can be consumed by some lambda function.
    + */
    +trait HigherOrderFunction extends Expression {
    +
    +  override def children: Seq[Expression] = inputs ++ functions
    +
    +  /**
    +   * Inputs to the higher order function.
    +   */
    +  def inputs: Seq[Expression]
    +
    +  /**
    +   * All inputs have been resolved. This means that the types and nullability of (most of) the
    +   * lambda function arguments are known, and that we can start binding the lambda functions.
    +   */
    +  lazy val inputResolved: Boolean = inputs.forall(_.resolved)
    +
    +  /**
    +   * Functions applied by the higher order function.
    +   */
    +  def functions: Seq[Expression]
    +
    +  /**
    +   * All inputs must be resolved and all functions must be resolved lambda functions.
    +   */
    +  override lazy val resolved: Boolean = inputResolved && functions.forall {
    +    case l: LambdaFunction => l.resolved
    +    case _ => false
    +  }
    +
    +  /**
    +   * Bind the lambda functions to the [[HigherOrderFunction]] using the given bind function. The
    +   * bind function takes the potential lambda and its (partial) arguments and converts this into
    +   * a bound lambda function.
    +   */
    +  def bind(f: (Expression, Seq[(DataType, Boolean)]) => LambdaFunction): HigherOrderFunction
    +
    +  @transient lazy val functionsForEval: Seq[Expression] = functions.map {
    +    case LambdaFunction(function, arguments, hidden) =>
    +      val argumentMap = arguments.map { arg => arg.exprId -> arg }.toMap
    +      function.transformUp {
    +        case variable: NamedLambdaVariable if argumentMap.contains(variable.exprId) =>
    +          argumentMap(variable.exprId)
    +      }
    +  }
    +}
    +
    +trait ArrayBasedHigherOrderFunction extends HigherOrderFunction with ExpectsInputTypes {
    +
    +  def input: Expression
    +
    +  override def inputs: Seq[Expression] = input :: Nil
    +
    +  def function: Expression
    +
    +  override def functions: Seq[Expression] = function :: Nil
    +
    +  def expectingFunctionType: AbstractDataType = AnyDataType
    +
    +  override def inputTypes: Seq[AbstractDataType] = Seq(ArrayType, expectingFunctionType)
    +
    +  @transient lazy val functionForEval: Expression = functionsForEval.head
    +}
    +
    +/**
    + * Transform elements in an array using the transform function. This is similar to
    + * a `map` in functional programming.
    + */
    +@ExpressionDescription(
    +  usage = "_FUNC_(expr, func) - Transforms elements in an array using the function.",
    +  examples = """
    +    Examples:
    +      > SELECT _FUNC_(array(1, 2, 3), x -> x + 1);
    +       array(2, 3, 4)
    +      > SELECT _FUNC_(array(1, 2, 3), (x, i) -> x + i);
    +       array(1, 3, 5)
    +  """,
    +  since = "2.4.0")
    +case class ArrayTransform(
    +    input: Expression,
    +    function: Expression)
    +  extends ArrayBasedHigherOrderFunction with CodegenFallback {
    +
    +  override def nullable: Boolean = input.nullable
    +
    +  override def dataType: ArrayType = ArrayType(function.dataType, function.nullable)
    +
    +  override def bind(f: (Expression, Seq[(DataType, Boolean)]) => LambdaFunction): ArrayTransform = {
    +    val (elementType, containsNull) = input.dataType match {
    +      case ArrayType(elementType, containsNull) => (elementType, containsNull)
    +      case _ =>
    +        val ArrayType(elementType, containsNull) = ArrayType.defaultConcreteType
    +        (elementType, containsNull)
    +    }
    +    function match {
    +      case LambdaFunction(_, arguments, _) if arguments.size == 2 =>
    +        copy(function = f(function, (elementType, containsNull) :: (IntegerType, false) :: Nil))
    +      case _ =>
    +        copy(function = f(function, (elementType, containsNull) :: Nil))
    +    }
    +  }
    +
    +  @transient lazy val (elementVar, indexVar) = {
    +    val LambdaFunction(_, (elementVar: NamedLambdaVariable) +: tail, _) = function
    +    val indexVar = if (tail.nonEmpty) {
    +      Some(tail.head.asInstanceOf[NamedLambdaVariable])
    +    } else {
    +      None
    +    }
    +    (elementVar, indexVar)
    +  }
    +
    +  override def eval(input: InternalRow): Any = {
    +    val arr = this.input.eval(input).asInstanceOf[ArrayData]
    --- End diff --
    
    I'll look at the other PRs and submit a follow-up as well.


---



[GitHub] spark issue #21954: [SPARK-23908][SQL] Add transform function.

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21954
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #21954: [SPARK-23908][SQL] Add transform function.

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21954
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/94019/
    Test PASSed.


---



[GitHub] spark issue #21954: [SPARK-23908][SQL] Add transform function.

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21954
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/93973/
    Test FAILed.


---



[GitHub] spark issue #21954: [SPARK-23908][SQL] Add transform function.

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21954
  
    **[Test build #93950 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/93950/testReport)** for PR 21954 at commit [`c3bf6a0`](https://github.com/apache/spark/commit/c3bf6a0059a151ba23cf32c842e31ced3b28726c).
     * This patch **fails due to an unknown error code, -9**.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `case class ResolveHigherOrderFunctions(catalog: SessionCatalog) extends Rule[LogicalPlan] `
      * `                  s\"its class is $`
      * `case class ResolveLambdaVariables(conf: SQLConf) extends Rule[LogicalPlan] `
      * `case class NamedLambdaVariable(`
      * `case class LambdaFunction(`
      * `trait HigherOrderFunction extends Expression `
      * `trait ArrayBasedHigherOrderFunction extends HigherOrderFunction with ExpectsInputTypes `
      * `case class ArrayTransform(`


---



[GitHub] spark issue #21954: [SPARK-23908][SQL] Add transform function.

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21954
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #21954: [SPARK-23908][SQL] Add transform function.

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21954
  
    **[Test build #93934 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/93934/testReport)** for PR 21954 at commit [`ee450c5`](https://github.com/apache/spark/commit/ee450c5ef3f99d3bbf8dbbd05273bc63005bbccb).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark pull request #21954: [SPARK-23908][SQL] Add transform function.

Posted by ueshin <gi...@git.apache.org>.
Github user ueshin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21954#discussion_r207172497
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala ---
    @@ -0,0 +1,212 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.catalyst.expressions
    +
    +import java.util.concurrent.atomic.AtomicReference
    +
    +import org.apache.spark.sql.catalyst.InternalRow
    +import org.apache.spark.sql.catalyst.expressions.codegen._
    +import org.apache.spark.sql.catalyst.expressions.codegen.Block._
    +import org.apache.spark.sql.catalyst.util.{ArrayData, GenericArrayData}
    +import org.apache.spark.sql.types._
    +
    +/**
    + * A named lambda variable.
    + */
    +case class NamedLambdaVariable(
    +    name: String,
    +    dataType: DataType,
    +    nullable: Boolean,
    +    value: AtomicReference[Any] = new AtomicReference(),
    --- End diff --
    
    I see. Thanks.


---



[GitHub] spark issue #21954: [SPARK-23908][SQL] Add transform function.

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21954
  
    **[Test build #94002 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94002/testReport)** for PR 21954 at commit [`c3bf6a0`](https://github.com/apache/spark/commit/c3bf6a0059a151ba23cf32c842e31ced3b28726c).


---



[GitHub] spark pull request #21954: [SPARK-23908][SQL] Add transform function.

Posted by ueshin <gi...@git.apache.org>.
Github user ueshin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21954#discussion_r208281900
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala ---
    @@ -0,0 +1,212 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.catalyst.expressions
    +
    +import java.util.concurrent.atomic.AtomicReference
    +
    +import org.apache.spark.sql.catalyst.InternalRow
    +import org.apache.spark.sql.catalyst.expressions.codegen._
    +import org.apache.spark.sql.catalyst.expressions.codegen.Block._
    +import org.apache.spark.sql.catalyst.util.{ArrayData, GenericArrayData}
    +import org.apache.spark.sql.types._
    +
    +/**
    + * A named lambda variable.
    + */
    +case class NamedLambdaVariable(
    +    name: String,
    +    dataType: DataType,
    +    nullable: Boolean,
    +    value: AtomicReference[Any] = new AtomicReference(),
    +    exprId: ExprId = NamedExpression.newExprId)
    +  extends LeafExpression
    +  with NamedExpression
    +  with CodegenFallback {
    +
    +  override def qualifier: Option[String] = None
    +
    +  override def newInstance(): NamedExpression =
    +    copy(value = new AtomicReference(), exprId = NamedExpression.newExprId)
    +
    +  override def toAttribute: Attribute = {
    +    AttributeReference(name, dataType, nullable, Metadata.empty)(exprId, None)
    +  }
    +
    +  override def eval(input: InternalRow): Any = value.get
    +
    +  override def toString: String = s"lambda $name#${exprId.id}$typeSuffix"
    +
    +  override def simpleString: String = s"lambda $name#${exprId.id}: ${dataType.simpleString}"
    +}
    +
    +/**
    + * A lambda function and its arguments. A lambda function can be hidden when a user wants to
    + * process a completely independent expression in a [[HigherOrderFunction]]; the lambda function
    + * and its variables are then only used for internal bookkeeping within the higher order function.
    + */
    +case class LambdaFunction(
    +    function: Expression,
    +    arguments: Seq[NamedExpression],
    +    hidden: Boolean = false)
    +  extends Expression with CodegenFallback {
    +
    +  override def children: Seq[Expression] = function +: arguments
    +  override def dataType: DataType = function.dataType
    +  override def nullable: Boolean = function.nullable
    +
    +  lazy val bound: Boolean = arguments.forall(_.resolved)
    +
    +  override def eval(input: InternalRow): Any = function.eval(input)
    +}
    +
    +/**
    + * A higher order function takes one or more (lambda) functions and applies these to some objects.
    + * The function produces a number of variables which can be consumed by some lambda function.
    + */
    +trait HigherOrderFunction extends Expression {
    +
    +  override def children: Seq[Expression] = inputs ++ functions
    +
    +  /**
    +   * Inputs to the higher order function.
    +   */
    +  def inputs: Seq[Expression]
    +
    +  /**
    +   * All inputs have been resolved. This means that the types and nullability of (most of) the
    +   * lambda function arguments are known, and that we can start binding the lambda functions.
    +   */
    +  lazy val inputResolved: Boolean = inputs.forall(_.resolved)
    +
    +  /**
    +   * Functions applied by the higher order function.
    +   */
    +  def functions: Seq[Expression]
    +
    +  /**
    +   * All inputs must be resolved and all functions must be resolved lambda functions.
    +   */
    +  override lazy val resolved: Boolean = inputResolved && functions.forall {
    +    case l: LambdaFunction => l.resolved
    +    case _ => false
    +  }
    +
    +  /**
    +   * Bind the lambda functions to the [[HigherOrderFunction]] using the given bind function. The
    +   * bind function takes the potential lambda and its (partial) arguments and converts this into
    +   * a bound lambda function.
    +   */
    +  def bind(f: (Expression, Seq[(DataType, Boolean)]) => LambdaFunction): HigherOrderFunction
    +
    +  @transient lazy val functionsForEval: Seq[Expression] = functions.map {
    +    case LambdaFunction(function, arguments, hidden) =>
    +      val argumentMap = arguments.map { arg => arg.exprId -> arg }.toMap
    +      function.transformUp {
    +        case variable: NamedLambdaVariable if argumentMap.contains(variable.exprId) =>
    +          argumentMap(variable.exprId)
    +      }
    +  }
    +}
    +
    +trait ArrayBasedHigherOrderFunction extends HigherOrderFunction with ExpectsInputTypes {
    +
    +  def input: Expression
    +
    +  override def inputs: Seq[Expression] = input :: Nil
    +
    +  def function: Expression
    +
    +  override def functions: Seq[Expression] = function :: Nil
    +
    +  def expectingFunctionType: AbstractDataType = AnyDataType
    +
    +  override def inputTypes: Seq[AbstractDataType] = Seq(ArrayType, expectingFunctionType)
    +
    +  @transient lazy val functionForEval: Expression = functionsForEval.head
    --- End diff --
    
    Ah, that makes sense. Currently we have some PRs for other higher-order functions, so I'll look at them and submit a follow-up if needed.


---



[GitHub] spark issue #21954: [SPARK-23908][SQL] Add transform function.

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21954
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/94002/
    Test FAILed.


---



[GitHub] spark pull request #21954: [SPARK-23908][SQL] Add transform function.

Posted by ueshin <gi...@git.apache.org>.
Github user ueshin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21954#discussion_r207169967
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala ---
    @@ -0,0 +1,212 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.catalyst.expressions
    +
    +import java.util.concurrent.atomic.AtomicReference
    +
    +import org.apache.spark.sql.catalyst.InternalRow
    +import org.apache.spark.sql.catalyst.expressions.codegen._
    +import org.apache.spark.sql.catalyst.expressions.codegen.Block._
    +import org.apache.spark.sql.catalyst.util.{ArrayData, GenericArrayData}
    +import org.apache.spark.sql.types._
    +
    +/**
    + * A named lambda variable.
    + */
    +case class NamedLambdaVariable(
    +    name: String,
    +    dataType: DataType,
    +    nullable: Boolean,
    +    value: AtomicReference[Any] = new AtomicReference(),
    --- End diff --
    
    Hmm, it seems like overriding `fastEquals` is not enough.


---



[GitHub] spark pull request #21954: [SPARK-23908][SQL] Add transform function.

Posted by ueshin <gi...@git.apache.org>.
Github user ueshin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21954#discussion_r208282971
  
    --- Diff: sql/core/src/test/resources/sql-tests/inputs/higher-order-functions.sql ---
    @@ -0,0 +1,26 @@
    +create or replace temporary view nested as values
    +  (1, array(32, 97), array(array(12, 99), array(123, 42), array(1))),
    +  (2, array(77, -76), array(array(6, 96, 65), array(-1, -2))),
    +  (3, array(12), array(array(17)))
    +  as t(x, ys, zs);
    +
    +-- Only allow lambdas in higher order functions.
    +select upper(x -> x) as v;
    +
    +-- Identity transform an array
    +select transform(zs, z -> z) as v from nested;
    +
    +-- Transform an array
    +select transform(ys, y -> y * y) as v from nested;
    +
    +-- Transform an array with index
    +select transform(ys, (y, i) -> y + i) as v from nested;
    +
    +-- Transform an array with reference
    +select transform(zs, z -> concat(ys, z)) as v from nested;
    +
    +-- Transform an array to an array of 0's
    +select transform(ys, 0) as v from nested;
    +
    +-- Transform a null array
    +select transform(cast(null as array<int>), x -> x + 1) as v;
    --- End diff --
    
    Actually we have some at #21965 and #21982.


---



[GitHub] spark pull request #21954: [SPARK-23908][SQL] Add transform function.

Posted by ueshin <gi...@git.apache.org>.
Github user ueshin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21954#discussion_r207162167
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala ---
    @@ -0,0 +1,212 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.catalyst.expressions
    +
    +import java.util.concurrent.atomic.AtomicReference
    +
    +import org.apache.spark.sql.catalyst.InternalRow
    +import org.apache.spark.sql.catalyst.expressions.codegen._
    +import org.apache.spark.sql.catalyst.expressions.codegen.Block._
    +import org.apache.spark.sql.catalyst.util.{ArrayData, GenericArrayData}
    +import org.apache.spark.sql.types._
    +
    +/**
    + * A named lambda variable.
    + */
    +case class NamedLambdaVariable(
    +    name: String,
    +    dataType: DataType,
    +    nullable: Boolean,
    +    value: AtomicReference[Any] = new AtomicReference(),
    --- End diff --
    
    When I tried to make copies of `NamedLambdaVariable`s, `transformUp` didn't replace the variables and generated wrong results.
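    
    A minimal sketch (toy classes, not the PR's code) of that failure mode: once the argument list holds copies of the variables, writing a value into them never reaches the instances captured inside the function body.
    
    ```scala
    import java.util.concurrent.atomic.AtomicReference

    case class ToyVar(name: String, value: AtomicReference[Any] = new AtomicReference())

    object CopyDemo extends App {
      val x = ToyVar("x")
      // The "function body" closes over the original variable instance.
      val body: () => Any = () => x.value.get

      // A copied argument variable carries a fresh AtomicReference ...
      val xCopy = x.copy(value = new AtomicReference())
      xCopy.value.set(42)

      // ... so setting it never affects what the body reads.
      println(body()) // null, not 42
    }
    ```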


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #21954: [SPARK-23908][SQL] Add transform function.

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21954
  
    **[Test build #94019 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94019/testReport)** for PR 21954 at commit [`c3bf6a0`](https://github.com/apache/spark/commit/c3bf6a0059a151ba23cf32c842e31ced3b28726c).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `case class ResolveHigherOrderFunctions(catalog: SessionCatalog) extends Rule[LogicalPlan] `
      * `                  s\"its class is $`
      * `case class ResolveLambdaVariables(conf: SQLConf) extends Rule[LogicalPlan] `
      * `case class NamedLambdaVariable(`
      * `case class LambdaFunction(`
      * `trait HigherOrderFunction extends Expression `
      * `trait ArrayBasedHigherOrderFunction extends HigherOrderFunction with ExpectsInputTypes `
      * `case class ArrayTransform(`


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #21954: [SPARK-23908][SQL] Add transform function.

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21954#discussion_r208269300
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala ---
    @@ -0,0 +1,212 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.catalyst.expressions
    +
    +import java.util.concurrent.atomic.AtomicReference
    +
    +import org.apache.spark.sql.catalyst.InternalRow
    +import org.apache.spark.sql.catalyst.expressions.codegen._
    +import org.apache.spark.sql.catalyst.expressions.codegen.Block._
    +import org.apache.spark.sql.catalyst.util.{ArrayData, GenericArrayData}
    +import org.apache.spark.sql.types._
    +
    +/**
    + * A named lambda variable.
    + */
    +case class NamedLambdaVariable(
    +    name: String,
    +    dataType: DataType,
    +    nullable: Boolean,
    +    value: AtomicReference[Any] = new AtomicReference(),
    +    exprId: ExprId = NamedExpression.newExprId)
    +  extends LeafExpression
    +  with NamedExpression
    +  with CodegenFallback {
    +
    +  override def qualifier: Option[String] = None
    +
    +  override def newInstance(): NamedExpression =
    +    copy(value = new AtomicReference(), exprId = NamedExpression.newExprId)
    +
    +  override def toAttribute: Attribute = {
    +    AttributeReference(name, dataType, nullable, Metadata.empty)(exprId, None)
    +  }
    +
    +  override def eval(input: InternalRow): Any = value.get
    +
    +  override def toString: String = s"lambda $name#${exprId.id}$typeSuffix"
    +
    +  override def simpleString: String = s"lambda $name#${exprId.id}: ${dataType.simpleString}"
    +}
    +
    +/**
    + * A lambda function and its arguments. A lambda function can be hidden when a user wants to
    + * process a completely independent expression in a [[HigherOrderFunction]]; the lambda function
    + * and its variables are then only used for internal bookkeeping within the higher order function.
    + */
    +case class LambdaFunction(
    +    function: Expression,
    +    arguments: Seq[NamedExpression],
    +    hidden: Boolean = false)
    +  extends Expression with CodegenFallback {
    +
    +  override def children: Seq[Expression] = function +: arguments
    +  override def dataType: DataType = function.dataType
    +  override def nullable: Boolean = function.nullable
    +
    +  lazy val bound: Boolean = arguments.forall(_.resolved)
    +
    +  override def eval(input: InternalRow): Any = function.eval(input)
    +}
    +
    +/**
    + * A higher order function takes one or more (lambda) functions and applies these to some objects.
    + * The function produces a number of variables which can be consumed by some lambda function.
    + */
    +trait HigherOrderFunction extends Expression {
    +
    +  override def children: Seq[Expression] = inputs ++ functions
    +
    +  /**
    +   * Inputs to the higher order function.
    +   */
    +  def inputs: Seq[Expression]
    +
    +  /**
    +   * All inputs have been resolved. This means that the types and nullability of (most of) the
    +   * lambda function arguments are known, and that we can start binding the lambda functions.
    +   */
    +  lazy val inputResolved: Boolean = inputs.forall(_.resolved)
    +
    +  /**
    +   * Functions applied by the higher order function.
    +   */
    +  def functions: Seq[Expression]
    +
    +  /**
    +   * All inputs must be resolved and all functions must be resolved lambda functions.
    +   */
    +  override lazy val resolved: Boolean = inputResolved && functions.forall {
    +    case l: LambdaFunction => l.resolved
    +    case _ => false
    +  }
    +
    +  /**
    +   * Bind the lambda functions to the [[HigherOrderFunction]] using the given bind function. The
    +   * bind function takes the potential lambda and its (partial) arguments and converts this into
    +   * a bound lambda function.
    +   */
    +  def bind(f: (Expression, Seq[(DataType, Boolean)]) => LambdaFunction): HigherOrderFunction
    +
    +  @transient lazy val functionsForEval: Seq[Expression] = functions.map {
    +    case LambdaFunction(function, arguments, hidden) =>
    +      val argumentMap = arguments.map { arg => arg.exprId -> arg }.toMap
    +      function.transformUp {
    +        case variable: NamedLambdaVariable if argumentMap.contains(variable.exprId) =>
    +          argumentMap(variable.exprId)
    +      }
    +  }
    +}
    +
    +trait ArrayBasedHigherOrderFunction extends HigherOrderFunction with ExpectsInputTypes {
    +
    +  def input: Expression
    +
    +  override def inputs: Seq[Expression] = input :: Nil
    +
    +  def function: Expression
    +
    +  override def functions: Seq[Expression] = function :: Nil
    +
    +  def expectingFunctionType: AbstractDataType = AnyDataType
    +
    +  override def inputTypes: Seq[AbstractDataType] = Seq(ArrayType, expectingFunctionType)
    +
    +  @transient lazy val functionForEval: Expression = functionsForEval.head
    +}
    +
    +/**
    + * Transform elements in an array using the transform function. This is similar to
    + * a `map` in functional programming.
    + */
    +@ExpressionDescription(
    +  usage = "_FUNC_(expr, func) - Transforms elements in an array using the function.",
    +  examples = """
    +    Examples:
    +      > SELECT _FUNC_(array(1, 2, 3), x -> x + 1);
    +       array(2, 3, 4)
    +      > SELECT _FUNC_(array(1, 2, 3), (x, i) -> x + i);
    +       array(1, 3, 5)
    +  """,
    +  since = "2.4.0")
    +case class ArrayTransform(
    +    input: Expression,
    +    function: Expression)
    +  extends ArrayBasedHigherOrderFunction with CodegenFallback {
    +
    +  override def nullable: Boolean = input.nullable
    +
    +  override def dataType: ArrayType = ArrayType(function.dataType, function.nullable)
    +
    +  override def bind(f: (Expression, Seq[(DataType, Boolean)]) => LambdaFunction): ArrayTransform = {
    +    val (elementType, containsNull) = input.dataType match {
    +      case ArrayType(elementType, containsNull) => (elementType, containsNull)
    +      case _ =>
    +        val ArrayType(elementType, containsNull) = ArrayType.defaultConcreteType
    +        (elementType, containsNull)
    +    }
    +    function match {
    +      case LambdaFunction(_, arguments, _) if arguments.size == 2 =>
    +        copy(function = f(function, (elementType, containsNull) :: (IntegerType, false) :: Nil))
    +      case _ =>
    +        copy(function = f(function, (elementType, containsNull) :: Nil))
    +    }
    +  }
    +
    +  @transient lazy val (elementVar, indexVar) = {
    +    val LambdaFunction(_, (elementVar: NamedLambdaVariable) +: tail, _) = function
    +    val indexVar = if (tail.nonEmpty) {
    +      Some(tail.head.asInstanceOf[NamedLambdaVariable])
    +    } else {
    +      None
    +    }
    +    (elementVar, indexVar)
    +  }
    +
    +  override def eval(input: InternalRow): Any = {
    +    val arr = this.input.eval(input).asInstanceOf[ArrayData]
    --- End diff --
    
    nit: we should do some renaming to avoid the conflict, e.g. rename `ArrayBasedHigherOrderFunction#input` to `inputArray`
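    
    The conflict in question is ordinary parameter shadowing; a tiny illustration with made-up names:
    
    ```scala
    // The eval parameter `input` shadows the class field `input`, forcing the
    // explicit `this.input` above; renaming the field (e.g. to inputArray) avoids it.
    case class Demo(input: Seq[Int]) {
      def eval(input: Int): Int = this.input.sum + input
    }
    ```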


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #21954: [SPARK-23908][SQL] Add transform function.

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21954#discussion_r208266333
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala ---
    @@ -0,0 +1,212 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.catalyst.expressions
    +
    +import java.util.concurrent.atomic.AtomicReference
    +
    +import org.apache.spark.sql.catalyst.InternalRow
    +import org.apache.spark.sql.catalyst.expressions.codegen._
    +import org.apache.spark.sql.catalyst.expressions.codegen.Block._
    +import org.apache.spark.sql.catalyst.util.{ArrayData, GenericArrayData}
    +import org.apache.spark.sql.types._
    +
    +/**
    + * A named lambda variable.
    + */
    +case class NamedLambdaVariable(
    +    name: String,
    +    dataType: DataType,
    +    nullable: Boolean,
    +    value: AtomicReference[Any] = new AtomicReference(),
    +    exprId: ExprId = NamedExpression.newExprId)
    +  extends LeafExpression
    +  with NamedExpression
    +  with CodegenFallback {
    +
    +  override def qualifier: Option[String] = None
    +
    +  override def newInstance(): NamedExpression =
    +    copy(value = new AtomicReference(), exprId = NamedExpression.newExprId)
    +
    +  override def toAttribute: Attribute = {
    +    AttributeReference(name, dataType, nullable, Metadata.empty)(exprId, None)
    +  }
    +
    +  override def eval(input: InternalRow): Any = value.get
    +
    +  override def toString: String = s"lambda $name#${exprId.id}$typeSuffix"
    +
    +  override def simpleString: String = s"lambda $name#${exprId.id}: ${dataType.simpleString}"
    +}
    +
    +/**
    + * A lambda function and its arguments. A lambda function can be hidden when a user wants to
    + * process a completely independent expression in a [[HigherOrderFunction]]; the lambda function
    + * and its variables are then only used for internal bookkeeping within the higher order function.
    + */
    +case class LambdaFunction(
    +    function: Expression,
    +    arguments: Seq[NamedExpression],
    +    hidden: Boolean = false)
    +  extends Expression with CodegenFallback {
    +
    +  override def children: Seq[Expression] = function +: arguments
    +  override def dataType: DataType = function.dataType
    +  override def nullable: Boolean = function.nullable
    +
    +  lazy val bound: Boolean = arguments.forall(_.resolved)
    +
    +  override def eval(input: InternalRow): Any = function.eval(input)
    +}
    +
    +/**
    + * A higher order function takes one or more (lambda) functions and applies these to some objects.
    + * The function produces a number of variables which can be consumed by some lambda function.
    + */
    +trait HigherOrderFunction extends Expression {
    +
    +  override def children: Seq[Expression] = inputs ++ functions
    +
    +  /**
    +   * Inputs to the higher order function.
    +   */
    +  def inputs: Seq[Expression]
    +
    +  /**
    +   * All inputs have been resolved. This means that the types and nullability of (most of) the
    +   * lambda function arguments are known, and that we can start binding the lambda functions.
    +   */
    +  lazy val inputResolved: Boolean = inputs.forall(_.resolved)
    +
    +  /**
    +   * Functions applied by the higher order function.
    +   */
    +  def functions: Seq[Expression]
    +
    +  /**
    +   * All inputs must be resolved and all functions must be resolved lambda functions.
    +   */
    +  override lazy val resolved: Boolean = inputResolved && functions.forall {
    +    case l: LambdaFunction => l.resolved
    +    case _ => false
    +  }
    +
    +  /**
    +   * Bind the lambda functions to the [[HigherOrderFunction]] using the given bind function. The
    +   * bind function takes the potential lambda and its (partial) arguments and converts this into
    +   * a bound lambda function.
    +   */
    +  def bind(f: (Expression, Seq[(DataType, Boolean)]) => LambdaFunction): HigherOrderFunction
    +
    +  @transient lazy val functionsForEval: Seq[Expression] = functions.map {
    +    case LambdaFunction(function, arguments, hidden) =>
    +      val argumentMap = arguments.map { arg => arg.exprId -> arg }.toMap
    +      function.transformUp {
    +        case variable: NamedLambdaVariable if argumentMap.contains(variable.exprId) =>
    +          argumentMap(variable.exprId)
    +      }
    +  }
    +}
    +
    +trait ArrayBasedHigherOrderFunction extends HigherOrderFunction with ExpectsInputTypes {
    +
    +  def input: Expression
    +
    +  override def inputs: Seq[Expression] = input :: Nil
    +
    +  def function: Expression
    +
    +  override def functions: Seq[Expression] = function :: Nil
    +
    +  def expectingFunctionType: AbstractDataType = AnyDataType
    +
    +  override def inputTypes: Seq[AbstractDataType] = Seq(ArrayType, expectingFunctionType)
    +
    +  @transient lazy val functionForEval: Expression = functionsForEval.head
    --- End diff --
    
    does this need to be a `lazy val`? `Seq#head` is very cheap.
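    
    For what it's worth, since `functionsForEval` is itself a memoized `lazy val`, a plain accessor would be equivalent here (a sketch of the alternative, not necessarily the final code):
    
    ```scala
    // Seq#head is O(1), and functionsForEval is already cached by its own
    // lazy val, so no extra memoization is needed.
    def functionForEval: Expression = functionsForEval.head
    ```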


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #21954: [SPARK-23908][SQL] Add transform function.

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21954
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #21954: [SPARK-23908][SQL] Add transform function.

Posted by gatorsmile <gi...@git.apache.org>.
Github user gatorsmile commented on the issue:

    https://github.com/apache/spark/pull/21954
  
    cc @hvanhovell 


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #21954: [SPARK-23908][SQL] Add transform function.

Posted by holdensmagicalunicorn <gi...@git.apache.org>.
Github user holdensmagicalunicorn commented on the issue:

    https://github.com/apache/spark/pull/21954
  
    @ueshin, thanks! I am a bot who has found some folks who might be able to help with the review: @rxin, @cloud-fan and @hvanhovell


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #21954: [SPARK-23908][SQL] Add transform function.

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21954
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/1615/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #21954: [SPARK-23908][SQL] Add transform function.

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21954
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/1659/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #21954: [SPARK-23908][SQL] Add transform function.

Posted by ueshin <gi...@git.apache.org>.
Github user ueshin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21954#discussion_r207143138
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala ---
    @@ -0,0 +1,212 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.catalyst.expressions
    +
    +import java.util.concurrent.atomic.AtomicReference
    +
    +import org.apache.spark.sql.catalyst.InternalRow
    +import org.apache.spark.sql.catalyst.expressions.codegen._
    +import org.apache.spark.sql.catalyst.expressions.codegen.Block._
    +import org.apache.spark.sql.catalyst.util.{ArrayData, GenericArrayData}
    +import org.apache.spark.sql.types._
    +
    +/**
    + * A named lambda variable.
    + */
    +case class NamedLambdaVariable(
    +    name: String,
    +    dataType: DataType,
    +    nullable: Boolean,
    +    value: AtomicReference[Any] = new AtomicReference(),
    +    exprId: ExprId = NamedExpression.newExprId)
    +  extends LeafExpression
    +  with NamedExpression
    +  with CodegenFallback {
    +
    +  override def qualifier: Option[String] = None
    +
    +  override def newInstance(): NamedExpression =
    +    copy(value = new AtomicReference(), exprId = NamedExpression.newExprId)
    +
    +  override def toAttribute: Attribute = {
    +    AttributeReference(name, dataType, nullable, Metadata.empty)(exprId, None)
    +  }
    +
    +  override def eval(input: InternalRow): Any = value.get
    +
    +  override def toString: String = s"lambda $name#${exprId.id}$typeSuffix"
    +
    +  override def simpleString: String = s"lambda $name#${exprId.id}: ${dataType.simpleString}"
    +}
    +
    +/**
    + * A lambda function and its arguments. A lambda function can be hidden when a user wants to
    + * process a completely independent expression in a [[HigherOrderFunction]]; the lambda function
    + * and its variables are then only used for internal bookkeeping within the higher order function.
    + */
    +case class LambdaFunction(
    +    function: Expression,
    +    arguments: Seq[NamedExpression],
    +    hidden: Boolean = false)
    +  extends Expression with CodegenFallback {
    +
    +  override def children: Seq[Expression] = function +: arguments
    +  override def dataType: DataType = function.dataType
    +  override def nullable: Boolean = function.nullable
    +
    +  lazy val bound: Boolean = arguments.forall(_.resolved)
    +
    +  override def eval(input: InternalRow): Any = function.eval(input)
    +}
    +
    +/**
    + * A higher order function takes one or more (lambda) functions and applies these to some objects.
    + * The function produces a number of variables which can be consumed by some lambda function.
    + */
    +trait HigherOrderFunction extends Expression {
    +
    +  override def children: Seq[Expression] = inputs ++ functions
    +
    +  /**
    +   * Inputs to the higher order function.
    +   */
    +  def inputs: Seq[Expression]
    +
    +  /**
    +   * All inputs have been resolved. This means that the types and nullability of (most of) the
    +   * lambda function arguments are known, and that we can start binding the lambda functions.
    +   */
    +  lazy val inputResolved: Boolean = inputs.forall(_.resolved)
    +
    +  /**
    +   * Functions applied by the higher order function.
    +   */
    +  def functions: Seq[Expression]
    +
    +  /**
    +   * All inputs must be resolved and all functions must be resolved lambda functions.
    +   */
    +  override lazy val resolved: Boolean = inputResolved && functions.forall {
    +    case l: LambdaFunction => l.resolved
    +    case _ => false
    +  }
    +
    +  /**
    +   * Bind the lambda functions to the [[HigherOrderFunction]] using the given bind function. The
    +   * bind function takes the potential lambda and its (partial) arguments and converts this into
    +   * a bound lambda function.
    +   */
    +  def bind(f: (Expression, Seq[(DataType, Boolean)]) => LambdaFunction): HigherOrderFunction
    +
    +  @transient lazy val functionsForEval: Seq[Expression] = functions.map {
    +    case LambdaFunction(function, arguments, hidden) =>
    +      val argumentMap = arguments.map { arg => arg.exprId -> arg }.toMap
    +      function.transformUp {
    +        case variable: NamedLambdaVariable if argumentMap.contains(variable.exprId) =>
    +          argumentMap(variable.exprId)
    +      }
    +  }
    +}
    +
    +trait ArrayBasedHigherOrderFunction extends HigherOrderFunction with ExpectsInputTypes {
    +
    +  def input: Expression
    +
    +  override def inputs: Seq[Expression] = input :: Nil
    +
    +  def function: Expression
    +
    +  override def functions: Seq[Expression] = function :: Nil
    +
    +  def expectingFunctionType: AbstractDataType = AnyDataType
    +
    +  override def inputTypes: Seq[AbstractDataType] = Seq(ArrayType, expectingFunctionType)
    +
    +  @transient lazy val functionForEval: Expression = functionsForEval.head
    +}
    +
    +/**
    + * Transform elements in an array using the transform function. This is similar to
    + * a `map` in functional programming.
    + */
    +@ExpressionDescription(
    +  usage = "_FUNC_(expr, func) - Transforms elements in an array using the function.",
    +  examples = """
    +    Examples:
    +      > SELECT _FUNC_(array(1, 2, 3), x -> x + 1);
    +       array(2, 3, 4)
    +      > SELECT _FUNC_(array(1, 2, 3), (x, i) -> x + i);
    +       array(1, 3, 5)
    +  """,
    +  since = "2.4.0")
    +case class ArrayTransform(
    +    input: Expression,
    +    function: Expression)
    +  extends ArrayBasedHigherOrderFunction with CodegenFallback {
    +
    +  override def nullable: Boolean = input.nullable
    +
    +  override def dataType: ArrayType = ArrayType(function.dataType, function.nullable)
    +
    +  override def bind(f: (Expression, Seq[(DataType, Boolean)]) => LambdaFunction): ArrayTransform = {
    +    val (elementType, containsNull) = input.dataType match {
    +      case ArrayType(elementType, containsNull) => (elementType, containsNull)
    +      case _ =>
    +        val ArrayType(elementType, containsNull) = ArrayType.defaultConcreteType
    --- End diff --
    
    It happens when the first argument is not an array (e.g., https://github.com/apache/spark/pull/21954/files#diff-8e1a34391fdefa4a3a0349d7d454d86fR1798).
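    
    A self-contained sketch (toy types mirroring the pattern above) of why the fallback exists: when the first argument is not an array, binding still needs some element type to proceed, and the analyzer's type check reports the real error afterwards.
    
    ```scala
    sealed trait ToyType
    case class ToyArrayType(elementType: ToyType, containsNull: Boolean) extends ToyType
    case object ToyIntegerType extends ToyType
    case object ToyNullType extends ToyType

    object BindDemo {
      // Stand-in for ArrayType.defaultConcreteType.
      val defaultConcreteType: ToyArrayType = ToyArrayType(ToyNullType, containsNull = true)

      def elementInfo(dt: ToyType): (ToyType, Boolean) = dt match {
        case ToyArrayType(et, cn) => (et, cn)
        case _ =>
          // Non-array input (e.g. transform(1, x -> x)): fall back so the lambda
          // can still be bound; the mismatch is reported by the type check later.
          val ToyArrayType(et, cn) = defaultConcreteType
          (et, cn)
      }
    }
    ```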


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #21954: [SPARK-23908][SQL] Add transform function.

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21954#discussion_r208272808
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/higherOrderFunctions.scala ---
    @@ -0,0 +1,166 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.catalyst.analysis
    +
    +import org.apache.spark.sql.catalyst.catalog.SessionCatalog
    +import org.apache.spark.sql.catalyst.expressions._
    +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
    +import org.apache.spark.sql.catalyst.rules.Rule
    +import org.apache.spark.sql.internal.SQLConf
    +import org.apache.spark.sql.types.DataType
    +
    +/**
    + * Resolve higher order functions from the catalog. This is different from regular function
    + * resolution because lambda functions can only be resolved after the function has been resolved;
    + * so we need to resolve a higher order function when all of its children are either resolved or
    + * lambda functions.
    + */
    +case class ResolveHigherOrderFunctions(catalog: SessionCatalog) extends Rule[LogicalPlan] {
    +
    +  override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperators {
    +    case q: LogicalPlan =>
    +      q.transformExpressions {
    +        case u @ UnresolvedFunction(fn, children, false)
    +            if hasLambdaAndResolvedArguments(children) =>
    +          withPosition(u) {
    +            catalog.lookupFunction(fn, children) match {
    +              case func: HigherOrderFunction => func
    +              case other => other.failAnalysis(
    +                "A lambda function should only be used in a higher order function. However, " +
    +                  s"its class is ${other.getClass.getCanonicalName}, which is not a " +
    +                  s"higher order function.")
    +            }
    +          }
    +      }
    +  }
    +
    +  /**
    +   * Check if the arguments of a function are either resolved or a lambda function.
    +   */
    +  private def hasLambdaAndResolvedArguments(expressions: Seq[Expression]): Boolean = {
    +    val (lambdas, others) = expressions.partition(_.isInstanceOf[LambdaFunction])
    +    lambdas.nonEmpty && others.forall(_.resolved)
    +  }
    +}
    +
    +/**
    + * Resolve the lambda variables exposed by a higher order functions.
    + *
    + * This rule works in two steps:
    + * [1]. Bind the anonymous variables exposed by the higher order function to the lambda function's
    + *      arguments; this creates named and typed lambda variables. The argument names are checked
    + *      for duplicates and the number of arguments is checked during this step.
    + * [2]. Resolve the lambda variables used in the lambda function's expression tree. Note that
    + *      we allow the use of variables from outside the current lambda; this can either be a
    + *      variable defined by a lambda function in an outer scope, or an attribute produced by
    + *      the plan's child. If names are duplicated, the name defined in the innermost scope is used.
    + */
    +case class ResolveLambdaVariables(conf: SQLConf) extends Rule[LogicalPlan] {
    +
    +  type LambdaVariableMap = Map[String, NamedExpression]
    +
    +  private val canonicalizer = {
    +    if (!conf.caseSensitiveAnalysis) {
    +      s: String => s.toLowerCase
    +    } else {
    +      s: String => s
    +    }
    +  }
    +
    +  override def apply(plan: LogicalPlan): LogicalPlan = {
    +    plan.resolveOperators {
    +      case q: LogicalPlan =>
    +        q.mapExpressions(resolve(_, Map.empty))
    +    }
    +  }
    +
    +  /**
    +   * Create a bound lambda function by binding the arguments of a lambda function to the given
    +   * partial arguments (dataType and nullability only). If the expression happens to be an already
    +   * bound lambda function then we assume it has been bound to the correct arguments and do
    +   * nothing. This function will produce a lambda function with hidden arguments when it is passed
    +   * an arbitrary expression.
    +   */
    +  private def createLambda(
    +      e: Expression,
    +      partialArguments: Seq[(DataType, Boolean)]): LambdaFunction = e match {
    --- End diff --
    
    why call it "partial"?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #21954: [SPARK-23908][SQL] Add transform function.

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21954
  
    **[Test build #93934 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/93934/testReport)** for PR 21954 at commit [`ee450c5`](https://github.com/apache/spark/commit/ee450c5ef3f99d3bbf8dbbd05273bc63005bbccb).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #21954: [SPARK-23908][SQL] Add transform function.

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21954
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org