Posted to reviews@spark.apache.org by ueshin <gi...@git.apache.org> on 2018/08/02 03:12:45 UTC
[GitHub] spark pull request #21954: [SPARK-23908][SQL] Add transform function.
GitHub user ueshin opened a pull request:
https://github.com/apache/spark/pull/21954
[SPARK-23908][SQL] Add transform function.
## What changes were proposed in this pull request?
This PR adds a `transform` function that transforms the elements of an array using a given lambda function.
Optionally, the lambda function can take the index of each element as a second argument.
```sql
> SELECT transform(array(1, 2, 3), x -> x + 1);
array(2, 3, 4)
> SELECT transform(array(1, 2, 3), (x, i) -> x + i);
array(1, 3, 5)
```
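For illustration, a minimal Scala-side sketch of exercising the same function through `expr` (assuming a `SparkSession` named `spark` built with this patch; the lambda syntax goes through the SQL parser, so there is no dedicated DataFrame API here):
```scala
import org.apache.spark.sql.functions.expr
import spark.implicits._

// `transform` is the new SQL function added by this patch, invoked via expr().
val df = Seq(Seq(1, 2, 3)).toDF("xs")
df.select(expr("transform(xs, x -> x + 1)").as("ys")).show()
// +---------+
// |       ys|
// +---------+
// |[2, 3, 4]|
// +---------+
```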
## How was this patch tested?
Added tests.
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/ueshin/apache-spark issues/SPARK-23908/transform
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/spark/pull/21954.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #21954
----
commit e860249c96854daefccc580ca52e3fad7d1acf67
Author: Takuya UESHIN <ue...@...>
Date: 2018-08-01T03:46:00Z
Add `LambdaFunction` and its parser.
commit 06825b6a3ae6b3085c4b4e5c010dffa75e988801
Author: Takuya UESHIN <ue...@...>
Date: 2018-08-01T05:52:40Z
Add `ResolveHigherOrderFunctions`.
commit 17ab2ffc73a664ba2b00d49f0835faff055274b0
Author: Takuya UESHIN <ue...@...>
Date: 2018-08-01T05:59:32Z
Add `ArrayTransform`.
commit 95a06b4fe3f660c617a6a53bf0cccc5a0a62306b
Author: Takuya UESHIN <ue...@...>
Date: 2018-08-01T08:06:01Z
Test in sql/core.
commit 4448d0b7a085c1613dc0dd52009b6d50388ec605
Author: Takuya UESHIN <ue...@...>
Date: 2018-08-01T09:20:14Z
Add negative cases.
commit abc685f86ee205f2a64b065c98a65fc2d36bfd75
Author: Takuya UESHIN <ue...@...>
Date: 2018-08-01T09:46:00Z
Add sql file.
commit ee450c5ef3f99d3bbf8dbbd05273bc63005bbccb
Author: Takuya UESHIN <ue...@...>
Date: 2018-08-01T10:15:44Z
Replace lambda variables in the function with the ones in the arguments to make sure the variables are the same as those in the arguments.
----
---
---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org
[GitHub] spark issue #21954: [SPARK-23908][SQL] Add transform function.
Posted by ueshin <gi...@git.apache.org>.
Github user ueshin commented on the issue:
https://github.com/apache/spark/pull/21954
Jenkins, retest this please.
---
[GitHub] spark issue #21954: [SPARK-23908][SQL] Add transform function.
Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:
https://github.com/apache/spark/pull/21954
**[Test build #93950 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/93950/testReport)** for PR 21954 at commit [`c3bf6a0`](https://github.com/apache/spark/commit/c3bf6a0059a151ba23cf32c842e31ced3b28726c).
---
[GitHub] spark issue #21954: [SPARK-23908][SQL] Add transform function.
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/21954
Merged build finished. Test PASSed.
---
[GitHub] spark issue #21954: [SPARK-23908][SQL] Add transform function.
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/21954
Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/1671/
Test PASSed.
---
[GitHub] spark pull request #21954: [SPARK-23908][SQL] Add transform function.
Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:
https://github.com/apache/spark/pull/21954#discussion_r208268129
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala ---
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions
+
+import java.util.concurrent.atomic.AtomicReference
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.codegen._
+import org.apache.spark.sql.catalyst.expressions.codegen.Block._
+import org.apache.spark.sql.catalyst.util.{ArrayData, GenericArrayData}
+import org.apache.spark.sql.types._
+
+/**
+ * A named lambda variable.
+ */
+case class NamedLambdaVariable(
+ name: String,
+ dataType: DataType,
+ nullable: Boolean,
+ value: AtomicReference[Any] = new AtomicReference(),
+ exprId: ExprId = NamedExpression.newExprId)
+ extends LeafExpression
+ with NamedExpression
+ with CodegenFallback {
+
+ override def qualifier: Option[String] = None
+
+ override def newInstance(): NamedExpression =
+ copy(value = new AtomicReference(), exprId = NamedExpression.newExprId)
+
+ override def toAttribute: Attribute = {
+ AttributeReference(name, dataType, nullable, Metadata.empty)(exprId, None)
+ }
+
+ override def eval(input: InternalRow): Any = value.get
+
+ override def toString: String = s"lambda $name#${exprId.id}$typeSuffix"
+
+ override def simpleString: String = s"lambda $name#${exprId.id}: ${dataType.simpleString}"
+}
+
+/**
+ * A lambda function and its arguments. A lambda function can be hidden when a user wants to
+ * process a completely independent expression in a [[HigherOrderFunction]]; the lambda function
+ * and its variables are then only used for internal bookkeeping within the higher order function.
+ */
+case class LambdaFunction(
+ function: Expression,
+ arguments: Seq[NamedExpression],
+ hidden: Boolean = false)
+ extends Expression with CodegenFallback {
+
+ override def children: Seq[Expression] = function +: arguments
+ override def dataType: DataType = function.dataType
+ override def nullable: Boolean = function.nullable
+
+ lazy val bound: Boolean = arguments.forall(_.resolved)
+
+ override def eval(input: InternalRow): Any = function.eval(input)
+}
+
+/**
+ * A higher order function takes one or more (lambda) functions and applies these to some objects.
+ * The function produces a number of variables which can be consumed by some lambda function.
+ */
+trait HigherOrderFunction extends Expression {
+
+ override def children: Seq[Expression] = inputs ++ functions
+
+ /**
+ * Inputs to the higher order function.
+ */
+ def inputs: Seq[Expression]
+
+ /**
+ * All inputs have been resolved. This means that the types and nullability of (most of) the
+ * lambda function arguments are known, and that we can start binding the lambda functions.
+ */
+ lazy val inputResolved: Boolean = inputs.forall(_.resolved)
+
+ /**
+ * Functions applied by the higher order function.
+ */
+ def functions: Seq[Expression]
+
+ /**
+ * All inputs must be resolved and all functions must be resolved lambda functions.
+ */
+ override lazy val resolved: Boolean = inputResolved && functions.forall {
+ case l: LambdaFunction => l.resolved
+ case _ => false
+ }
+
+ /**
+ * Bind the lambda functions to the [[HigherOrderFunction]] using the given bind function. The
+ * bind function takes the potential lambda and its (partial) arguments and converts this into
+ * a bound lambda function.
+ */
+ def bind(f: (Expression, Seq[(DataType, Boolean)]) => LambdaFunction): HigherOrderFunction
+
+ @transient lazy val functionsForEval: Seq[Expression] = functions.map {
+ case LambdaFunction(function, arguments, hidden) =>
+ val argumentMap = arguments.map { arg => arg.exprId -> arg }.toMap
+ function.transformUp {
+ case variable: NamedLambdaVariable if argumentMap.contains(variable.exprId) =>
+ argumentMap(variable.exprId)
+ }
+ }
+}
+
+trait ArrayBasedHigherOrderFunction extends HigherOrderFunction with ExpectsInputTypes {
+
+ def input: Expression
+
+ override def inputs: Seq[Expression] = input :: Nil
+
+ def function: Expression
+
+ override def functions: Seq[Expression] = function :: Nil
+
+ def expectingFunctionType: AbstractDataType = AnyDataType
+
+ override def inputTypes: Seq[AbstractDataType] = Seq(ArrayType, expectingFunctionType)
+
+ @transient lazy val functionForEval: Expression = functionsForEval.head
+}
+
+/**
+ * Transform elements in an array using the transform function. This is similar to
+ * a `map` in functional programming.
+ */
+@ExpressionDescription(
+ usage = "_FUNC_(expr, func) - Transforms elements in an array using the function.",
+ examples = """
+ Examples:
+ > SELECT _FUNC_(array(1, 2, 3), x -> x + 1);
+ array(2, 3, 4)
+ > SELECT _FUNC_(array(1, 2, 3), (x, i) -> x + i);
+ array(1, 3, 5)
+ """,
+ since = "2.4.0")
+case class ArrayTransform(
+ input: Expression,
+ function: Expression)
+ extends ArrayBasedHigherOrderFunction with CodegenFallback {
+
+ override def nullable: Boolean = input.nullable
+
+ override def dataType: ArrayType = ArrayType(function.dataType, function.nullable)
+
+ override def bind(f: (Expression, Seq[(DataType, Boolean)]) => LambdaFunction): ArrayTransform = {
+ val (elementType, containsNull) = input.dataType match {
+ case ArrayType(elementType, containsNull) => (elementType, containsNull)
+ case _ =>
+ val ArrayType(elementType, containsNull) = ArrayType.defaultConcreteType
--- End diff --
Then shall we fail the analysis before going into `bind`?
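A minimal sketch of the alternative being asked about (hypothetical helper, not the PR's code): make the element-type extraction fail outright instead of falling back to a default concrete type, so the error surfaces during analysis:
```scala
import org.apache.spark.sql.types.{ArrayType, DataType}

// Hypothetical: succeed only for array inputs; anything else should become an
// analysis error (failAnalysis in the real rule, a plain exception here)
// instead of being bound against a guessed default element type.
def arrayElementInfo(inputType: DataType): (DataType, Boolean) = inputType match {
  case ArrayType(elementType, containsNull) => (elementType, containsNull)
  case other =>
    throw new IllegalArgumentException(
      s"argument 1 requires array type, however it is of ${other.simpleString} type.")
}
```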
---
[GitHub] spark pull request #21954: [SPARK-23908][SQL] Add transform function.
Posted by hvanhovell <gi...@git.apache.org>.
Github user hvanhovell commented on a diff in the pull request:
https://github.com/apache/spark/pull/21954#discussion_r207171941
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala ---
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions
+
+import java.util.concurrent.atomic.AtomicReference
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.codegen._
+import org.apache.spark.sql.catalyst.expressions.codegen.Block._
+import org.apache.spark.sql.catalyst.util.{ArrayData, GenericArrayData}
+import org.apache.spark.sql.types._
+
+/**
+ * A named lambda variable.
+ */
+case class NamedLambdaVariable(
+ name: String,
+ dataType: DataType,
+ nullable: Boolean,
+ value: AtomicReference[Any] = new AtomicReference(),
--- End diff --
Yeah, that makes sense. Let's leave it for now.
---
[GitHub] spark issue #21954: [SPARK-23908][SQL] Add transform function.
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/21954
Merged build finished. Test FAILed.
---
[GitHub] spark pull request #21954: [SPARK-23908][SQL] Add transform function.
Posted by hvanhovell <gi...@git.apache.org>.
Github user hvanhovell commented on a diff in the pull request:
https://github.com/apache/spark/pull/21954#discussion_r208273712
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/higherOrderFunctions.scala ---
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.analysis
+
+import org.apache.spark.sql.catalyst.catalog.SessionCatalog
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.DataType
+
+/**
+ * Resolve higher order functions from the catalog. This is different from regular function
+ * resolution because lambda functions can only be resolved after the function has been resolved;
+ * so we need to resolve the higher order function when all children are either resolved or a lambda
+ * function.
+ */
+case class ResolveHigherOrderFunctions(catalog: SessionCatalog) extends Rule[LogicalPlan] {
+
+ override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperators {
+ case q: LogicalPlan =>
+ q.transformExpressions {
+ case u @ UnresolvedFunction(fn, children, false)
+ if hasLambdaAndResolvedArguments(children) =>
+ withPosition(u) {
+ catalog.lookupFunction(fn, children) match {
+ case func: HigherOrderFunction => func
+ case other => other.failAnalysis(
+ "A lambda function should only be used in a higher order function. However, " +
+ s"its class is ${other.getClass.getCanonicalName}, which is not a " +
+ s"higher order function.")
+ }
+ }
+ }
+ }
+
+ /**
+ * Check if the arguments of a function are either resolved or a lambda function.
+ */
+ private def hasLambdaAndResolvedArguments(expressions: Seq[Expression]): Boolean = {
+ val (lambdas, others) = expressions.partition(_.isInstanceOf[LambdaFunction])
+ lambdas.nonEmpty && others.forall(_.resolved)
+ }
+}
+
+/**
+ * Resolve the lambda variables exposed by a higher order function.
+ *
+ * This rule works in two steps:
+ * [1]. Bind the anonymous variables exposed by the higher order function to the lambda function's
+ * arguments; this creates named and typed lambda variables. The argument names are checked
+ * for duplicates and the number of arguments is checked during this step.
+ * [2]. Resolve the lambda variables used in the lambda function's function expression tree.
+ * Note that we allow the use of variables from outside the current lambda; this can either be
+ * a variable from a lambda function defined in an outer scope, or an attribute produced by
+ * the plan's child. If names are duplicated, the name defined in the innermost scope is used.
+ */
+case class ResolveLambdaVariables(conf: SQLConf) extends Rule[LogicalPlan] {
+
+ type LambdaVariableMap = Map[String, NamedExpression]
+
+ private val canonicalizer = {
+ if (!conf.caseSensitiveAnalysis) {
+ s: String => s.toLowerCase
+ } else {
+ s: String => s
+ }
+ }
+
+ override def apply(plan: LogicalPlan): LogicalPlan = {
+ plan.resolveOperators {
+ case q: LogicalPlan =>
+ q.mapExpressions(resolve(_, Map.empty))
+ }
+ }
+
+ /**
+ * Create a bound lambda function by binding the arguments of a lambda function to the given
+ * partial arguments (dataType and nullability only). If the expression happens to be an already
+ * bound lambda function then we assume it has been bound to the correct arguments and do
+ * nothing. This function will produce a lambda function with hidden arguments when it is passed
+ * an arbitrary expression.
+ */
+ private def createLambda(
+ e: Expression,
+ partialArguments: Seq[(DataType, Boolean)]): LambdaFunction = e match {
--- End diff --
They are partial because we only pass the dataType and nullable flag.
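Concretely, a small sketch of what those partial arguments could look like for `transform(array(1, 2, 3), (x, i) -> x + i)` (illustrative values only):
```scala
import org.apache.spark.sql.types.{DataType, IntegerType}

// "Partial" because only (dataType, nullable) is known at this point; the
// variable names themselves still come from the lambda (x and i here).
val partialArguments: Seq[(DataType, Boolean)] = Seq(
  (IntegerType, false), // the array element: element type of array(1, 2, 3)
  (IntegerType, false)  // the element index, which is never null
)
```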
---
[GitHub] spark pull request #21954: [SPARK-23908][SQL] Add transform function.
Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:
https://github.com/apache/spark/pull/21954#discussion_r208282782
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/higherOrderFunctions.scala ---
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.analysis
+
+import org.apache.spark.sql.catalyst.catalog.SessionCatalog
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.DataType
+
+/**
+ * Resolve higher order functions from the catalog. This is different from regular function
+ * resolution because lambda functions can only be resolved after the function has been resolved;
+ * so we need to resolve the higher order function when all children are either resolved or a lambda
+ * function.
+ */
+case class ResolveHigherOrderFunctions(catalog: SessionCatalog) extends Rule[LogicalPlan] {
+
+ override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperators {
+ case q: LogicalPlan =>
+ q.transformExpressions {
+ case u @ UnresolvedFunction(fn, children, false)
+ if hasLambdaAndResolvedArguments(children) =>
+ withPosition(u) {
+ catalog.lookupFunction(fn, children) match {
+ case func: HigherOrderFunction => func
+ case other => other.failAnalysis(
+ "A lambda function should only be used in a higher order function. However, " +
+ s"its class is ${other.getClass.getCanonicalName}, which is not a " +
+ s"higher order function.")
+ }
+ }
+ }
+ }
+
+ /**
+ * Check if the arguments of a function are either resolved or a lambda function.
+ */
+ private def hasLambdaAndResolvedArguments(expressions: Seq[Expression]): Boolean = {
+ val (lambdas, others) = expressions.partition(_.isInstanceOf[LambdaFunction])
+ lambdas.nonEmpty && others.forall(_.resolved)
+ }
+}
+
+/**
+ * Resolve the lambda variables exposed by a higher order function.
+ *
+ * This rule works in two steps:
+ * [1]. Bind the anonymous variables exposed by the higher order function to the lambda function's
+ * arguments; this creates named and typed lambda variables. The argument names are checked
+ * for duplicates and the number of arguments is checked during this step.
+ * [2]. Resolve the lambda variables used in the lambda function's function expression tree.
+ * Note that we allow the use of variables from outside the current lambda; this can either be
+ * a variable from a lambda function defined in an outer scope, or an attribute produced by
+ * the plan's child. If names are duplicated, the name defined in the innermost scope is used.
+ */
+case class ResolveLambdaVariables(conf: SQLConf) extends Rule[LogicalPlan] {
+
+ type LambdaVariableMap = Map[String, NamedExpression]
+
+ private val canonicalizer = {
+ if (!conf.caseSensitiveAnalysis) {
+ s: String => s.toLowerCase
+ } else {
+ s: String => s
+ }
+ }
+
+ override def apply(plan: LogicalPlan): LogicalPlan = {
+ plan.resolveOperators {
+ case q: LogicalPlan =>
+ q.mapExpressions(resolve(_, Map.empty))
+ }
+ }
+
+ /**
+ * Create a bound lambda function by binding the arguments of a lambda function to the given
+ * partial arguments (dataType and nullability only). If the expression happens to be an already
+ * bound lambda function then we assume it has been bound to the correct arguments and do
+ * nothing. This function will produce a lambda function with hidden arguments when it is passed
+ * an arbitrary expression.
+ */
+ private def createLambda(
+ e: Expression,
+ partialArguments: Seq[(DataType, Boolean)]): LambdaFunction = e match {
--- End diff --
how about `argInfo`?
---
[GitHub] spark issue #21954: [SPARK-23908][SQL] Add transform function.
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/21954
Merged build finished. Test PASSed.
---
[GitHub] spark issue #21954: [SPARK-23908][SQL] Add transform function.
Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:
https://github.com/apache/spark/pull/21954
**[Test build #94002 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94002/testReport)** for PR 21954 at commit [`c3bf6a0`](https://github.com/apache/spark/commit/c3bf6a0059a151ba23cf32c842e31ced3b28726c).
* This patch **fails Spark unit tests**.
* This patch merges cleanly.
* This patch adds the following public classes _(experimental)_:
* `case class ResolveHigherOrderFunctions(catalog: SessionCatalog) extends Rule[LogicalPlan] `
* ` s\"its class is $`
* `case class ResolveLambdaVariables(conf: SQLConf) extends Rule[LogicalPlan] `
* `case class NamedLambdaVariable(`
* `case class LambdaFunction(`
* `trait HigherOrderFunction extends Expression `
* `trait ArrayBasedHigherOrderFunction extends HigherOrderFunction with ExpectsInputTypes `
* `case class ArrayTransform(`
---
[GitHub] spark pull request #21954: [SPARK-23908][SQL] Add transform function.
Posted by hvanhovell <gi...@git.apache.org>.
Github user hvanhovell commented on a diff in the pull request:
https://github.com/apache/spark/pull/21954#discussion_r207145478
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala ---
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions
+
+import java.util.concurrent.atomic.AtomicReference
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.codegen._
+import org.apache.spark.sql.catalyst.expressions.codegen.Block._
+import org.apache.spark.sql.catalyst.util.{ArrayData, GenericArrayData}
+import org.apache.spark.sql.types._
+
+/**
+ * A named lambda variable.
+ */
+case class NamedLambdaVariable(
+ name: String,
+ dataType: DataType,
+ nullable: Boolean,
+ value: AtomicReference[Any] = new AtomicReference(),
--- End diff --
You are only using the `AtomicReference` as a container, right?
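For context, a standalone sketch of the mechanism under discussion (simplified names, not the PR's classes): the reference is only a mutable slot that the higher order function fills in before evaluating the lambda body for each element:
```scala
import java.util.concurrent.atomic.AtomicReference

// The slot is shared between the higher order function (the writer) and the
// lambda body (the reader); evaluation is single threaded.
final class LambdaVar(val slot: AtomicReference[Any] = new AtomicReference())

def transformArray(input: Seq[Any], x: LambdaVar, body: () => Any): Seq[Any] =
  input.map { element =>
    x.slot.set(element) // bind the variable to the current element
    body()              // evaluate the lambda body against that value
  }

val x = new LambdaVar()
val out = transformArray(Seq(1, 2, 3), x, () => x.slot.get().asInstanceOf[Int] + 1)
// out == Seq(2, 3, 4)
```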
---
[GitHub] spark issue #21954: [SPARK-23908][SQL] Add transform function.
Posted by gatorsmile <gi...@git.apache.org>.
Github user gatorsmile commented on the issue:
https://github.com/apache/spark/pull/21954
retest this please
---
[GitHub] spark pull request #21954: [SPARK-23908][SQL] Add transform function.
Posted by hvanhovell <gi...@git.apache.org>.
Github user hvanhovell commented on a diff in the pull request:
https://github.com/apache/spark/pull/21954#discussion_r207158636
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala ---
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions
+
+import java.util.concurrent.atomic.AtomicReference
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.codegen._
+import org.apache.spark.sql.catalyst.expressions.codegen.Block._
+import org.apache.spark.sql.catalyst.util.{ArrayData, GenericArrayData}
+import org.apache.spark.sql.types._
+
+/**
+ * A named lambda variable.
+ */
+case class NamedLambdaVariable(
+ name: String,
+ dataType: DataType,
+ nullable: Boolean,
+ value: AtomicReference[Any] = new AtomicReference(),
--- End diff --
You did? Could you elaborate? There shouldn't be any concurrent access here.
---
[GitHub] spark pull request #21954: [SPARK-23908][SQL] Add transform function.
Posted by ueshin <gi...@git.apache.org>.
Github user ueshin commented on a diff in the pull request:
https://github.com/apache/spark/pull/21954#discussion_r208282941
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/higherOrderFunctions.scala ---
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.analysis
+
+import org.apache.spark.sql.catalyst.catalog.SessionCatalog
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.DataType
+
+/**
+ * Resolve higher order functions from the catalog. This is different from regular function
+ * resolution because lambda functions can only be resolved after the function has been resolved;
+ * so we need to resolve the higher order function when all children are either resolved or a lambda
+ * function.
+ */
+case class ResolveHigherOrderFunctions(catalog: SessionCatalog) extends Rule[LogicalPlan] {
+
+ override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperators {
+ case q: LogicalPlan =>
+ q.transformExpressions {
+ case u @ UnresolvedFunction(fn, children, false)
+ if hasLambdaAndResolvedArguments(children) =>
+ withPosition(u) {
+ catalog.lookupFunction(fn, children) match {
+ case func: HigherOrderFunction => func
+ case other => other.failAnalysis(
+ "A lambda function should only be used in a higher order function. However, " +
+ s"its class is ${other.getClass.getCanonicalName}, which is not a " +
+ s"higher order function.")
+ }
+ }
+ }
+ }
+
+ /**
+ * Check if the arguments of a function are either resolved or a lambda function.
+ */
+ private def hasLambdaAndResolvedArguments(expressions: Seq[Expression]): Boolean = {
+ val (lambdas, others) = expressions.partition(_.isInstanceOf[LambdaFunction])
+ lambdas.nonEmpty && others.forall(_.resolved)
+ }
+}
+
+/**
+ * Resolve the lambda variables exposed by a higher order function.
+ *
+ * This rule works in two steps:
+ * [1]. Bind the anonymous variables exposed by the higher order function to the lambda function's
+ * arguments; this creates named and typed lambda variables. The argument names are checked
+ * for duplicates and the number of arguments is checked during this step.
+ * [2]. Resolve the lambda variables used in the lambda function's function expression tree.
+ * Note that we allow the use of variables from outside the current lambda; this can either be
+ * a variable from a lambda function defined in an outer scope, or an attribute produced by
+ * the plan's child. If names are duplicated, the name defined in the innermost scope is used.
+ */
+case class ResolveLambdaVariables(conf: SQLConf) extends Rule[LogicalPlan] {
+
+ type LambdaVariableMap = Map[String, NamedExpression]
+
+ private val canonicalizer = {
+ if (!conf.caseSensitiveAnalysis) {
+ s: String => s.toLowerCase
+ } else {
+ s: String => s
+ }
+ }
+
+ override def apply(plan: LogicalPlan): LogicalPlan = {
+ plan.resolveOperators {
+ case q: LogicalPlan =>
+ q.mapExpressions(resolve(_, Map.empty))
+ }
+ }
+
+ /**
+ * Create a bound lambda function by binding the arguments of a lambda function to the given
+ * partial arguments (dataType and nullability only). If the expression happens to be an already
+ * bound lambda function then we assume it has been bound to the correct arguments and do
+ * nothing. This function will produce a lambda function with hidden arguments when it is passed
+ * an arbitrary expression.
+ */
+ private def createLambda(
+ e: Expression,
+ partialArguments: Seq[(DataType, Boolean)]): LambdaFunction = e match {
+ case f: LambdaFunction if f.bound => f
+
+ case LambdaFunction(function, names, _) =>
+ if (names.size != partialArguments.size) {
+ e.failAnalysis(
+ s"The number of lambda function arguments '${names.size}' does not " +
+ "match the number of arguments expected by the higher order function " +
+ s"'${partialArguments.size}'.")
+ }
+
+ if (names.map(a => canonicalizer(a.name)).distinct.size < names.size) {
+ e.failAnalysis(
+ "Lambda function arguments should not have names that are semantically the same.")
+ }
+
+ val arguments = partialArguments.zip(names).map {
+ case ((dataType, nullable), ne) =>
+ NamedLambdaVariable(ne.name, dataType, nullable)
+ }
+ LambdaFunction(function, arguments)
+
+ case _ =>
+ // This expression does not consume any of the lambda's arguments (it is independent). We do
+ // create a lambda function with default parameters because this is expected by the higher
+ // order function. Note that we hide the lambda variables produced by this function in order
+ // to prevent accidental naming collisions.
+ val arguments = partialArguments.zipWithIndex.map {
+ case ((dataType, nullable), i) =>
+ NamedLambdaVariable(s"col$i", dataType, nullable)
+ }
+ LambdaFunction(e, arguments, hidden = true)
+ }
+
+ /**
+ * Resolve lambda variables in the expression subtree, using the passed lambda variable registry.
+ */
+ private def resolve(e: Expression, parentLambdaMap: LambdaVariableMap): Expression = e match {
+ case _ if e.resolved => e
+
+ case h: HigherOrderFunction if h.inputResolved =>
--- End diff --
Let me think about it later.
---
[GitHub] spark pull request #21954: [SPARK-23908][SQL] Add transform function.
Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:
https://github.com/apache/spark/pull/21954#discussion_r208276367
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/higherOrderFunctions.scala ---
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.analysis
+
+import org.apache.spark.sql.catalyst.catalog.SessionCatalog
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.DataType
+
+/**
+ * Resolve higher order functions from the catalog. This is different from regular function
+ * resolution because lambda functions can only be resolved after the function has been resolved;
+ * so we need to resolve the higher order function when all children are either resolved or a lambda
+ * function.
+ */
+case class ResolveHigherOrderFunctions(catalog: SessionCatalog) extends Rule[LogicalPlan] {
+
+ override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperators {
+ case q: LogicalPlan =>
+ q.transformExpressions {
+ case u @ UnresolvedFunction(fn, children, false)
+ if hasLambdaAndResolvedArguments(children) =>
+ withPosition(u) {
+ catalog.lookupFunction(fn, children) match {
+ case func: HigherOrderFunction => func
+ case other => other.failAnalysis(
+ "A lambda function should only be used in a higher order function. However, " +
+ s"its class is ${other.getClass.getCanonicalName}, which is not a " +
+ s"higher order function.")
+ }
+ }
+ }
+ }
+
+ /**
+ * Check if the arguments of a function are either resolved or a lambda function.
+ */
+ private def hasLambdaAndResolvedArguments(expressions: Seq[Expression]): Boolean = {
+ val (lambdas, others) = expressions.partition(_.isInstanceOf[LambdaFunction])
+ lambdas.nonEmpty && others.forall(_.resolved)
+ }
+}
+
+/**
+ * Resolve the lambda variables exposed by a higher order function.
+ *
+ * This rule works in two steps:
+ * [1]. Bind the anonymous variables exposed by the higher order function to the lambda function's
+ * arguments; this creates named and typed lambda variables. The argument names are checked
+ * for duplicates and the number of arguments is checked during this step.
+ * [2]. Resolve the lambda variables used in the lambda function's function expression tree.
+ * Note that we allow the use of variables from outside the current lambda; this can either be
+ * a variable from a lambda function defined in an outer scope, or an attribute produced by
+ * the plan's child. If names are duplicated, the name defined in the innermost scope is used.
+ */
+case class ResolveLambdaVariables(conf: SQLConf) extends Rule[LogicalPlan] {
+
+ type LambdaVariableMap = Map[String, NamedExpression]
+
+ private val canonicalizer = {
+ if (!conf.caseSensitiveAnalysis) {
+ s: String => s.toLowerCase
+ } else {
+ s: String => s
+ }
+ }
+
+ override def apply(plan: LogicalPlan): LogicalPlan = {
+ plan.resolveOperators {
+ case q: LogicalPlan =>
+ q.mapExpressions(resolve(_, Map.empty))
+ }
+ }
+
+ /**
+ * Create a bound lambda function by binding the arguments of a lambda function to the given
+ * partial arguments (dataType and nullability only). If the expression happens to be an already
+ * bound lambda function then we assume it has been bound to the correct arguments and do
+ * nothing. This function will produce a lambda function with hidden arguments when it is passed
+ * an arbitrary expression.
+ */
+ private def createLambda(
+ e: Expression,
+ partialArguments: Seq[(DataType, Boolean)]): LambdaFunction = e match {
+ case f: LambdaFunction if f.bound => f
+
+ case LambdaFunction(function, names, _) =>
+ if (names.size != partialArguments.size) {
+ e.failAnalysis(
+ s"The number of lambda function arguments '${names.size}' does not " +
+ "match the number of arguments expected by the higher order function " +
+ s"'${partialArguments.size}'.")
+ }
+
+ if (names.map(a => canonicalizer(a.name)).distinct.size < names.size) {
+ e.failAnalysis(
+ "Lambda function arguments should not have names that are semantically the same.")
+ }
+
+ val arguments = partialArguments.zip(names).map {
+ case ((dataType, nullable), ne) =>
+ NamedLambdaVariable(ne.name, dataType, nullable)
+ }
+ LambdaFunction(function, arguments)
+
+ case _ =>
+ // This expression does not consume any of the lambda's arguments (it is independent). We do
+ // create a lambda function with default parameters because this is expected by the higher
+ // order function. Note that we hide the lambda variables produced by this function in order
+ // to prevent accidental naming collisions.
+ val arguments = partialArguments.zipWithIndex.map {
+ case ((dataType, nullable), i) =>
+ NamedLambdaVariable(s"col$i", dataType, nullable)
+ }
+ LambdaFunction(e, arguments, hidden = true)
+ }
+
+ /**
+ * Resolve lambda variables in the expression subtree, using the passed lambda variable registry.
+ */
+ private def resolve(e: Expression, parentLambdaMap: LambdaVariableMap): Expression = e match {
+ case _ if e.resolved => e
+
+ case h: HigherOrderFunction if h.inputResolved =>
--- End diff --
Can we add some basic type check here? Then we can fail fast if `ArrayTransform#input` is not an array type, and we don't need the hacky workaround in `ArrayTransform#bind`.
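A rough sketch of such a fail-fast check in the rule's `resolve` match, specialized to the array case (assumed shape, not the PR's final code; `input`, `inputResolved`, `bind`, `createLambda` and `parentLambdaMap` are the names used in the diffs above):
```scala
// Hypothetical branch: the inputs are already resolved here, so inspecting
// input.dataType is safe, and a non-array input fails analysis before bind.
case h: ArrayBasedHigherOrderFunction if h.inputResolved =>
  h.input.dataType match {
    case _: ArrayType =>
      h.bind(createLambda).mapChildren(resolve(_, parentLambdaMap))
    case other =>
      h.input.failAnalysis(
        s"argument 1 requires array type, however it is of ${other.simpleString} type.")
  }
```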
---
[GitHub] spark issue #21954: [SPARK-23908][SQL] Add transform function.
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/21954
Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/1633/
Test PASSed.
---
[GitHub] spark issue #21954: [SPARK-23908][SQL] Add transform function.
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/21954
Merged build finished. Test FAILed.
---
[GitHub] spark issue #21954: [SPARK-23908][SQL] Add transform function.
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/21954
Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/93934/
Test FAILed.
---
[GitHub] spark issue #21954: [SPARK-23908][SQL] Add transform function.
Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:
https://github.com/apache/spark/pull/21954
**[Test build #94019 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94019/testReport)** for PR 21954 at commit [`c3bf6a0`](https://github.com/apache/spark/commit/c3bf6a0059a151ba23cf32c842e31ced3b28726c).
---
[GitHub] spark issue #21954: [SPARK-23908][SQL] Add transform function.
Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:
https://github.com/apache/spark/pull/21954
**[Test build #93973 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/93973/testReport)** for PR 21954 at commit [`c3bf6a0`](https://github.com/apache/spark/commit/c3bf6a0059a151ba23cf32c842e31ced3b28726c).
* This patch **fails Spark unit tests**.
* This patch merges cleanly.
* This patch adds the following public classes _(experimental)_:
* `case class ResolveHigherOrderFunctions(catalog: SessionCatalog) extends Rule[LogicalPlan] `
* ` s\"its class is $`
* `case class ResolveLambdaVariables(conf: SQLConf) extends Rule[LogicalPlan] `
* `case class NamedLambdaVariable(`
* `case class LambdaFunction(`
* `trait HigherOrderFunction extends Expression `
* `trait ArrayBasedHigherOrderFunction extends HigherOrderFunction with ExpectsInputTypes `
* `case class ArrayTransform(`
---
[GitHub] spark pull request #21954: [SPARK-23908][SQL] Add transform function.
Posted by ueshin <gi...@git.apache.org>.
Github user ueshin commented on a diff in the pull request:
https://github.com/apache/spark/pull/21954#discussion_r207162371
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala ---
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions
+
+import java.util.concurrent.atomic.AtomicReference
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.codegen._
+import org.apache.spark.sql.catalyst.expressions.codegen.Block._
+import org.apache.spark.sql.catalyst.util.{ArrayData, GenericArrayData}
+import org.apache.spark.sql.types._
+
+/**
+ * A named lambda variable.
+ */
+case class NamedLambdaVariable(
+ name: String,
+ dataType: DataType,
+ nullable: Boolean,
+ value: AtomicReference[Any] = new AtomicReference(),
--- End diff --
Ah, maybe I should override `fastEquals` instead of using `AtomicReference`?
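If that route were taken, the override itself would presumably be a one-liner along these lines (a sketch of the idea only, not something from the PR):
```scala
// Hypothetical: make tree comparison treat every NamedLambdaVariable instance
// as distinct by restricting fastEquals to reference equality.
override def fastEquals(other: TreeNode[_]): Boolean = this.eq(other)
```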
---
[GitHub] spark pull request #21954: [SPARK-23908][SQL] Add transform function.
Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on a diff in the pull request:
https://github.com/apache/spark/pull/21954#discussion_r207098029
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala ---
@@ -0,0 +1,325 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions
+
+import java.util.concurrent.atomic.AtomicReference
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.codegen._
+import org.apache.spark.sql.catalyst.expressions.codegen.Block._
+import org.apache.spark.sql.catalyst.util.{ArrayData, GenericArrayData}
+import org.apache.spark.sql.types._
+
+/**
+ * A named lambda variable.
+ */
+case class NamedLambdaVariable(
+ name: String,
+ dataType: DataType,
+ nullable: Boolean,
+ value: AtomicReference[Any] = new AtomicReference(),
+ exprId: ExprId = NamedExpression.newExprId)
+ extends LeafExpression
+ with NamedExpression {
+
+ override def qualifier: Option[String] = None
+
+ override def newInstance(): NamedExpression =
+ copy(value = new AtomicReference(), exprId = NamedExpression.newExprId)
+
+ override def toAttribute: Attribute = {
+ AttributeReference(name, dataType, nullable, Metadata.empty)(exprId, None)
+ }
+
+ override def eval(input: InternalRow): Any = value.get
+
+ override def genCode(ctx: CodegenContext): ExprCode = {
+ val suffix = "_lambda_variable_" + exprId.id
+ ExprCode(
+ if (nullable) JavaCode.isNullVariable(s"isNull_${name}$suffix") else FalseLiteral,
+ JavaCode.variable(s"value_${name}$suffix", dataType))
+ }
+
+ override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
+ throw new IllegalStateException("NamedLambdaVariable.doGenCode should not be called.")
+ }
+
+ override def toString: String = s"lambda $name#${exprId.id}$typeSuffix"
+
+ override def simpleString: String = s"lambda $name#${exprId.id}: ${dataType.simpleString}"
+}
+
+/**
+ * A lambda function and its arguments. A lambda function can be hidden when a user wants to
+ * process a completely independent expression in a [[HigherOrderFunction]]; the lambda function
+ * and its variables are then only used for internal bookkeeping within the higher order function.
+ */
+case class LambdaFunction(
+ function: Expression,
+ arguments: Seq[NamedExpression],
+ hidden: Boolean = false)
+ extends Expression {
+
+ override def children: Seq[Expression] = function +: arguments
+ override def dataType: DataType = function.dataType
+ override def nullable: Boolean = function.nullable
+
+ lazy val bound: Boolean = arguments.forall(_.resolved)
+
+ override def eval(input: InternalRow): Any = function.eval(input)
+
+ override def genCode(ctx: CodegenContext): ExprCode = {
+ function.genCode(ctx)
+ }
+
+ override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
+ throw new IllegalStateException("LambdaFunction.doGenCode should not be called.")
+ }
+}
+
+/**
+ * A higher order function takes one or more (lambda) functions and applies these to some objects.
+ * The function produces a number of variables which can be consumed by some lambda function.
+ */
+trait HigherOrderFunction extends Expression {
+
+ override def children: Seq[Expression] = inputs ++ functions
+
+ /**
+ * Inputs to the higher order function.
+ */
+ def inputs: Seq[Expression]
+
+ /**
+ * All inputs have been resolved. This means that the types and nullability of (most of) the
+ * lambda function arguments are known, and that we can start binding the lambda functions.
+ */
+ lazy val inputResolved: Boolean = inputs.forall(_.resolved)
+
+ /**
+ * Functions applied by the higher order function.
+ */
+ def functions: Seq[Expression]
+
+ /**
+ * All inputs must be resolved and all functions must be resolved lambda functions.
+ */
+ override lazy val resolved: Boolean = inputResolved && functions.forall {
+ case l: LambdaFunction => l.resolved
+ case _ => false
+ }
+
+ /**
+ * Bind the lambda functions to the [[HigherOrderFunction]] using the given bind function. The
+ * bind function takes the potential lambda and its (partial) arguments and converts this into
+ * a bound lambda function.
+ */
+ def bind(f: (Expression, Seq[(DataType, Boolean)]) => LambdaFunction): HigherOrderFunction
+
+ @transient lazy val functionsForEval: Seq[Expression] = functions.map {
+ case LambdaFunction(function, arguments, hidden) =>
+ val argumentMap = arguments.map { arg => arg.exprId -> arg }.toMap
+ function.transformUp {
--- End diff --
Why do we need to transform the `NamedLambdaVariable`s in `function` using `arguments` here? Aren't `arguments` also `NamedLambdaVariable`s, and don't we already resolve the expressions in `function` in `ResolveLambdaVariables`?
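For background, a standalone sketch of the pitfall that the substitution guards against (simplified classes; the rationale is the one given in the commit about replacing lambda variables in the function with the ones in the arguments): if the body holds a copy of the variable, writes through the argument's container never reach it:
```scala
import java.util.concurrent.atomic.AtomicReference

// Two structurally identical "variables" that own different containers.
case class Var(name: String, slot: AtomicReference[Any] = new AtomicReference())

val argument   = Var("x") // the instance the higher order function writes to
val copyInBody = Var("x") // an equal-looking copy sitting inside the lambda body

argument.slot.set(42)
assert(copyInBody.slot.get() == null) // the write is invisible to the copy
// Substituting copyInBody with argument (matched by exprId in the real code)
// guarantees the body reads the value the higher order function just wrote.
```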
---
[GitHub] spark issue #21954: [SPARK-23908][SQL] Add transform function.
Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:
https://github.com/apache/spark/pull/21954
**[Test build #93973 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/93973/testReport)** for PR 21954 at commit [`c3bf6a0`](https://github.com/apache/spark/commit/c3bf6a0059a151ba23cf32c842e31ced3b28726c).
---
[GitHub] spark pull request #21954: [SPARK-23908][SQL] Add transform function.
Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:
https://github.com/apache/spark/pull/21954#discussion_r208267632
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala ---
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions
+
+import java.util.concurrent.atomic.AtomicReference
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.codegen._
+import org.apache.spark.sql.catalyst.expressions.codegen.Block._
+import org.apache.spark.sql.catalyst.util.{ArrayData, GenericArrayData}
+import org.apache.spark.sql.types._
+
+/**
+ * A named lambda variable.
+ */
+case class NamedLambdaVariable(
+ name: String,
+ dataType: DataType,
+ nullable: Boolean,
+ value: AtomicReference[Any] = new AtomicReference(),
+ exprId: ExprId = NamedExpression.newExprId)
+ extends LeafExpression
+ with NamedExpression
+ with CodegenFallback {
+
+ override def qualifier: Option[String] = None
+
+ override def newInstance(): NamedExpression =
+ copy(value = new AtomicReference(), exprId = NamedExpression.newExprId)
+
+ override def toAttribute: Attribute = {
+ AttributeReference(name, dataType, nullable, Metadata.empty)(exprId, None)
+ }
+
+ override def eval(input: InternalRow): Any = value.get
+
+ override def toString: String = s"lambda $name#${exprId.id}$typeSuffix"
+
+ override def simpleString: String = s"lambda $name#${exprId.id}: ${dataType.simpleString}"
+}
+
+/**
+ * A lambda function and its arguments. A lambda function can be hidden when a user wants to
+ * process an completely independent expression in a [[HigherOrderFunction]], the lambda function
+ * and its variables are then only used for internal bookkeeping within the higher order function.
+ */
+case class LambdaFunction(
+ function: Expression,
+ arguments: Seq[NamedExpression],
+ hidden: Boolean = false)
+ extends Expression with CodegenFallback {
+
+ override def children: Seq[Expression] = function +: arguments
+ override def dataType: DataType = function.dataType
+ override def nullable: Boolean = function.nullable
+
+ lazy val bound: Boolean = arguments.forall(_.resolved)
+
+ override def eval(input: InternalRow): Any = function.eval(input)
+}
+
+/**
+ * A higher order function takes one or more (lambda) functions and applies these to some objects.
+ * The function produces a number of variables which can be consumed by some lambda function.
+ */
+trait HigherOrderFunction extends Expression {
+
+ override def children: Seq[Expression] = inputs ++ functions
+
+ /**
+ * Inputs to the higher ordered function.
+ */
+ def inputs: Seq[Expression]
+
+ /**
+ * All inputs have been resolved. This means that the types and nullabilty of (most of) the
+ * lambda function arguments is known, and that we can start binding the lambda functions.
+ */
+ lazy val inputResolved: Boolean = inputs.forall(_.resolved)
+
+ /**
+ * Functions applied by the higher order function.
+ */
+ def functions: Seq[Expression]
--- End diff --
It seems `function` must be a `LambdaFunction`; why don't we enforce that at the type level?
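A hypothetical sketch (not the code in this PR) of what enforcing that at the type level could look like; note that `transform(ys, 0)` in the tests passes a plain expression that only becomes a hidden `LambdaFunction` during resolution, so the wrapping would have to happen before the node is constructed:
```scala
// Hypothetical sketch only -- not the PR's code.
trait HigherOrderFunction extends Expression {
  def inputs: Seq[Expression]

  // Declared as Seq[LambdaFunction] instead of Seq[Expression]; `resolved` could then
  // drop the `case l: LambdaFunction` match and simply check functions.forall(_.resolved).
  def functions: Seq[LambdaFunction]

  override def children: Seq[Expression] = inputs ++ functions
}
```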
---
[GitHub] spark pull request #21954: [SPARK-23908][SQL] Add transform function.
Posted by ueshin <gi...@git.apache.org>.
Github user ueshin commented on a diff in the pull request:
https://github.com/apache/spark/pull/21954#discussion_r207102738
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala ---
@@ -0,0 +1,325 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions
+
+import java.util.concurrent.atomic.AtomicReference
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.codegen._
+import org.apache.spark.sql.catalyst.expressions.codegen.Block._
+import org.apache.spark.sql.catalyst.util.{ArrayData, GenericArrayData}
+import org.apache.spark.sql.types._
+
+/**
+ * A named lambda variable.
+ */
+case class NamedLambdaVariable(
+ name: String,
+ dataType: DataType,
+ nullable: Boolean,
+ value: AtomicReference[Any] = new AtomicReference(),
+ exprId: ExprId = NamedExpression.newExprId)
+ extends LeafExpression
+ with NamedExpression {
+
+ override def qualifier: Option[String] = None
+
+ override def newInstance(): NamedExpression =
+ copy(value = new AtomicReference(), exprId = NamedExpression.newExprId)
+
+ override def toAttribute: Attribute = {
+ AttributeReference(name, dataType, nullable, Metadata.empty)(exprId, None)
+ }
+
+ override def eval(input: InternalRow): Any = value.get
+
+ override def genCode(ctx: CodegenContext): ExprCode = {
+ val suffix = "_lambda_variable_" + exprId.id
+ ExprCode(
+ if (nullable) JavaCode.isNullVariable(s"isNull_${name}$suffix") else FalseLiteral,
+ JavaCode.variable(s"value_${name}$suffix", dataType))
+ }
+
+ override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
+ throw new IllegalStateException("NamedLambdaVariable.doGenCode should not be called.")
+ }
+
+ override def toString: String = s"lambda $name#${exprId.id}$typeSuffix"
+
+ override def simpleString: String = s"lambda $name#${exprId.id}: ${dataType.simpleString}"
+}
+
+/**
+ * A lambda function and its arguments. A lambda function can be hidden when a user wants to
+ * process an completely independent expression in a [[HigherOrderFunction]], the lambda function
+ * and its variables are then only used for internal bookkeeping within the higher order function.
+ */
+case class LambdaFunction(
+ function: Expression,
+ arguments: Seq[NamedExpression],
+ hidden: Boolean = false)
+ extends Expression {
+
+ override def children: Seq[Expression] = function +: arguments
+ override def dataType: DataType = function.dataType
+ override def nullable: Boolean = function.nullable
+
+ lazy val bound: Boolean = arguments.forall(_.resolved)
+
+ override def eval(input: InternalRow): Any = function.eval(input)
+
+ override def genCode(ctx: CodegenContext): ExprCode = {
+ function.genCode(ctx)
+ }
+
+ override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
+ throw new IllegalStateException("LambdaFunction.doGenCode should not be called.")
+ }
+}
+
+/**
+ * A higher order function takes one or more (lambda) functions and applies these to some objects.
+ * The function produces a number of variables which can be consumed by some lambda function.
+ */
+trait HigherOrderFunction extends Expression {
+
+ override def children: Seq[Expression] = inputs ++ functions
+
+ /**
+ * Inputs to the higher ordered function.
+ */
+ def inputs: Seq[Expression]
+
+ /**
+ * All inputs have been resolved. This means that the types and nullabilty of (most of) the
+ * lambda function arguments is known, and that we can start binding the lambda functions.
+ */
+ lazy val inputResolved: Boolean = inputs.forall(_.resolved)
+
+ /**
+ * Functions applied by the higher order function.
+ */
+ def functions: Seq[Expression]
+
+ /**
+ * All inputs must be resolved and all functions must be resolved lambda functions.
+ */
+ override lazy val resolved: Boolean = inputResolved && functions.forall {
+ case l: LambdaFunction => l.resolved
+ case _ => false
+ }
+
+ /**
+ * Bind the lambda functions to the [[HigherOrderFunction]] using the given bind function. The
+ * bind function takes the potential lambda and it's (partial) arguments and converts this into
+ * a bound lambda function.
+ */
+ def bind(f: (Expression, Seq[(DataType, Boolean)]) => LambdaFunction): HigherOrderFunction
+
+ @transient lazy val functionsForEval: Seq[Expression] = functions.map {
+ case LambdaFunction(function, arguments, hidden) =>
+ val argumentMap = arguments.map { arg => arg.exprId -> arg }.toMap
+ function.transformUp {
--- End diff --
I'm worried that the `NamedLambdaVariable` might be instantiated separately during serialization or something. In that case, we might not be able to refer to the same instance and set the argument values correctly.
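A minimal, self-contained toy in plain Scala (the names `Variable`, `Add`, `rebind` and `RebindDemo` are made up for illustration, not Spark code) showing why the substitution by `exprId` matters: `eval` has to read from the exact `AtomicReference` instance that the higher-order function writes to, even if the lambda body ended up holding a separate copy of the variable.
```scala
import java.util.concurrent.atomic.AtomicReference

// Toy model (plain Scala, not Spark code) of the rebinding done in `functionsForEval`:
// evaluation must read from the exact AtomicReference the higher-order function sets.
sealed trait Expr { def eval(): Int }

case class Variable(id: Long, value: AtomicReference[Any] = new AtomicReference())
  extends Expr {
  def eval(): Int = value.get().asInstanceOf[Int]
}

case class Add(left: Expr, right: Expr) extends Expr {
  def eval(): Int = left.eval() + right.eval()
}

object RebindDemo extends App {
  val bound  = Variable(id = 1)        // the instance kept in `arguments`
  val copied = Variable(id = 1)        // a structurally separate copy (e.g. after serialization)
  val body: Expr = Add(copied, copied) // the lambda body happens to hold the copy

  // Substitute by id, mirroring the argumentMap lookup discussed above.
  def rebind(e: Expr): Expr = e match {
    case v: Variable if v.id == bound.id => bound
    case Add(l, r)                       => Add(rebind(l), rebind(r))
    case other                           => other
  }
  val functionForEval = rebind(body)

  bound.value.set(21)                  // the "higher-order function" sets the element value
  assert(functionForEval.eval() == 42) // reads 21 + 21 through the shared reference
}
```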
---
[GitHub] spark issue #21954: [SPARK-23908][SQL] Add transform function.
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/21954
Merged build finished. Test FAILed.
---
[GitHub] spark pull request #21954: [SPARK-23908][SQL] Add transform function.
Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:
https://github.com/apache/spark/pull/21954
---
[GitHub] spark issue #21954: [SPARK-23908][SQL] Add transform function.
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/21954
Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/93950/
Test FAILed.
---
[GitHub] spark issue #21954: [SPARK-23908][SQL] Add transform function.
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/21954
Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/1603/
Test PASSed.
---
[GitHub] spark issue #21954: [SPARK-23908][SQL] Add transform function.
Posted by ueshin <gi...@git.apache.org>.
Github user ueshin commented on the issue:
https://github.com/apache/spark/pull/21954
Jenkins, retest this please.
---
[GitHub] spark pull request #21954: [SPARK-23908][SQL] Add transform function.
Posted by ueshin <gi...@git.apache.org>.
Github user ueshin commented on a diff in the pull request:
https://github.com/apache/spark/pull/21954#discussion_r207148555
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala ---
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions
+
+import java.util.concurrent.atomic.AtomicReference
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.codegen._
+import org.apache.spark.sql.catalyst.expressions.codegen.Block._
+import org.apache.spark.sql.catalyst.util.{ArrayData, GenericArrayData}
+import org.apache.spark.sql.types._
+
+/**
+ * A named lambda variable.
+ */
+case class NamedLambdaVariable(
+ name: String,
+ dataType: DataType,
+ nullable: Boolean,
+ value: AtomicReference[Any] = new AtomicReference(),
--- End diff --
Actually, it's also needed when creating `functionsForEval`. I needed it for `transformUp` to work properly.
---
[GitHub] spark issue #21954: [SPARK-23908][SQL] Add transform function.
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/21954
Merged build finished. Test FAILed.
---
[GitHub] spark pull request #21954: [SPARK-23908][SQL] Add transform function.
Posted by hvanhovell <gi...@git.apache.org>.
Github user hvanhovell commented on a diff in the pull request:
https://github.com/apache/spark/pull/21954#discussion_r207141916
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala ---
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions
+
+import java.util.concurrent.atomic.AtomicReference
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.codegen._
+import org.apache.spark.sql.catalyst.expressions.codegen.Block._
+import org.apache.spark.sql.catalyst.util.{ArrayData, GenericArrayData}
+import org.apache.spark.sql.types._
+
+/**
+ * A named lambda variable.
+ */
+case class NamedLambdaVariable(
+ name: String,
+ dataType: DataType,
+ nullable: Boolean,
+ value: AtomicReference[Any] = new AtomicReference(),
+ exprId: ExprId = NamedExpression.newExprId)
+ extends LeafExpression
+ with NamedExpression
+ with CodegenFallback {
+
+ override def qualifier: Option[String] = None
+
+ override def newInstance(): NamedExpression =
+ copy(value = new AtomicReference(), exprId = NamedExpression.newExprId)
+
+ override def toAttribute: Attribute = {
+ AttributeReference(name, dataType, nullable, Metadata.empty)(exprId, None)
+ }
+
+ override def eval(input: InternalRow): Any = value.get
+
+ override def toString: String = s"lambda $name#${exprId.id}$typeSuffix"
+
+ override def simpleString: String = s"lambda $name#${exprId.id}: ${dataType.simpleString}"
+}
+
+/**
+ * A lambda function and its arguments. A lambda function can be hidden when a user wants to
+ * process an completely independent expression in a [[HigherOrderFunction]], the lambda function
+ * and its variables are then only used for internal bookkeeping within the higher order function.
+ */
+case class LambdaFunction(
+ function: Expression,
+ arguments: Seq[NamedExpression],
+ hidden: Boolean = false)
+ extends Expression with CodegenFallback {
+
+ override def children: Seq[Expression] = function +: arguments
+ override def dataType: DataType = function.dataType
+ override def nullable: Boolean = function.nullable
+
+ lazy val bound: Boolean = arguments.forall(_.resolved)
+
+ override def eval(input: InternalRow): Any = function.eval(input)
+}
+
+/**
+ * A higher order function takes one or more (lambda) functions and applies these to some objects.
+ * The function produces a number of variables which can be consumed by some lambda function.
+ */
+trait HigherOrderFunction extends Expression {
+
+ override def children: Seq[Expression] = inputs ++ functions
+
+ /**
+ * Inputs to the higher ordered function.
+ */
+ def inputs: Seq[Expression]
+
+ /**
+ * All inputs have been resolved. This means that the types and nullabilty of (most of) the
+ * lambda function arguments is known, and that we can start binding the lambda functions.
+ */
+ lazy val inputResolved: Boolean = inputs.forall(_.resolved)
+
+ /**
+ * Functions applied by the higher order function.
+ */
+ def functions: Seq[Expression]
+
+ /**
+ * All inputs must be resolved and all functions must be resolved lambda functions.
+ */
+ override lazy val resolved: Boolean = inputResolved && functions.forall {
+ case l: LambdaFunction => l.resolved
+ case _ => false
+ }
+
+ /**
+ * Bind the lambda functions to the [[HigherOrderFunction]] using the given bind function. The
+ * bind function takes the potential lambda and it's (partial) arguments and converts this into
+ * a bound lambda function.
+ */
+ def bind(f: (Expression, Seq[(DataType, Boolean)]) => LambdaFunction): HigherOrderFunction
+
+ @transient lazy val functionsForEval: Seq[Expression] = functions.map {
+ case LambdaFunction(function, arguments, hidden) =>
+ val argumentMap = arguments.map { arg => arg.exprId -> arg }.toMap
+ function.transformUp {
+ case variable: NamedLambdaVariable if argumentMap.contains(variable.exprId) =>
+ argumentMap(variable.exprId)
+ }
+ }
+}
+
+trait ArrayBasedHigherOrderFunction extends HigherOrderFunction with ExpectsInputTypes {
+
+ def input: Expression
+
+ override def inputs: Seq[Expression] = input :: Nil
+
+ def function: Expression
+
+ override def functions: Seq[Expression] = function :: Nil
+
+ def expectingFunctionType: AbstractDataType = AnyDataType
+
+ override def inputTypes: Seq[AbstractDataType] = Seq(ArrayType, expectingFunctionType)
+
+ @transient lazy val functionForEval: Expression = functionsForEval.head
+}
+
+/**
+ * Transform elements in an array using the transform function. This is similar to
+ * a `map` in functional programming.
+ */
+@ExpressionDescription(
+ usage = "_FUNC_(expr, func) - Transforms elements in an array using the function.",
+ examples = """
+ Examples:
+ > SELECT _FUNC_(array(1, 2, 3), x -> x + 1);
+ array(2, 3, 4)
+ > SELECT _FUNC_(array(1, 2, 3), (x, i) -> x + i);
+ array(1, 3, 5)
+ """,
+ since = "2.4.0")
+case class ArrayTransform(
+ input: Expression,
+ function: Expression)
+ extends ArrayBasedHigherOrderFunction with CodegenFallback {
+
+ override def nullable: Boolean = input.nullable
+
+ override def dataType: ArrayType = ArrayType(function.dataType, function.nullable)
+
+ override def bind(f: (Expression, Seq[(DataType, Boolean)]) => LambdaFunction): ArrayTransform = {
+ val (elementType, containsNull) = input.dataType match {
+ case ArrayType(elementType, containsNull) => (elementType, containsNull)
+ case _ =>
+ val ArrayType(elementType, containsNull) = ArrayType.defaultConcreteType
--- End diff --
When does this happen?
---
[GitHub] spark pull request #21954: [SPARK-23908][SQL] Add transform function.
Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:
https://github.com/apache/spark/pull/21954#discussion_r208277492
--- Diff: sql/core/src/test/resources/sql-tests/inputs/higher-order-functions.sql ---
@@ -0,0 +1,26 @@
+create or replace temporary view nested as values
+ (1, array(32, 97), array(array(12, 99), array(123, 42), array(1))),
+ (2, array(77, -76), array(array(6, 96, 65), array(-1, -2))),
+ (3, array(12), array(array(17)))
+ as t(x, ys, zs);
+
+-- Only allow lambda's in higher order functions.
+select upper(x -> x) as v;
+
+-- Identity transform an array
+select transform(zs, z -> z) as v from nested;
+
+-- Transform an array
+select transform(ys, y -> y * y) as v from nested;
+
+-- Transform an array with index
+select transform(ys, (y, i) -> y + i) as v from nested;
+
+-- Transform an array with reference
+select transform(zs, z -> concat(ys, z)) as v from nested;
+
+-- Transform an array to an array of 0's
+select transform(ys, 0) as v from nested;
+
+-- Transform a null array
+select transform(cast(null as array<int>), x -> x + 1) as v;
--- End diff --
shall we add a test for nested lambda?
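A hypothetical example of such a test, written as Scala against the `nested` view defined at the top of this file; `spark` is an assumed active `SparkSession` and this snippet is not part of the PR:
```scala
// Hypothetical sketch of a nested-lambda test case, assuming an active SparkSession
// `spark` and the `nested` temporary view created at the top of this .sql file.
// `zs` is an array<array<int>>, so the outer lambda's element is itself an array.
val nestedLambda = spark.sql(
  "select transform(zs, z -> transform(z, zz -> zz + 1)) as v from nested")
nestedLambda.show(truncate = false)
```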
---
[GitHub] spark pull request #21954: [SPARK-23908][SQL] Add transform function.
Posted by ueshin <gi...@git.apache.org>.
Github user ueshin commented on a diff in the pull request:
https://github.com/apache/spark/pull/21954#discussion_r208282200
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala ---
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions
+
+import java.util.concurrent.atomic.AtomicReference
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.codegen._
+import org.apache.spark.sql.catalyst.expressions.codegen.Block._
+import org.apache.spark.sql.catalyst.util.{ArrayData, GenericArrayData}
+import org.apache.spark.sql.types._
+
+/**
+ * A named lambda variable.
+ */
+case class NamedLambdaVariable(
+ name: String,
+ dataType: DataType,
+ nullable: Boolean,
+ value: AtomicReference[Any] = new AtomicReference(),
+ exprId: ExprId = NamedExpression.newExprId)
+ extends LeafExpression
+ with NamedExpression
+ with CodegenFallback {
+
+ override def qualifier: Option[String] = None
+
+ override def newInstance(): NamedExpression =
+ copy(value = new AtomicReference(), exprId = NamedExpression.newExprId)
+
+ override def toAttribute: Attribute = {
+ AttributeReference(name, dataType, nullable, Metadata.empty)(exprId, None)
+ }
+
+ override def eval(input: InternalRow): Any = value.get
+
+ override def toString: String = s"lambda $name#${exprId.id}$typeSuffix"
+
+ override def simpleString: String = s"lambda $name#${exprId.id}: ${dataType.simpleString}"
+}
+
+/**
+ * A lambda function and its arguments. A lambda function can be hidden when a user wants to
+ * process an completely independent expression in a [[HigherOrderFunction]], the lambda function
+ * and its variables are then only used for internal bookkeeping within the higher order function.
+ */
+case class LambdaFunction(
+ function: Expression,
+ arguments: Seq[NamedExpression],
+ hidden: Boolean = false)
+ extends Expression with CodegenFallback {
+
+ override def children: Seq[Expression] = function +: arguments
+ override def dataType: DataType = function.dataType
+ override def nullable: Boolean = function.nullable
+
+ lazy val bound: Boolean = arguments.forall(_.resolved)
+
+ override def eval(input: InternalRow): Any = function.eval(input)
+}
+
+/**
+ * A higher order function takes one or more (lambda) functions and applies these to some objects.
+ * The function produces a number of variables which can be consumed by some lambda function.
+ */
+trait HigherOrderFunction extends Expression {
+
+ override def children: Seq[Expression] = inputs ++ functions
+
+ /**
+ * Inputs to the higher ordered function.
+ */
+ def inputs: Seq[Expression]
+
+ /**
+ * All inputs have been resolved. This means that the types and nullabilty of (most of) the
+ * lambda function arguments is known, and that we can start binding the lambda functions.
+ */
+ lazy val inputResolved: Boolean = inputs.forall(_.resolved)
+
+ /**
+ * Functions applied by the higher order function.
+ */
+ def functions: Seq[Expression]
+
+ /**
+ * All inputs must be resolved and all functions must be resolved lambda functions.
+ */
+ override lazy val resolved: Boolean = inputResolved && functions.forall {
+ case l: LambdaFunction => l.resolved
+ case _ => false
+ }
+
+ /**
+ * Bind the lambda functions to the [[HigherOrderFunction]] using the given bind function. The
+ * bind function takes the potential lambda and it's (partial) arguments and converts this into
+ * a bound lambda function.
+ */
+ def bind(f: (Expression, Seq[(DataType, Boolean)]) => LambdaFunction): HigherOrderFunction
+
+ @transient lazy val functionsForEval: Seq[Expression] = functions.map {
+ case LambdaFunction(function, arguments, hidden) =>
+ val argumentMap = arguments.map { arg => arg.exprId -> arg }.toMap
+ function.transformUp {
+ case variable: NamedLambdaVariable if argumentMap.contains(variable.exprId) =>
+ argumentMap(variable.exprId)
+ }
+ }
+}
+
+trait ArrayBasedHigherOrderFunction extends HigherOrderFunction with ExpectsInputTypes {
+
+ def input: Expression
+
+ override def inputs: Seq[Expression] = input :: Nil
+
+ def function: Expression
+
+ override def functions: Seq[Expression] = function :: Nil
+
+ def expectingFunctionType: AbstractDataType = AnyDataType
+
+ override def inputTypes: Seq[AbstractDataType] = Seq(ArrayType, expectingFunctionType)
+
+ @transient lazy val functionForEval: Expression = functionsForEval.head
+}
+
+/**
+ * Transform elements in an array using the transform function. This is similar to
+ * a `map` in functional programming.
+ */
+@ExpressionDescription(
+ usage = "_FUNC_(expr, func) - Transforms elements in an array using the function.",
+ examples = """
+ Examples:
+ > SELECT _FUNC_(array(1, 2, 3), x -> x + 1);
+ array(2, 3, 4)
+ > SELECT _FUNC_(array(1, 2, 3), (x, i) -> x + i);
+ array(1, 3, 5)
+ """,
+ since = "2.4.0")
+case class ArrayTransform(
+ input: Expression,
+ function: Expression)
+ extends ArrayBasedHigherOrderFunction with CodegenFallback {
+
+ override def nullable: Boolean = input.nullable
+
+ override def dataType: ArrayType = ArrayType(function.dataType, function.nullable)
+
+ override def bind(f: (Expression, Seq[(DataType, Boolean)]) => LambdaFunction): ArrayTransform = {
+ val (elementType, containsNull) = input.dataType match {
+ case ArrayType(elementType, containsNull) => (elementType, containsNull)
+ case _ =>
+ val ArrayType(elementType, containsNull) = ArrayType.defaultConcreteType
+ (elementType, containsNull)
+ }
+ function match {
+ case LambdaFunction(_, arguments, _) if arguments.size == 2 =>
+ copy(function = f(function, (elementType, containsNull) :: (IntegerType, false) :: Nil))
+ case _ =>
+ copy(function = f(function, (elementType, containsNull) :: Nil))
+ }
+ }
+
+ @transient lazy val (elementVar, indexVar) = {
+ val LambdaFunction(_, (elementVar: NamedLambdaVariable) +: tail, _) = function
+ val indexVar = if (tail.nonEmpty) {
+ Some(tail.head.asInstanceOf[NamedLambdaVariable])
+ } else {
+ None
+ }
+ (elementVar, indexVar)
+ }
+
+ override def eval(input: InternalRow): Any = {
+ val arr = this.input.eval(input).asInstanceOf[ArrayData]
--- End diff --
I'll check the other PRs and submit a follow-up as well.
---
[GitHub] spark issue #21954: [SPARK-23908][SQL] Add transform function.
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/21954
Merged build finished. Test PASSed.
---
[GitHub] spark issue #21954: [SPARK-23908][SQL] Add transform function.
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/21954
Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/94019/
Test PASSed.
---
[GitHub] spark issue #21954: [SPARK-23908][SQL] Add transform function.
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/21954
Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/93973/
Test FAILed.
---
[GitHub] spark issue #21954: [SPARK-23908][SQL] Add transform function.
Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:
https://github.com/apache/spark/pull/21954
**[Test build #93950 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/93950/testReport)** for PR 21954 at commit [`c3bf6a0`](https://github.com/apache/spark/commit/c3bf6a0059a151ba23cf32c842e31ced3b28726c).
* This patch **fails due to an unknown error code, -9**.
* This patch merges cleanly.
* This patch adds the following public classes _(experimental)_:
* `case class ResolveHigherOrderFunctions(catalog: SessionCatalog) extends Rule[LogicalPlan] `
* ` s\"its class is $`
* `case class ResolveLambdaVariables(conf: SQLConf) extends Rule[LogicalPlan] `
* `case class NamedLambdaVariable(`
* `case class LambdaFunction(`
* `trait HigherOrderFunction extends Expression `
* `trait ArrayBasedHigherOrderFunction extends HigherOrderFunction with ExpectsInputTypes `
* `case class ArrayTransform(`
---
[GitHub] spark issue #21954: [SPARK-23908][SQL] Add transform function.
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/21954
Merged build finished. Test PASSed.
---
[GitHub] spark issue #21954: [SPARK-23908][SQL] Add transform function.
Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:
https://github.com/apache/spark/pull/21954
**[Test build #93934 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/93934/testReport)** for PR 21954 at commit [`ee450c5`](https://github.com/apache/spark/commit/ee450c5ef3f99d3bbf8dbbd05273bc63005bbccb).
* This patch **fails Spark unit tests**.
* This patch merges cleanly.
* This patch adds no public classes.
---
[GitHub] spark pull request #21954: [SPARK-23908][SQL] Add transform function.
Posted by ueshin <gi...@git.apache.org>.
Github user ueshin commented on a diff in the pull request:
https://github.com/apache/spark/pull/21954#discussion_r207172497
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala ---
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions
+
+import java.util.concurrent.atomic.AtomicReference
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.codegen._
+import org.apache.spark.sql.catalyst.expressions.codegen.Block._
+import org.apache.spark.sql.catalyst.util.{ArrayData, GenericArrayData}
+import org.apache.spark.sql.types._
+
+/**
+ * A named lambda variable.
+ */
+case class NamedLambdaVariable(
+ name: String,
+ dataType: DataType,
+ nullable: Boolean,
+ value: AtomicReference[Any] = new AtomicReference(),
--- End diff --
I see. Thanks.
---
[GitHub] spark issue #21954: [SPARK-23908][SQL] Add transform function.
Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:
https://github.com/apache/spark/pull/21954
**[Test build #94002 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94002/testReport)** for PR 21954 at commit [`c3bf6a0`](https://github.com/apache/spark/commit/c3bf6a0059a151ba23cf32c842e31ced3b28726c).
---
[GitHub] spark pull request #21954: [SPARK-23908][SQL] Add transform function.
Posted by ueshin <gi...@git.apache.org>.
Github user ueshin commented on a diff in the pull request:
https://github.com/apache/spark/pull/21954#discussion_r208281900
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala ---
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions
+
+import java.util.concurrent.atomic.AtomicReference
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.codegen._
+import org.apache.spark.sql.catalyst.expressions.codegen.Block._
+import org.apache.spark.sql.catalyst.util.{ArrayData, GenericArrayData}
+import org.apache.spark.sql.types._
+
+/**
+ * A named lambda variable.
+ */
+case class NamedLambdaVariable(
+ name: String,
+ dataType: DataType,
+ nullable: Boolean,
+ value: AtomicReference[Any] = new AtomicReference(),
+ exprId: ExprId = NamedExpression.newExprId)
+ extends LeafExpression
+ with NamedExpression
+ with CodegenFallback {
+
+ override def qualifier: Option[String] = None
+
+ override def newInstance(): NamedExpression =
+ copy(value = new AtomicReference(), exprId = NamedExpression.newExprId)
+
+ override def toAttribute: Attribute = {
+ AttributeReference(name, dataType, nullable, Metadata.empty)(exprId, None)
+ }
+
+ override def eval(input: InternalRow): Any = value.get
+
+ override def toString: String = s"lambda $name#${exprId.id}$typeSuffix"
+
+ override def simpleString: String = s"lambda $name#${exprId.id}: ${dataType.simpleString}"
+}
+
+/**
+ * A lambda function and its arguments. A lambda function can be hidden when a user wants to
+ * process an completely independent expression in a [[HigherOrderFunction]], the lambda function
+ * and its variables are then only used for internal bookkeeping within the higher order function.
+ */
+case class LambdaFunction(
+ function: Expression,
+ arguments: Seq[NamedExpression],
+ hidden: Boolean = false)
+ extends Expression with CodegenFallback {
+
+ override def children: Seq[Expression] = function +: arguments
+ override def dataType: DataType = function.dataType
+ override def nullable: Boolean = function.nullable
+
+ lazy val bound: Boolean = arguments.forall(_.resolved)
+
+ override def eval(input: InternalRow): Any = function.eval(input)
+}
+
+/**
+ * A higher order function takes one or more (lambda) functions and applies these to some objects.
+ * The function produces a number of variables which can be consumed by some lambda function.
+ */
+trait HigherOrderFunction extends Expression {
+
+ override def children: Seq[Expression] = inputs ++ functions
+
+ /**
+ * Inputs to the higher ordered function.
+ */
+ def inputs: Seq[Expression]
+
+ /**
+ * All inputs have been resolved. This means that the types and nullabilty of (most of) the
+ * lambda function arguments is known, and that we can start binding the lambda functions.
+ */
+ lazy val inputResolved: Boolean = inputs.forall(_.resolved)
+
+ /**
+ * Functions applied by the higher order function.
+ */
+ def functions: Seq[Expression]
+
+ /**
+ * All inputs must be resolved and all functions must be resolved lambda functions.
+ */
+ override lazy val resolved: Boolean = inputResolved && functions.forall {
+ case l: LambdaFunction => l.resolved
+ case _ => false
+ }
+
+ /**
+ * Bind the lambda functions to the [[HigherOrderFunction]] using the given bind function. The
+ * bind function takes the potential lambda and it's (partial) arguments and converts this into
+ * a bound lambda function.
+ */
+ def bind(f: (Expression, Seq[(DataType, Boolean)]) => LambdaFunction): HigherOrderFunction
+
+ @transient lazy val functionsForEval: Seq[Expression] = functions.map {
+ case LambdaFunction(function, arguments, hidden) =>
+ val argumentMap = arguments.map { arg => arg.exprId -> arg }.toMap
+ function.transformUp {
+ case variable: NamedLambdaVariable if argumentMap.contains(variable.exprId) =>
+ argumentMap(variable.exprId)
+ }
+ }
+}
+
+trait ArrayBasedHigherOrderFunction extends HigherOrderFunction with ExpectsInputTypes {
+
+ def input: Expression
+
+ override def inputs: Seq[Expression] = input :: Nil
+
+ def function: Expression
+
+ override def functions: Seq[Expression] = function :: Nil
+
+ def expectingFunctionType: AbstractDataType = AnyDataType
+
+ override def inputTypes: Seq[AbstractDataType] = Seq(ArrayType, expectingFunctionType)
+
+ @transient lazy val functionForEval: Expression = functionsForEval.head
--- End diff --
Ah, that makes sense. We currently have some PRs for other higher-order functions, so I'll check them and submit a follow-up if needed.
---
[GitHub] spark issue #21954: [SPARK-23908][SQL] Add transform function.
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/21954
Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/94002/
Test FAILed.
---
[GitHub] spark pull request #21954: [SPARK-23908][SQL] Add transform function.
Posted by ueshin <gi...@git.apache.org>.
Github user ueshin commented on a diff in the pull request:
https://github.com/apache/spark/pull/21954#discussion_r207169967
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala ---
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions
+
+import java.util.concurrent.atomic.AtomicReference
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.codegen._
+import org.apache.spark.sql.catalyst.expressions.codegen.Block._
+import org.apache.spark.sql.catalyst.util.{ArrayData, GenericArrayData}
+import org.apache.spark.sql.types._
+
+/**
+ * A named lambda variable.
+ */
+case class NamedLambdaVariable(
+ name: String,
+ dataType: DataType,
+ nullable: Boolean,
+ value: AtomicReference[Any] = new AtomicReference(),
--- End diff --
Hmm, it seems like overriding `fastEquals` is not enough...
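For what it's worth, a self-contained plain-Scala illustration (the `Variable` and `EqualityDemo` names are made up, not Spark code) of how `AtomicReference` interacts with case-class equality, which seems related to the behaviour described here: `AtomicReference` only has reference equality, so two independently constructed but otherwise identical variables never compare equal.
```scala
import java.util.concurrent.atomic.AtomicReference

// Plain-Scala illustration (not Spark code): AtomicReference does not override equals,
// so case-class equality on a field of that type falls back to reference comparison.
case class Variable(name: String, value: AtomicReference[Any] = new AtomicReference())

object EqualityDemo extends App {
  val a = Variable("x")
  val b = Variable("x")

  println(a == b)        // false: the two AtomicReference instances are distinct
  println(a == a.copy()) // true: copy() reuses the same AtomicReference instance
}
```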
---
[GitHub] spark pull request #21954: [SPARK-23908][SQL] Add transform function.
Posted by ueshin <gi...@git.apache.org>.
Github user ueshin commented on a diff in the pull request:
https://github.com/apache/spark/pull/21954#discussion_r208282971
--- Diff: sql/core/src/test/resources/sql-tests/inputs/higher-order-functions.sql ---
@@ -0,0 +1,26 @@
+create or replace temporary view nested as values
+ (1, array(32, 97), array(array(12, 99), array(123, 42), array(1))),
+ (2, array(77, -76), array(array(6, 96, 65), array(-1, -2))),
+ (3, array(12), array(array(17)))
+ as t(x, ys, zs);
+
+-- Only allow lambda's in higher order functions.
+select upper(x -> x) as v;
+
+-- Identity transform an array
+select transform(zs, z -> z) as v from nested;
+
+-- Transform an array
+select transform(ys, y -> y * y) as v from nested;
+
+-- Transform an array with index
+select transform(ys, (y, i) -> y + i) as v from nested;
+
+-- Transform an array with reference
+select transform(zs, z -> concat(ys, z)) as v from nested;
+
+-- Transform an array to an array of 0's
+select transform(ys, 0) as v from nested;
+
+-- Transform a null array
+select transform(cast(null as array<int>), x -> x + 1) as v;
--- End diff --
Actually we have some at #21965 and #21982.
---
[GitHub] spark pull request #21954: [SPARK-23908][SQL] Add transform function.
Posted by ueshin <gi...@git.apache.org>.
Github user ueshin commented on a diff in the pull request:
https://github.com/apache/spark/pull/21954#discussion_r207162167
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala ---
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions
+
+import java.util.concurrent.atomic.AtomicReference
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.codegen._
+import org.apache.spark.sql.catalyst.expressions.codegen.Block._
+import org.apache.spark.sql.catalyst.util.{ArrayData, GenericArrayData}
+import org.apache.spark.sql.types._
+
+/**
+ * A named lambda variable.
+ */
+case class NamedLambdaVariable(
+ name: String,
+ dataType: DataType,
+ nullable: Boolean,
+ value: AtomicReference[Any] = new AtomicReference(),
--- End diff --
When I tried to make copies of the `NamedLambdaVariable`s, `transformUp` didn't replace the variables and generated wrong results.
---
[GitHub] spark issue #21954: [SPARK-23908][SQL] Add transform function.
Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:
https://github.com/apache/spark/pull/21954
**[Test build #94019 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94019/testReport)** for PR 21954 at commit [`c3bf6a0`](https://github.com/apache/spark/commit/c3bf6a0059a151ba23cf32c842e31ced3b28726c).
* This patch passes all tests.
* This patch merges cleanly.
* This patch adds the following public classes _(experimental)_:
* `case class ResolveHigherOrderFunctions(catalog: SessionCatalog) extends Rule[LogicalPlan] `
* ` s\"its class is $`
* `case class ResolveLambdaVariables(conf: SQLConf) extends Rule[LogicalPlan] `
* `case class NamedLambdaVariable(`
* `case class LambdaFunction(`
* `trait HigherOrderFunction extends Expression `
* `trait ArrayBasedHigherOrderFunction extends HigherOrderFunction with ExpectsInputTypes `
* `case class ArrayTransform(`
---
[GitHub] spark pull request #21954: [SPARK-23908][SQL] Add transform function.
Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:
https://github.com/apache/spark/pull/21954#discussion_r208269300
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala ---
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions
+
+import java.util.concurrent.atomic.AtomicReference
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.codegen._
+import org.apache.spark.sql.catalyst.expressions.codegen.Block._
+import org.apache.spark.sql.catalyst.util.{ArrayData, GenericArrayData}
+import org.apache.spark.sql.types._
+
+/**
+ * A named lambda variable.
+ */
+case class NamedLambdaVariable(
+ name: String,
+ dataType: DataType,
+ nullable: Boolean,
+ value: AtomicReference[Any] = new AtomicReference(),
+ exprId: ExprId = NamedExpression.newExprId)
+ extends LeafExpression
+ with NamedExpression
+ with CodegenFallback {
+
+ override def qualifier: Option[String] = None
+
+ override def newInstance(): NamedExpression =
+ copy(value = new AtomicReference(), exprId = NamedExpression.newExprId)
+
+ override def toAttribute: Attribute = {
+ AttributeReference(name, dataType, nullable, Metadata.empty)(exprId, None)
+ }
+
+ override def eval(input: InternalRow): Any = value.get
+
+ override def toString: String = s"lambda $name#${exprId.id}$typeSuffix"
+
+ override def simpleString: String = s"lambda $name#${exprId.id}: ${dataType.simpleString}"
+}
+
+/**
+ * A lambda function and its arguments. A lambda function can be hidden when a user wants to
+ * process an completely independent expression in a [[HigherOrderFunction]], the lambda function
+ * and its variables are then only used for internal bookkeeping within the higher order function.
+ */
+case class LambdaFunction(
+ function: Expression,
+ arguments: Seq[NamedExpression],
+ hidden: Boolean = false)
+ extends Expression with CodegenFallback {
+
+ override def children: Seq[Expression] = function +: arguments
+ override def dataType: DataType = function.dataType
+ override def nullable: Boolean = function.nullable
+
+ lazy val bound: Boolean = arguments.forall(_.resolved)
+
+ override def eval(input: InternalRow): Any = function.eval(input)
+}
+
+/**
+ * A higher order function takes one or more (lambda) functions and applies these to some objects.
+ * The function produces a number of variables which can be consumed by some lambda function.
+ */
+trait HigherOrderFunction extends Expression {
+
+ override def children: Seq[Expression] = inputs ++ functions
+
+ /**
+ * Inputs to the higher ordered function.
+ */
+ def inputs: Seq[Expression]
+
+ /**
+ * All inputs have been resolved. This means that the types and nullabilty of (most of) the
+ * lambda function arguments is known, and that we can start binding the lambda functions.
+ */
+ lazy val inputResolved: Boolean = inputs.forall(_.resolved)
+
+ /**
+ * Functions applied by the higher order function.
+ */
+ def functions: Seq[Expression]
+
+ /**
+ * All inputs must be resolved and all functions must be resolved lambda functions.
+ */
+ override lazy val resolved: Boolean = inputResolved && functions.forall {
+ case l: LambdaFunction => l.resolved
+ case _ => false
+ }
+
+ /**
+ * Bind the lambda functions to the [[HigherOrderFunction]] using the given bind function. The
+ * bind function takes the potential lambda and it's (partial) arguments and converts this into
+ * a bound lambda function.
+ */
+ def bind(f: (Expression, Seq[(DataType, Boolean)]) => LambdaFunction): HigherOrderFunction
+
+ @transient lazy val functionsForEval: Seq[Expression] = functions.map {
+ case LambdaFunction(function, arguments, hidden) =>
+ val argumentMap = arguments.map { arg => arg.exprId -> arg }.toMap
+ function.transformUp {
+ case variable: NamedLambdaVariable if argumentMap.contains(variable.exprId) =>
+ argumentMap(variable.exprId)
+ }
+ }
+}
+
+trait ArrayBasedHigherOrderFunction extends HigherOrderFunction with ExpectsInputTypes {
+
+ def input: Expression
+
+ override def inputs: Seq[Expression] = input :: Nil
+
+ def function: Expression
+
+ override def functions: Seq[Expression] = function :: Nil
+
+ def expectingFunctionType: AbstractDataType = AnyDataType
+
+ override def inputTypes: Seq[AbstractDataType] = Seq(ArrayType, expectingFunctionType)
+
+ @transient lazy val functionForEval: Expression = functionsForEval.head
+}
+
+/**
+ * Transform elements in an array using the transform function. This is similar to
+ * a `map` in functional programming.
+ */
+@ExpressionDescription(
+ usage = "_FUNC_(expr, func) - Transforms elements in an array using the function.",
+ examples = """
+ Examples:
+ > SELECT _FUNC_(array(1, 2, 3), x -> x + 1);
+ array(2, 3, 4)
+ > SELECT _FUNC_(array(1, 2, 3), (x, i) -> x + i);
+ array(1, 3, 5)
+ """,
+ since = "2.4.0")
+case class ArrayTransform(
+ input: Expression,
+ function: Expression)
+ extends ArrayBasedHigherOrderFunction with CodegenFallback {
+
+ override def nullable: Boolean = input.nullable
+
+ override def dataType: ArrayType = ArrayType(function.dataType, function.nullable)
+
+ override def bind(f: (Expression, Seq[(DataType, Boolean)]) => LambdaFunction): ArrayTransform = {
+ val (elementType, containsNull) = input.dataType match {
+ case ArrayType(elementType, containsNull) => (elementType, containsNull)
+ case _ =>
+ val ArrayType(elementType, containsNull) = ArrayType.defaultConcreteType
+ (elementType, containsNull)
+ }
+ function match {
+ case LambdaFunction(_, arguments, _) if arguments.size == 2 =>
+ copy(function = f(function, (elementType, containsNull) :: (IntegerType, false) :: Nil))
+ case _ =>
+ copy(function = f(function, (elementType, containsNull) :: Nil))
+ }
+ }
+
+ @transient lazy val (elementVar, indexVar) = {
+ val LambdaFunction(_, (elementVar: NamedLambdaVariable) +: tail, _) = function
+ val indexVar = if (tail.nonEmpty) {
+ Some(tail.head.asInstanceOf[NamedLambdaVariable])
+ } else {
+ None
+ }
+ (elementVar, indexVar)
+ }
+
+ override def eval(input: InternalRow): Any = {
+ val arr = this.input.eval(input).asInstanceOf[ArrayData]
--- End diff --
nit: we should do some renaming to avoid the conflict, e.g. rename `ArrayBasedHigherOrderFunction#input` to `inputArray`
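A small self-contained illustration in plain Scala (the `Before`, `After` and `ShadowDemo` names are made up, not Spark code) of the conflict behind this nit: when the field and `eval`'s parameter are both named `input`, the body has to disambiguate with `this.input`, which the suggested rename avoids.
```scala
// Self-contained illustration (not Spark code) of the shadowing issue behind the nit.
class Before(val input: Vector[Int]) {
  // The parameter shadows the field, so the field needs `this.input`.
  def eval(input: Int): Int = this.input(input)
}

// After the suggested kind of rename, no disambiguation is needed.
class After(val inputArray: Vector[Int]) {
  def eval(input: Int): Int = inputArray(input)
}

object ShadowDemo extends App {
  println(new Before(Vector(10, 20, 30)).eval(1)) // 20
  println(new After(Vector(10, 20, 30)).eval(2))  // 30
}
```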
---
[GitHub] spark pull request #21954: [SPARK-23908][SQL] Add transform function.
Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:
https://github.com/apache/spark/pull/21954#discussion_r208266333
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala ---
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions
+
+import java.util.concurrent.atomic.AtomicReference
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.codegen._
+import org.apache.spark.sql.catalyst.expressions.codegen.Block._
+import org.apache.spark.sql.catalyst.util.{ArrayData, GenericArrayData}
+import org.apache.spark.sql.types._
+
+/**
+ * A named lambda variable.
+ */
+case class NamedLambdaVariable(
+ name: String,
+ dataType: DataType,
+ nullable: Boolean,
+ value: AtomicReference[Any] = new AtomicReference(),
+ exprId: ExprId = NamedExpression.newExprId)
+ extends LeafExpression
+ with NamedExpression
+ with CodegenFallback {
+
+ override def qualifier: Option[String] = None
+
+ override def newInstance(): NamedExpression =
+ copy(value = new AtomicReference(), exprId = NamedExpression.newExprId)
+
+ override def toAttribute: Attribute = {
+ AttributeReference(name, dataType, nullable, Metadata.empty)(exprId, None)
+ }
+
+ override def eval(input: InternalRow): Any = value.get
+
+ override def toString: String = s"lambda $name#${exprId.id}$typeSuffix"
+
+ override def simpleString: String = s"lambda $name#${exprId.id}: ${dataType.simpleString}"
+}
+
+/**
+ * A lambda function and its arguments. A lambda function can be hidden when a user wants to
+ * process a completely independent expression in a [[HigherOrderFunction]]; the lambda function
+ * and its variables are then only used for internal bookkeeping within the higher order function.
+ */
+case class LambdaFunction(
+ function: Expression,
+ arguments: Seq[NamedExpression],
+ hidden: Boolean = false)
+ extends Expression with CodegenFallback {
+
+ override def children: Seq[Expression] = function +: arguments
+ override def dataType: DataType = function.dataType
+ override def nullable: Boolean = function.nullable
+
+ lazy val bound: Boolean = arguments.forall(_.resolved)
+
+ override def eval(input: InternalRow): Any = function.eval(input)
+}
+
+/**
+ * A higher order function takes one or more (lambda) functions and applies these to some objects.
+ * The function produces a number of variables which can be consumed by some lambda function.
+ */
+trait HigherOrderFunction extends Expression {
+
+ override def children: Seq[Expression] = inputs ++ functions
+
+ /**
+ * Inputs to the higher order function.
+ */
+ def inputs: Seq[Expression]
+
+ /**
+ * All inputs have been resolved. This means that the types and nullability of (most of) the
+ * lambda function arguments are known, and that we can start binding the lambda functions.
+ */
+ lazy val inputResolved: Boolean = inputs.forall(_.resolved)
+
+ /**
+ * Functions applied by the higher order function.
+ */
+ def functions: Seq[Expression]
+
+ /**
+ * All inputs must be resolved and all functions must be resolved lambda functions.
+ */
+ override lazy val resolved: Boolean = inputResolved && functions.forall {
+ case l: LambdaFunction => l.resolved
+ case _ => false
+ }
+
+ /**
+ * Bind the lambda functions to the [[HigherOrderFunction]] using the given bind function. The
+ * bind function takes the potential lambda and its (partial) arguments and converts this into
+ * a bound lambda function.
+ */
+ def bind(f: (Expression, Seq[(DataType, Boolean)]) => LambdaFunction): HigherOrderFunction
+
+ @transient lazy val functionsForEval: Seq[Expression] = functions.map {
+ case LambdaFunction(function, arguments, hidden) =>
+ val argumentMap = arguments.map { arg => arg.exprId -> arg }.toMap
+ function.transformUp {
+ case variable: NamedLambdaVariable if argumentMap.contains(variable.exprId) =>
+ argumentMap(variable.exprId)
+ }
+ }
+}
+
+trait ArrayBasedHigherOrderFunction extends HigherOrderFunction with ExpectsInputTypes {
+
+ def input: Expression
+
+ override def inputs: Seq[Expression] = input :: Nil
+
+ def function: Expression
+
+ override def functions: Seq[Expression] = function :: Nil
+
+ def expectingFunctionType: AbstractDataType = AnyDataType
+
+ override def inputTypes: Seq[AbstractDataType] = Seq(ArrayType, expectingFunctionType)
+
+ @transient lazy val functionForEval: Expression = functionsForEval.head
--- End diff --
does this need to be a `lazy val`? `Seq#head` is very cheap.
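A tiny stand-alone illustration of the trade-off being raised (plain Scala, not Spark code; names are made up): with a `lazy val` the head lookup is computed once and cached in a field, with a `def` it is recomputed per call, but since `Seq#head` is O(1) either choice is effectively free here.
```scala
// Minimal sketch of lazy val vs def for a cheap derived value.
object LazyValVsDef {
  lazy val xs: Seq[Int] = { println("building xs"); Seq(1, 2, 3) }

  lazy val headCached: Int = xs.head // evaluated once, stored
  def headEachTime: Int = xs.head    // evaluated per call, but just an O(1) lookup

  def main(args: Array[String]): Unit = {
    println(headCached + headCached)   // "building xs" printed only once
    println(headEachTime + headEachTime)
  }
}
```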
---
[GitHub] spark issue #21954: [SPARK-23908][SQL] Add transform function.
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/21954
Merged build finished. Test PASSed.
---
[GitHub] spark issue #21954: [SPARK-23908][SQL] Add transform function.
Posted by gatorsmile <gi...@git.apache.org>.
Github user gatorsmile commented on the issue:
https://github.com/apache/spark/pull/21954
cc @hvanhovell
---
[GitHub] spark issue #21954: [SPARK-23908][SQL] Add transform function.
Posted by holdensmagicalunicorn <gi...@git.apache.org>.
Github user holdensmagicalunicorn commented on the issue:
https://github.com/apache/spark/pull/21954
@ueshin, thanks! I am a bot who has found some folks who might be able to help with the review: @rxin, @cloud-fan and @hvanhovell
---
[GitHub] spark issue #21954: [SPARK-23908][SQL] Add transform function.
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/21954
Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/1615/
Test PASSed.
---
[GitHub] spark issue #21954: [SPARK-23908][SQL] Add transform function.
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/21954
Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/1659/
Test PASSed.
---
[GitHub] spark pull request #21954: [SPARK-23908][SQL] Add transform function.
Posted by ueshin <gi...@git.apache.org>.
Github user ueshin commented on a diff in the pull request:
https://github.com/apache/spark/pull/21954#discussion_r207143138
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala ---
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions
+
+import java.util.concurrent.atomic.AtomicReference
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.codegen._
+import org.apache.spark.sql.catalyst.expressions.codegen.Block._
+import org.apache.spark.sql.catalyst.util.{ArrayData, GenericArrayData}
+import org.apache.spark.sql.types._
+
+/**
+ * A named lambda variable.
+ */
+case class NamedLambdaVariable(
+ name: String,
+ dataType: DataType,
+ nullable: Boolean,
+ value: AtomicReference[Any] = new AtomicReference(),
+ exprId: ExprId = NamedExpression.newExprId)
+ extends LeafExpression
+ with NamedExpression
+ with CodegenFallback {
+
+ override def qualifier: Option[String] = None
+
+ override def newInstance(): NamedExpression =
+ copy(value = new AtomicReference(), exprId = NamedExpression.newExprId)
+
+ override def toAttribute: Attribute = {
+ AttributeReference(name, dataType, nullable, Metadata.empty)(exprId, None)
+ }
+
+ override def eval(input: InternalRow): Any = value.get
+
+ override def toString: String = s"lambda $name#${exprId.id}$typeSuffix"
+
+ override def simpleString: String = s"lambda $name#${exprId.id}: ${dataType.simpleString}"
+}
+
+/**
+ * A lambda function and its arguments. A lambda function can be hidden when a user wants to
+ * process a completely independent expression in a [[HigherOrderFunction]]; the lambda function
+ * and its variables are then only used for internal bookkeeping within the higher order function.
+ */
+case class LambdaFunction(
+ function: Expression,
+ arguments: Seq[NamedExpression],
+ hidden: Boolean = false)
+ extends Expression with CodegenFallback {
+
+ override def children: Seq[Expression] = function +: arguments
+ override def dataType: DataType = function.dataType
+ override def nullable: Boolean = function.nullable
+
+ lazy val bound: Boolean = arguments.forall(_.resolved)
+
+ override def eval(input: InternalRow): Any = function.eval(input)
+}
+
+/**
+ * A higher order function takes one or more (lambda) functions and applies these to some objects.
+ * The function produces a number of variables which can be consumed by some lambda function.
+ */
+trait HigherOrderFunction extends Expression {
+
+ override def children: Seq[Expression] = inputs ++ functions
+
+ /**
+ * Inputs to the higher order function.
+ */
+ def inputs: Seq[Expression]
+
+ /**
+ * All inputs have been resolved. This means that the types and nullability of (most of) the
+ * lambda function arguments are known, and that we can start binding the lambda functions.
+ */
+ lazy val inputResolved: Boolean = inputs.forall(_.resolved)
+
+ /**
+ * Functions applied by the higher order function.
+ */
+ def functions: Seq[Expression]
+
+ /**
+ * All inputs must be resolved and all functions must be resolved lambda functions.
+ */
+ override lazy val resolved: Boolean = inputResolved && functions.forall {
+ case l: LambdaFunction => l.resolved
+ case _ => false
+ }
+
+ /**
+ * Bind the lambda functions to the [[HigherOrderFunction]] using the given bind function. The
+ * bind function takes the potential lambda and its (partial) arguments and converts this into
+ * a bound lambda function.
+ */
+ def bind(f: (Expression, Seq[(DataType, Boolean)]) => LambdaFunction): HigherOrderFunction
+
+ @transient lazy val functionsForEval: Seq[Expression] = functions.map {
+ case LambdaFunction(function, arguments, hidden) =>
+ val argumentMap = arguments.map { arg => arg.exprId -> arg }.toMap
+ function.transformUp {
+ case variable: NamedLambdaVariable if argumentMap.contains(variable.exprId) =>
+ argumentMap(variable.exprId)
+ }
+ }
+}
+
+trait ArrayBasedHigherOrderFunction extends HigherOrderFunction with ExpectsInputTypes {
+
+ def input: Expression
+
+ override def inputs: Seq[Expression] = input :: Nil
+
+ def function: Expression
+
+ override def functions: Seq[Expression] = function :: Nil
+
+ def expectingFunctionType: AbstractDataType = AnyDataType
+
+ override def inputTypes: Seq[AbstractDataType] = Seq(ArrayType, expectingFunctionType)
+
+ @transient lazy val functionForEval: Expression = functionsForEval.head
+}
+
+/**
+ * Transform elements in an array using the transform function. This is similar to
+ * a `map` in functional programming.
+ */
+@ExpressionDescription(
+ usage = "_FUNC_(expr, func) - Transforms elements in an array using the function.",
+ examples = """
+ Examples:
+ > SELECT _FUNC_(array(1, 2, 3), x -> x + 1);
+ array(2, 3, 4)
+ > SELECT _FUNC_(array(1, 2, 3), (x, i) -> x + i);
+ array(1, 3, 5)
+ """,
+ since = "2.4.0")
+case class ArrayTransform(
+ input: Expression,
+ function: Expression)
+ extends ArrayBasedHigherOrderFunction with CodegenFallback {
+
+ override def nullable: Boolean = input.nullable
+
+ override def dataType: ArrayType = ArrayType(function.dataType, function.nullable)
+
+ override def bind(f: (Expression, Seq[(DataType, Boolean)]) => LambdaFunction): ArrayTransform = {
+ val (elementType, containsNull) = input.dataType match {
+ case ArrayType(elementType, containsNull) => (elementType, containsNull)
+ case _ =>
+ val ArrayType(elementType, containsNull) = ArrayType.defaultConcreteType
--- End diff --
It happens when the first argument is not an array (e.g., https://github.com/apache/spark/pull/21954/files#diff-8e1a34391fdefa4a3a0349d7d454d86fR1798).
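For context, a hedged spark-shell repro of that negative case (hypothetical; it assumes this PR is applied and uses the `spark` session provided by the shell): the fallback to `ArrayType.defaultConcreteType` keeps `bind` from throwing a `MatchError`, so the type check can report a proper analysis error instead.
```scala
import org.apache.spark.sql.AnalysisException

// First argument is an INT, not an array, so analysis should fail cleanly.
try {
  spark.sql("SELECT transform(1, x -> x + 1)").collect()
} catch {
  case e: AnalysisException => println(e.getMessage)
}
```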
---
[GitHub] spark pull request #21954: [SPARK-23908][SQL] Add transform function.
Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:
https://github.com/apache/spark/pull/21954#discussion_r208272808
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/higherOrderFunctions.scala ---
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.analysis
+
+import org.apache.spark.sql.catalyst.catalog.SessionCatalog
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.DataType
+
+/**
+ * Resolve higher order functions from the catalog. This is different from regular function
+ * resolution because lambda functions can only be resolved after the function has been resolved;
+ * so we need to resolve a higher order function when all children are either resolved or a lambda
+ * function.
+ */
+case class ResolveHigherOrderFunctions(catalog: SessionCatalog) extends Rule[LogicalPlan] {
+
+ override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperators {
+ case q: LogicalPlan =>
+ q.transformExpressions {
+ case u @ UnresolvedFunction(fn, children, false)
+ if hasLambdaAndResolvedArguments(children) =>
+ withPosition(u) {
+ catalog.lookupFunction(fn, children) match {
+ case func: HigherOrderFunction => func
+ case other => other.failAnalysis(
+ "A lambda function should only be used in a higher order function. However, " +
+ s"its class is ${other.getClass.getCanonicalName}, which is not a " +
+ s"higher order function.")
+ }
+ }
+ }
+ }
+
+ /**
+ * Check if the arguments of a function are either resolved or a lambda function.
+ */
+ private def hasLambdaAndResolvedArguments(expressions: Seq[Expression]): Boolean = {
+ val (lambdas, others) = expressions.partition(_.isInstanceOf[LambdaFunction])
+ lambdas.nonEmpty && others.forall(_.resolved)
+ }
+}
+
+/**
+ * Resolve the lambda variables exposed by a higher order function.
+ *
+ * This rule works in two steps:
+ * [1]. Bind the anonymous variables exposed by the higher order function to the lambda function's
+ * arguments; this creates named and typed lambda variables. The argument names are checked
+ * for duplicates and the number of arguments is checked during this step.
+ * [2]. Resolve the lambda variables used in the lambda function's function expression tree.
+ * Note that we allow the use of variables from outside the current lambda; such a variable can
+ * either be defined by a lambda function in an outer scope, or be an attribute produced by the
+ * plan's child. If names are duplicated, the one defined in the innermost scope is used.
+ */
+case class ResolveLambdaVariables(conf: SQLConf) extends Rule[LogicalPlan] {
+
+ type LambdaVariableMap = Map[String, NamedExpression]
+
+ private val canonicalizer = {
+ if (!conf.caseSensitiveAnalysis) {
+ s: String => s.toLowerCase
+ } else {
+ s: String => s
+ }
+ }
+
+ override def apply(plan: LogicalPlan): LogicalPlan = {
+ plan.resolveOperators {
+ case q: LogicalPlan =>
+ q.mapExpressions(resolve(_, Map.empty))
+ }
+ }
+
+ /**
+ * Create a bound lambda function by binding the arguments of a lambda function to the given
+ * partial arguments (dataType and nullability only). If the expression happens to be an already
+ * bound lambda function then we assume it has been bound to the correct arguments and do
+ * nothing. This function will produce a lambda function with hidden arguments when it is passed
+ * an arbitrary expression.
+ */
+ private def createLambda(
+ e: Expression,
+ partialArguments: Seq[(DataType, Boolean)]): LambdaFunction = e match {
--- End diff --
why call it "partial"?
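For what it's worth, a plain-Scala analogy of the naming (illustrative only, not Spark code): the caller supplies only the "partial" half of each argument, its type and nullability, while the lambda itself supplies the names, and binding zips the two halves together.
```scala
object PartialBindingSketch {
  // Toy model: "partial" argument info (type + nullability) comes from the higher
  // order function, names come from the lambda; binding combines the two.
  case class PartialArg(dataType: String, nullable: Boolean)
  case class BoundVar(name: String, dataType: String, nullable: Boolean)

  def bindArgs(names: Seq[String], partials: Seq[PartialArg]): Seq[BoundVar] = {
    require(names.size == partials.size, "wrong number of lambda arguments")
    names.zip(partials).map { case (n, p) => BoundVar(n, p.dataType, p.nullable) }
  }

  def main(args: Array[String]): Unit = {
    val bound = bindArgs(
      Seq("x", "i"),
      Seq(PartialArg("int", nullable = true), PartialArg("int", nullable = false)))
    println(bound) // List(BoundVar(x,int,true), BoundVar(i,int,false))
  }
}
```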
---
[GitHub] spark issue #21954: [SPARK-23908][SQL] Add transform function.
Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:
https://github.com/apache/spark/pull/21954
**[Test build #93934 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/93934/testReport)** for PR 21954 at commit [`ee450c5`](https://github.com/apache/spark/commit/ee450c5ef3f99d3bbf8dbbd05273bc63005bbccb).
---
[GitHub] spark issue #21954: [SPARK-23908][SQL] Add transform function.
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/21954
Merged build finished. Test PASSed.
---